feat: add namespace enforcement and API key requirement to client operations

This commit is contained in:
anonpenguin 2025-08-16 18:12:08 +03:00
parent 66cdf130fa
commit 17f72390c3
10 changed files with 363 additions and 914 deletions

629
PLAN.md
View File

@ -1,629 +0,0 @@
# DeBros Network Gateway Implementation Plan
## Overview
This document outlines the phased implementation plan for the DeBros Network Gateway system, which provides HTTP/gRPC interfaces for non-Go clients to access network features like pub-sub, RQLite database, and storage through Ethereum wallet-based authentication and subscription models.
## Architecture Summary
- Separate `cmd/gateway` binary (not embedded in node)
- HTTP endpoints by default, optional gRPC support
- WebSocket support for pub-sub subscriptions
- Core RQLite database for gateway operational data
- Ethereum wallet-based authentication
- Multi-tenant namespace isolation
- Subscription-based payment model
---
## Phase 1: Basic Gateway Foundation (Week 1)
### Objective
Create the core gateway structure without authentication - a working HTTP proxy to the network.
### Step 1.1: Gateway Skeleton
**Files to create:**
```
cmd/gateway/main.go
pkg/gateway/config/config.go
pkg/gateway/server/server.go
```
**Implementation:**
- Basic HTTP server setup with graceful shutdown
- Configuration loading (port, network client settings)
- Health check endpoint (`/health`)
- Signal handling for SIGTERM/SIGINT
- Structured logging integration
### Step 1.2: Network Client Integration
**Files to create:**
```
pkg/gateway/client/network.go
pkg/gateway/client/pool.go
```
**Implementation:**
- Initialize network client connection using existing `pkg/client`
- Connection management and pooling
- Basic error handling and retries
- Health monitoring of network connections
### Step 1.3: Basic HTTP Handlers (No Auth)
**Files to create:**
```
pkg/gateway/handlers/health.go
pkg/gateway/handlers/storage.go
pkg/gateway/handlers/network.go
pkg/gateway/middleware/cors.go
pkg/gateway/middleware/logging.go
```
**Implementation:**
- Health check endpoint
- Basic storage GET/PUT (pass-through to network)
- Network status endpoint
- CORS middleware for web clients
- Request/response logging middleware
### Deliverables Phase 1
- [ ] Working gateway that can proxy basic requests to the network
- [ ] `/health` and `/status` endpoints functional
- [ ] Basic storage operations working without auth
- [ ] Proper error handling and logging
---
## Phase 2: Core Database & Models (Week 1-2)
### Objective
Set up the foundation database schema and models for authentication and multi-tenancy.
### Step 2.1: Database Setup
**Files to create:**
```
migrations/001_initial.sql
migrations/002_indexes.sql
pkg/gateway/db/migrations.go
```
**Implementation:**
```sql
-- Core tables
CREATE TABLE apps (id, namespace, wallet_address, created_at, updated_at);
CREATE TABLE namespaces (id, name, owner_wallet, created_at);
CREATE TABLE api_keys (id, app_id, key_hash, created_at, last_used);
CREATE TABLE audit_events (id, namespace, action, resource, timestamp);
CREATE TABLE nonces (wallet_address, nonce, expires_at);
CREATE TABLE refresh_tokens (id, app_id, token_hash, expires_at);
```
### Step 2.2: Database Access Layer
**Files to create:**
```
pkg/gateway/db/connection.go
pkg/gateway/db/models.go
pkg/gateway/db/queries.go
pkg/gateway/db/migrate.go
```
**Implementation:**
- Database connection management
- Model structs for all tables
- CRUD operations for each model
- Migration runner and version tracking
### Step 2.3: Namespace Management
**Files to create:**
```
pkg/gateway/namespace/manager.go
pkg/gateway/namespace/validator.go
pkg/gateway/namespace/errors.go
```
**Implementation:**
- Namespace CRUD operations
- Validation rules (naming, uniqueness)
- Ownership verification
- Namespace reservation system
### Deliverables Phase 2
- [ ] Database schema deployed and versioned
- [ ] Basic CRUD operations for apps and namespaces
- [ ] Migration system working
- [ ] Namespace management API
---
## Phase 3: Ethereum Wallet Authentication (Week 2)
### Objective
Implement the core Ethereum wallet-based authentication system.
### Step 3.1: Wallet Signature Verification
**Files to create:**
```
pkg/gateway/auth/ethereum.go
pkg/gateway/auth/nonce.go
pkg/gateway/auth/signature.go
```
**Implementation:**
- Message signing/verification using secp256k1
- Nonce generation and management (prevent replay attacks)
- Address recovery from signatures
- EIP-191 message formatting
### Step 3.2: JWT Token System
**Files to create:**
```
pkg/gateway/auth/jwt.go
pkg/gateway/auth/claims.go
pkg/gateway/auth/refresh.go
```
**Implementation:**
- JWT token generation with namespace claims
- Token validation middleware
- Refresh token implementation
- Token blacklisting for logout
### Step 3.3: Authentication Endpoints
**Files to create:**
```
pkg/gateway/handlers/auth.go
pkg/gateway/middleware/auth.go
```
**Endpoints:**
- `POST /v1/auth/nonce` - Get signing nonce
- `POST /v1/auth/verify` - Verify signature and get JWT
- `POST /v1/auth/refresh` - Refresh JWT token
- `POST /v1/auth/logout` - Invalidate tokens
### Deliverables Phase 3
- [ ] Working Ethereum wallet authentication
- [ ] JWT tokens with namespace claims
- [ ] Session management with refresh tokens
- [ ] Secure logout functionality
---
## Phase 4: Namespace Isolation & Security (Week 2-3)
### Objective
Implement strict multi-tenant security with complete namespace isolation.
### Step 4.1: Namespace Enforcement Middleware
**Files to create:**
```
pkg/gateway/middleware/namespace.go
pkg/gateway/middleware/ownership.go
pkg/gateway/security/validator.go
```
**Implementation:**
- Extract namespace from JWT claims
- Validate namespace ownership against database
- Inject namespace into request context
- Block cross-namespace access attempts
### Step 4.2: Resource Prefixing
**Files to create:**
```
pkg/gateway/isolation/storage.go
pkg/gateway/isolation/pubsub.go
pkg/gateway/isolation/database.go
pkg/gateway/isolation/keys.go
```
**Implementation:**
- Storage keys: `ns::<namespace>::<key>`
- PubSub topics: `<namespace>.<topic>`
- Database tables: `ns__<namespace>__tablename`
- Consistent prefixing across all resources
### Step 4.3: Secure Handlers Update
**Files to update:**
```
pkg/gateway/handlers/storage.go
pkg/gateway/handlers/pubsub.go
pkg/gateway/handlers/database.go
```
**Implementation:**
- All handlers updated to use namespace isolation
- Resource access validation
- Audit logging for all operations
### Deliverables Phase 4
- [ ] All operations namespace-isolated
- [ ] Cross-namespace access prevented and logged
- [ ] Security tests passing
- [ ] Audit trail for all resource access
---
## Phase 5: Complete API Implementation (Week 3)
### Objective
Implement all remaining REST and WebSocket endpoints.
### Step 5.1: Storage API
**Files to create/update:**
```
pkg/gateway/handlers/storage.go
pkg/gateway/api/storage.go
```
**Endpoints:**
- `GET /v1/storage/:key` - Get value
- `PUT /v1/storage/:key` - Set value
- `DELETE /v1/storage/:key` - Delete key
- `GET /v1/storage` - List keys with prefix filter
### Step 5.2: PubSub API with WebSockets
**Files to create:**
```
pkg/gateway/handlers/pubsub.go
pkg/gateway/websocket/manager.go
pkg/gateway/websocket/subscriber.go
```
**Endpoints:**
- `POST /v1/pubsub/publish` - Publish message
- `WebSocket /v1/pubsub/subscribe` - Real-time subscriptions
- `GET /v1/pubsub/topics` - List topics
- `GET /v1/pubsub/subscriptions` - List active subscriptions
### Step 5.3: Database API
**Files to create:**
```
pkg/gateway/handlers/database.go
pkg/gateway/api/database.go
```
**Endpoints:**
- `POST /v1/db/query` - Execute SELECT queries
- `POST /v1/db/execute` - Execute INSERT/UPDATE/DELETE
- `POST /v1/db/batch` - Execute multiple statements
- `GET /v1/db/tables` - List namespace tables
- `POST /v1/db/migrate` - Run schema migrations
### Deliverables Phase 5
- [ ] All CRUD operations working with namespace isolation
- [ ] WebSocket subscriptions functional and secure
- [ ] Database operations isolated per namespace
- [ ] API documentation generated
---
## Phase 6: Rate Limiting & Quotas (Week 4)
### Objective
Add usage controls, monitoring, and tier-based quotas.
### Step 6.1: Rate Limiter Implementation
**Files to create:**
```
pkg/gateway/ratelimit/limiter.go
pkg/gateway/ratelimit/middleware.go
pkg/gateway/ratelimit/storage.go
pkg/gateway/ratelimit/config.go
```
**Implementation:**
- Token bucket algorithm implementation
- Per-namespace rate limiting
- Redis backend for distributed limiting
- Configurable limits per endpoint
### Step 6.2: Usage Tracking
**Files to create:**
```
pkg/gateway/usage/tracker.go
pkg/gateway/usage/quotas.go
pkg/gateway/usage/metrics.go
pkg/gateway/usage/reporter.go
```
**Implementation:**
- Track API calls per namespace
- Monitor resource usage (storage, database queries)
- Export Prometheus metrics
- Daily/monthly usage reports
### Step 6.3: Tier Enforcement
**Files to create:**
```
pkg/gateway/middleware/tier.go
pkg/gateway/subscription/tiers.go
pkg/gateway/subscription/limits.go
```
**Tier Limits:**
- **Free**: 250 RPM, 10k requests/day
- **Basic**: 1000 RPM, 100k requests/day, 100MB storage
- **Pro**: 5000 RPM, 1M requests/day, 1GB storage
- **Elite**: Unlimited RPM, 10M requests/day, 10GB storage
### Deliverables Phase 6
- [ ] Rate limiting active and configurable
- [ ] Usage tracking in database
- [ ] Tier-based quotas enforced
- [ ] Metrics exported for monitoring
---
## Phase 7: Payment & Subscription System (Week 4-5)
### Objective
Implement the Ethereum-based payment and subscription system.
### Step 7.1: Smart Contract Integration
**Files to create:**
```
pkg/gateway/blockchain/client.go
pkg/gateway/blockchain/contracts.go
pkg/gateway/blockchain/verifier.go
pkg/gateway/blockchain/events.go
```
**Implementation:**
- Ethereum client setup (mainnet + testnet)
- Payment verification smart contracts
- Event listening for payments
- Transaction verification
### Step 7.2: Subscription Management
**Files to create:**
```
pkg/gateway/subscription/manager.go
pkg/gateway/subscription/validator.go
pkg/gateway/subscription/renewal.go
pkg/gateway/subscription/pricing.go
```
**Pricing:**
- **Basic**: 0.1 ETH/month
- **Pro**: 0.2 ETH/month
- **Elite**: 0.3 ETH/month
- **Testnet**: Free for testing
### Step 7.3: Payment Endpoints
**Files to create:**
```
pkg/gateway/handlers/payments.go
pkg/gateway/handlers/subscriptions.go
```
**Endpoints:**
- `POST /v1/payments/subscribe` - Initiate subscription
- `GET /v1/payments/status` - Check payment status
- `POST /v1/payments/verify` - Verify blockchain payment
- `GET /v1/subscriptions/current` - Get subscription details
- `POST /v1/subscriptions/cancel` - Cancel subscription
### Deliverables Phase 7
- [ ] Payment verification working on mainnet/testnet
- [ ] Subscription status tracking
- [ ] Automatic tier application based on payments
- [ ] Payment event monitoring
---
## Phase 8: Testing & Hardening (Week 5)
### Objective
Comprehensive testing, security auditing, and performance optimization.
### Step 8.1: Integration Tests
**Files to create:**
```
tests/integration/auth_test.go
tests/integration/namespace_test.go
tests/integration/api_test.go
tests/integration/payments_test.go
tests/security/isolation_test.go
```
**Test Coverage:**
- Full API test suite
- Cross-namespace security tests
- Rate limit and quota tests
- Payment flow tests
- WebSocket connection tests
### Step 8.2: Load Testing
**Files to create:**
```
tests/load/k6_scripts/
tests/load/websocket_stress.js
tests/load/api_concurrent.js
```
**Testing:**
- Concurrent user simulations
- WebSocket stress testing
- Database connection pooling tests
- Rate limiter performance tests
### Step 8.3: Security Audit
**Files to create:**
```
docs/security_audit.md
tests/security/penetration_tests.go
```
**Security Checks:**
- Input validation on all endpoints
- SQL injection prevention
- Rate limit bypass attempts
- JWT security verification
- Cross-namespace isolation verification
### Deliverables Phase 8
- [ ] 80%+ test coverage across all components
- [ ] Load test results and performance benchmarks
- [ ] Security audit report with findings
- [ ] Performance optimization recommendations
---
## Quick Start Implementation Order
For immediate progress, implement in this exact order:
### Day 1-2: Minimal Gateway
1. Create `cmd/gateway/main.go` with basic HTTP server
2. Add health check endpoint (`/health`)
3. Connect to network using existing `pkg/client`
4. Test basic connectivity
### Day 3-4: Database Foundation
1. Create migration files with core tables
2. Setup database connection and models
3. Add app registration endpoint (no auth yet)
4. Test database operations
### Day 5-7: Basic Authentication
1. Implement nonce generation and storage
2. Add Ethereum wallet signature verification
3. Create JWT token system
4. Add authentication middleware
### Week 2: Core Security Features
1. Add namespace isolation middleware
2. Implement resource prefixing
3. Update handlers for namespace isolation
4. Add basic rate limiting
### Week 3: Complete API
1. Implement all storage endpoints
2. Add WebSocket support for pub-sub
3. Complete database API
4. Add comprehensive error handling
### Week 4: Production Ready
1. Add usage tracking and quotas
2. Implement tier-based limiting
3. Add payment verification
4. Complete subscription management
### Week 5: Testing & Launch
1. Write integration tests
2. Perform security testing
3. Load testing and optimization
4. Documentation and deployment
---
## File Structure Overview
```
cmd/gateway/
├── main.go # Entry point
└── config.yaml # Configuration
pkg/gateway/
├── server/
│ ├── server.go # HTTP server setup
│ └── routes.go # Route definitions
├── config/
│ └── config.go # Configuration management
├── db/
│ ├── connection.go # Database connection
│ ├── models.go # Data models
│ ├── queries.go # SQL queries
│ └── migrations.go # Migration runner
├── auth/
│ ├── ethereum.go # Wallet authentication
│ ├── jwt.go # JWT handling
│ └── nonce.go # Nonce management
├── handlers/
│ ├── auth.go # Auth endpoints
│ ├── storage.go # Storage API
│ ├── pubsub.go # PubSub API
│ ├── database.go # Database API
│ └── payments.go # Payment API
├── middleware/
│ ├── auth.go # Authentication
│ ├── namespace.go # Namespace isolation
│ ├── ratelimit.go # Rate limiting
│ └── cors.go # CORS handling
├── isolation/
│ ├── storage.go # Storage isolation
│ ├── pubsub.go # PubSub isolation
│ └── database.go # Database isolation
├── subscription/
│ ├── manager.go # Subscription management
│ └── tiers.go # Tier definitions
├── blockchain/
│ ├── client.go # Ethereum client
│ └── verifier.go # Payment verification
└── websocket/
├── manager.go # WebSocket management
└── subscriber.go # PubSub subscriptions
migrations/
├── 001_initial.sql # Initial schema
├── 002_indexes.sql # Performance indexes
└── 003_payments.sql # Payment tables
tests/
├── integration/ # Integration tests
├── security/ # Security tests
└── load/ # Load tests
docs/
├── api.md # API documentation
├── security.md # Security guidelines
└── deployment.md # Deployment guide
```
---
## Success Metrics
### Phase 1 Success Criteria
- [ ] Gateway starts and connects to network
- [ ] Health endpoint returns 200 OK
- [ ] Basic storage operations work
### Phase 2 Success Criteria
- [ ] Database migrations run successfully
- [ ] CRUD operations work for all models
- [ ] Namespace management functional
### Phase 3 Success Criteria
- [ ] Ethereum wallet authentication works
- [ ] JWT tokens generated and validated
- [ ] Session management operational
### Phase 4 Success Criteria
- [ ] Cross-namespace access blocked
- [ ] All resources properly isolated
- [ ] Security tests pass
### Phase 5 Success Criteria
- [ ] All API endpoints functional
- [ ] WebSocket subscriptions work
- [ ] Complete feature parity with direct client
### Phase 6 Success Criteria
- [ ] Rate limiting enforced
- [ ] Usage tracking accurate
- [ ] Tier limits respected
### Phase 7 Success Criteria
- [ ] Payment verification works
- [ ] Subscription management complete
- [ ] Automatic tier upgrades/downgrades
### Phase 8 Success Criteria
- [ ] 80%+ test coverage
- [ ] Security audit passed
- [ ] Load testing completed
- [ ] Production deployment ready
---
This implementation plan provides a clear roadmap from basic gateway functionality to a production-ready, secure, multi-tenant system with Ethereum-based payments and comprehensive API coverage.

395
TASK.md
View File

@ -1,298 +1,135 @@
# DeBros Network — Gateway, Auth & Staking TASKS # Task: Enforce API Key/JWT and Namespace in Go Client (Auto-Resolve Namespace) and Guard All Operations
This document captures the plan, endpoints, auth/staking model, data layout, security hardening, and an implementation roadmap for the new `gateway` service. It is a single-source checklist to turn the ideas discussed into working code and infra. Owner: To be assigned
Status: Ready to implement
Goals ## Objective
- Provide a standalone `cmd/gateway` binary that exposes HTTP (default) and optional gRPC to allow non-Go clients (JS/Swift/etc.) to access Database, Storage, PubSub and Network features. Implement strict client-side access enforcement in the Go client (`pkg/client`) so that:
- Authenticate and authorize apps via wallet-based verification and on-chain staking / NFT attestation. - An API key or JWT is required by default to use the client.
- Issue short-lived access tokens (JWT) + rotating refresh tokens + optional API keys for server apps. - The client auto-resolves the namespace from the provided API key or JWT without requiring callers to pass the namespace per call.
- Use staking or NFT ownership to grant higher rate limits and scopes. - Per-call namespace overrides via context are still allowed for compatibility, but must match the resolved namespace; otherwise, deny the call.
- Keep node processes and gateway process separable for scaling, security and reliability. - All operations (Storage, PubSub, Database/RQLite, and NetworkInfo) are guarded and return access errors when unauthenticated or namespace-mismatched.
- Store gateway/core metadata in a dedicated `core` RQLite database to avoid mixing runtime app data with cluster DB. - No backward compatibility guarantees required.
High-level architecture Note: This is client-side enforcement for now. Protocol-level auth/ACL for libp2p can be added later.
- `cmd/gateway` (new binary)
- HTTP server (REST + WebSocket)
- Optional gRPC server
- Bridge layer that calls `pkg/client` (NetworkClient) to interact with the network
- Auth & staking modules, token manager, rate-limiter, background chain watcher
- `pkg/gateway` packages
- `bridge` — adapters to call `client` methods
- `http` — REST handlers & middleware
- `ws` — WebSocket pubsub broker
- `auth` — challenge, register, JWT + refresh token handling
- `payments` — payment adapters, payment verification, subscription state
- `rate` — Redis-backed token-bucket / quota manager
- `db` — gateway schema migrations / helper for `core` DB
- Persistence
- `core` RQLite database (separate from application DBs) stores apps, stakes, tokens and nonces
- Redis for rate-limiting and ephemeral session state (optional fallback to in-memory for dev)
- Payment adapters
- Ethereum (EVM) JSON-RPC adapter (support for mainnet + testnets such as Goerli)
- Abstract interface allows adding other chains later if desired
Endpoints (HTTP + WebSocket) — MVP (no admin endpoints) ## High-level behavior
- General - `ClientConfig.RequireAPIKey` defaults to true. If true and neither `APIKey` nor `JWT` is present, `Connect()` fails.
- GET /v1/health - Namespace is automatically derived:
- GET /v1/version - From JWT: parse claims and read `Namespace` claim (no network roundtrip). Verification of signature is not required for this task; parsing is enough to derive namespace. Optionally, add a TODO hook for future verification against JWKS if provided.
- Network - From API key: the namespace must be embedded in the key using a documented format (below). The client parses it locally and derives the namespace without any remote calls.
- GET /v1/network/peers - All calls check that any provided per-call namespace override matches the derived namespace, else return an “access denied: namespace mismatch” error.
- GET /v1/network/status - All modules are guarded: Database (RQLite), Storage, PubSub, and NetworkInfo.
- POST /v1/network/connect { multiaddr }
- POST /v1/network/disconnect { peer_id }
- Database
- POST /v1/db/query { sql, params?, timeout? } (enforce scopes)
- POST /v1/db/transaction { queries: [] }
- GET /v1/db/schema
- POST /v1/db/create-table { sql } (admin / gated)
- Storage
- GET /v1/storage/get?key=&namespace=
- POST /v1/storage/put (binary body or JSON base64) ?key=&namespace=
- DELETE /v1/storage/delete { key, namespace }
- GET /v1/storage/list?prefix=&limit=&namespace=
- GET /v1/storage/exists?key=&namespace=
- Pub/Sub
- POST /v1/pubsub/publish { topic, data(base64|raw), namespace?, ttl? }
- GET /v1/pubsub/topics?namespace=
- WS /v1/pubsub/ws (subscribe/unsubscribe/publish over WS frames)
- SSE /v1/pubsub/sse?topic=... (optional read-only)
- Auth & App onboarding
- POST /v1/auth/challenge { wallet, wallet_type, app_name, metadata? }
- Response: { challenge, expires_in }
- POST /v1/auth/register { wallet, wallet_type, challenge, signature, app_name, metadata? }
- On success: create provisional App and return client_id (status: pending or active per flow)
- POST /v1/auth/refresh { client_id, refresh_token } -> new access_token
- GET /v1/auth/whoami (protected)
- Staking (on-chain)
- POST /v1/stake/info { client_id } -> returns required stake, contract address, memo format
- POST /v1/stake/commit { client_id, chain, tx_signature } -> verify on-chain, update stake
- GET /v1/stake/status?client_id=...
- POST /v1/stake/unstake { client_id } -> returns steps and marks pending_unstake
Authentication & authorization model (recommended MVP) ## API key and JWT formats
- App registration: - JWT: RS256 token with claim `Namespace` (string). We will parse claims (unverified) to obtain `Namespace`.
1. Client obtains ephemeral `challenge` for a wallet. - API key: change to an encoded format that includes the namespace so the client can parse locally. Options (pick one and implement consistently):
2. Client signs `challenge` with wallet and calls `/v1/auth/register`. - Option A (dotted): `ak_<random>.<namespace>`
3. Gateway verifies signature and creates `app` record (status pending or active). - Option B (colon): `ak_<random>:<namespace>`
- App activation: - Option C (base64 JSON): base64url of `{ "kid": "...", "ns": "<namespace>" }` prefixed by `ak_`
- NFT path: if the wallet holds qualifying NFT(s), gateway verifies and activates the app.
- Staking path: gateway asks you to stake tokens to a staking contract with a memo bound to `client_id`. After verifying the on-chain tx, gateway activates the app and assigns a tier.
- Tokens:
- Issue short-lived JWT access tokens (e.g., 15m) signed by gateway private key (RS256 or ES256). Publish JWKS at `/.well-known/jwks.json`.
- Issue rotating refresh tokens (keep hashed in `core` DB).
- Optionally issue API keys (hashed) for server-to-server use (longer TTL, revokable).
- JWT claims:
- iss, sub (client_id), aud, exp, iat, jti
- namespace, wallet, wallet_type, scopes, stake_amount, stake_chain, tier
- Scopes:
- `storage:read`, `storage:write`, `pubsub:publish`, `pubsub:subscribe`, `db:read`, `db:write`
- Enforce scopes in middleware for each endpoint.
Payment / Subscription model For simplicity and readability, choose Option B: `ak_<random>:<namespace>`.
- Pricing & plans (Ethereum-based monthly payments) - Parsing rules:
- The gateway requires paid subscriptions to use the network. Example starter plans: - If `APIKey` contains a single colon, split and use the right side as `namespace` (trim spaces). If empty -> error.
- Basic: 0.1 ETH / month -> default quota (e.g., 1,000 RPM) - If more than one colon or invalid format -> error.
- Pro: 0.2 ETH / month -> higher quota (e.g., 5,000 RPM)
- Elite: 0.3 ETH / month -> top quota (e.g., 50,000 RPM)
- Plans are configurable and billed per subscription period (monthly by default).
- Payment verification mode:
- Transaction-proof commit (MVP): the user pays the gateway's billing address or staking/payment contract on Ethereum. The payment transaction must include a memo/metadata field or be directed to a payment endpoint structured to identify the `client_id`. The gateway verifies the transaction on-chain (via JSON-RPC) and marks the subscription active for the paid period.
- Event-driven contract listening (optional): deploy a simple subscription contract that emits `PaymentMade(client_id, wallet, plan, amount, tx)` events — gateway listens and reconciles subscriptions automatically.
- Testnet support:
- Support Ethereum testnets (e.g., Goerli) for testing flows without spending real ETH. Gateway config must allow testnet mode and separate testnet payment address/contract.
- Billing cycle & renewal:
- When a payment is verified, set subscription validity for the plan period (e.g., 30 days). The gateway should notify (via webhook or SDK callback) before expiration and support manual or automated renewal (client submits another payment).
- If payment is missed or subscription expires, downgrade quotas to the free/default plan or suspend access depending on policy.
- Confirmation requirements:
- Require configurable confirmation counts before marking a payment as final (example: 12 confirmations for Ethereum mainnet; lower for testnet in dev).
- Refunds & dispute handling:
- Gateway should define a refund/dispute policy (manual or automated) — out of scope for MVP but planned.
Database & storage plan ## Changes to implement
- Use a separate RQLite logical DB called `core` to store gateway metadata. Rationale:
- Avoid mixing application data with gateway operational metadata.
- Easier backups / migrations for gateway-only state.
- `core` schema (sketch)
- `apps`:
- id UUID, client_id TEXT (unique), namespace TEXT, wallet_pubkey TEXT, wallet_type TEXT, scopes JSON, status TEXT, metadata JSON, created_at, updated_at
- `nonces`:
- nonce TEXT PK, wallet_pubkey TEXT, created_at, expires_at, used BOOL, ip_addr TEXT
- `subscriptions`:
- id UUID, app_id FK, chain TEXT, plan TEXT, amount_paid NUMERIC, tx_signature TEXT, confirmed BOOL, confirmations INT, period_start TIMESTAMP, period_end TIMESTAMP, auto_renew BOOL, testnet BOOL, created_at, updated_at
- `refresh_tokens`:
- jti TEXT PK, client_id FK, hashed_token TEXT, expires_at TIMESTAMP, revoked BOOL, created_at
- `api_keys`:
- id UUID, app_id, hashed_key TEXT, description, created_at, revoked BOOL
- `audit_events`:
- id UUID, app_id nullable, event_type TEXT, details JSON, created_at
- Access patterns:
- Gateway writes to `core`; background watcher verifies payments/subscriptions and writes audit events.
- When validating JWTs, check `jti` blacklist and optionally verify `apps.status` and `subscriptions` validity.
Rate-limiting architecture ### 1) Client configuration and types
- Use Redis token-bucket per `client_id` for production. Fallback to in-memory limiter for dev. - File: `pkg/client/interface.go`
- Keyed by `client_id`; token bucket parameters derived from subscription plan or default (free) plan. - Extend `ClientConfig`:
- Quota update flow: - `Namespace string` // optional; if empty, auto-derived from API key or JWT; if still empty, fallback to `AppName`.
- When subscription status changes (payment commit / expiration / renewal), background worker recalculates plan quotas and sets new token-bucket capacity in Redis. - `RequireAPIKey bool` // default true; when true, require either `APIKey` or `JWT`.
- Middleware consumes tokens on request; return 429 when exhausted. - `JWT string` // optional bearer token; used for namespace derivation and future protocol auth.
- Consider endpoint-specific quota (DB-write quotas smaller than read/storage). - Update `DefaultClientConfig(appName string)` to set:
- `RequireAPIKey: true`
- `Namespace: ""` (meaning auto)
Background workers & payment watcher ### 2) Namespace resolution and access gating
- Background service tasks: - File: `pkg/client/client.go`
- Payment watcher: listen to payment contract events or poll transactions; verify payments, mark confirmations, and update `subscriptions`. - At construction or `Connect()` time:
- Token revocation worker: expire revoked tokens from cache; enforce `jti` blacklist. - Implement `deriveNamespace()`:
- Quota reconciler: push new quotas to Redis after subscription changes (payment/expiration/renewal). - If `config.Namespace != ""`, use it.
- Audit logger: persist critical events to `core.audit_events`. - Else if `config.JWT != ""`, parse JWT claims (unverified) and read `Namespace` claim.
- Else if `config.APIKey != ""`, parse `ak_<random>:<namespace>` and extract namespace.
- Else use `config.AppName`.
- Store the resolved namespace back into `config.Namespace`.
- Enforce presence of credentials:
- If `config.RequireAPIKey` is true AND both `config.APIKey` and `config.JWT` are empty -> return error `access denied: API key or JWT required`.
- Add `func (c *Client) requireAccess(ctx context.Context) error` that:
- If `RequireAPIKey` and both `APIKey` and `JWT` are empty -> error `access denied: credentials required`.
- Resolve per-call namespace override from context (via storage/pubsub helpers below). If present and `override != c.config.Namespace` -> error `access denied: namespace mismatch`.
Security hardening (key points) ### 3) Guard all operations
- Transport: require TLS for all gateways. Support mTLS for internal connections if desired. - File: `pkg/client/implementations.go`
- Signature verification: - At the start of each public method, call `client.requireAccess(ctx)` and return the error if any.
- Support Solana (ed25519) and Ethereum (secp256k1 SIWE/EIP-191) signature formats. - DatabaseClientImpl: `Query`, `Transaction`, `CreateTable`, `DropTable`, `GetSchema`.
- Challenges are single-use and time-limited. - StorageClientImpl: `Get`, `Put`, `Delete`, `List`, `Exists`.
- JWT & keys: - NetworkInfoImpl: `GetPeers`, `GetStatus`, `ConnectToPeer`, `DisconnectFromPeer`.
- Use RS256/ES256 and store private keys in KMS/HSM. Publish JWKS for clients. - For Storage operations, ensure we propagate the effective namespace:
- Rotate keys and keep old keys in JWKS until no tokens signed by them remain. - If override present and equals `config.Namespace`, pass that context through; else use `storage.WithNamespace(ctx, config.Namespace)`.
- Token lifecycle:
- Short access TTL, rotating refresh tokens with hashed storage, jti blacklisting for revocation.
- Input validation:
- Enforce strict validation on SQL queries (parameterized), topic names, key names, sizes.
- Rate-limits + quotas:
- Enforce per-client quotas. Additional IP-based rate-limiting as fallback.
- Anti-sybil for registration:
- Rate-limit challenge requests per IP and per wallet address.
- Require payments/NFT for elevated capabilities (paid plans or verified NFT holders).
- Apply CAPTCHAs or step-up verification if suspicious activity is detected (many signups from same IP/wallet).
- Monitor behavioral signals (usage spikes, repeated failures) and automatically throttle or block abusive actors.
- Multi-tenant isolation & namespace enforcement: ### 4) PubSub context-based namespace override (parity with Storage)
- Principle: every App must be strictly namespaced. All cross-service operations (DB, Storage, Pub/Sub, Network actions) must be checked and scoped to the App's namespace as declared in the App record and embedded in JWTs. - Files: `pkg/pubsub/*`
- JWT binding: issue tokens that include a `namespace` claim. All gateway handlers must verify that any request-scoped `namespace` parameter or implied namespace (for example when creating resources) matches the token's `namespace`. - Add:
- Storage keys: internally prefix all storage operations with the namespace (e.g., `ns::<namespace>::<key>`). The gateway API must require either an explicit `namespace` parameter or infer it from the token; never accept a raw key without namespacing. - `type ctxKey string`
- Pub/Sub topics: require namespaced topic names (e.g., `<namespace>.<topic>`). Reject topic operations that omit or try to impersonate a different namespace. When forwarding subscriptions to the internal pubsub layer, map to namespaced topics only. - `const CtxKeyNamespaceOverride ctxKey = "pubsub_ns_override"`
- Database isolation and table naming: - `func WithNamespace(ctx context.Context, ns string) context.Context`
- Prefer one of these techniques (ordered by recommended deployment complexity): - Update topic naming in `manager.go` and `subscriptions.go`/`publish.go`:
1. Logical per-app RQLite DB (best isolation): create/assign a dedicated logical DB or database file for each app so tables live in the app's DB and cannot collide. - Before computing `namespacedTopic`, check for ctx override; if present and non-empty, use it; else fall back to `m.namespace`.
2. Table name prefixing (practical): prefix table names with namespace (e.g., `ns__<namespace>__users`) and enforce gateway-only creation to avoid collisions.
3. SQL sandboxing + query rewriting: rewrite queries to inject namespace-qualified table names or run them in a namespaced schema; disallow raw `ATTACH`/`DETACH` and other DDL that can escape namespace.
- For MVP, implement table name prefixing and forbid arbitrary DDL by non-admin apps. If you later need stronger isolation, migrate to per-app logical DBs.
- Resource creation rules:
- Only allow apps to create resources (tables, topics, storage keys) within their namespace.
- When a create request arrives, the gateway must validate:
- The token `namespace` matches the requested namespace.
- The resource name, after applying namespace prefix, does not already exist under another namespace.
- Enforce strict name validation (regex), disallowing `..`, slashes and control chars.
- Prevent namespace collisions and impersonation:
- Reserve a namespace namespace-ownership table in `core` that records `owner_app_id`, `namespace`, `created_at`, and optional `domain`/`metadata`.
- Reject any create attempt for a namespace that is already registered to another app.
- Provide an admin-only transfer mechanism for namespace ownership (manual transfer requires validation).
- Middleware & enforcement:
- Implement a centralized namespace enforcement middleware that runs before handler logic:
- Extract `namespace` from JWT.
- Compare to requested namespace (query/body/path). If mismatch, return 403.
- For internal DB calls, automatically apply namespace prefix mapping.
- Log and audit any 403 cross-namespace attempts for monitoring and alerts.
- SQL & command safety:
- Disallow dangerous SQL statements from non-admin tokens: `ATTACH`, `DETACH`, `PRAGMA` (sensitive), `ALTER` (unless permitted), and `DROP TABLE` without proper scope checks.
- Enforce parameterized queries only. Optionally maintain a whitelist of allowed DDL/DDL-like statements for higher-tier apps that request explicit permissions.
- Testing & validation:
- Add unit and integration tests that attempt cross-namespace access (DB reads/writes, storage key reads, pubsub subscriptions) and assert they are rejected.
- Add fuzz tests for topic and key naming to ensure sanitization and prefixing is robust.
- Migration & operator notes:
- Document the namespace naming convention and migration path if you need to move an app from prefixing to per-app DB later.
- Provide tooling to inspect `core.namespace_ownership` and reconcile accidental collisions or orphaned namespaces.
- Secrets management:
- Store API keys and refresh tokens hashed.
- Protect DB credentials and RQLite nodes with network controls.
- Audit & monitoring:
- Structured logs with redaction rules.
- Prometheus metrics for key events: stake commits, registration attempts, JWT issuance, quota breaches.
Operational & deployment considerations ### 5) Client context helper
- Separate gateway process to scale independently from nodes. - New file: `pkg/client/context.go`
- For local/demo: provide `--embedded` or `node --enable-gateway` dev-only mode that runs gateway in-process on localhost. - Add `func WithNamespace(ctx context.Context, ns string) context.Context` that applies both storage and pubsub overrides by chaining:
- Expose Prometheus `/metrics`. - `ctx = storage.WithNamespace(ctx, ns)`
- Provide configuration flags: - `ctx = pubsub.WithNamespace(ctx, ns)`
- `--http-listen`, `--grpc-listen`, `--tls-cert`, `--tls-key`, `--jwks-url`, `--stake-contracts`, `--chain-rpc` endpoints, `--redis-url`, `--core-db-endpoint` - return `ctx`
- Run gateway behind an API gateway / LB in production for TLS termination or WAF if needed.
- Use KMS for signing key management and automated cert issuance (cert-manager or cloud provider).
Implementation roadmap (milestones & tasks) ### 6) Documentation updates
Phase 0 — Design & infra - Files: `README.md`, `AI_CONTEXT.md`
- Finalize JWKS/token signing approach (RS256 vs ES256). - Document the new client auth behavior:
- Define staking contract interface for Solana (and EVM if planned). - An API key or JWT is required by default (`RequireAPIKey=true`).
- Create `core` RQLite schema SQL migrations. - Namespace auto-derived from token:
- JWT claim `Namespace`.
- API key format `ak_<random>:<namespace>`.
- Per-call override via `client.WithNamespace(ctx, ns)` allowed but must match derived namespace.
- All modules (Storage, PubSub, Database, NetworkInfo) are guarded.
- Provide usage examples for constructing `ClientConfig` with API key or JWT and making calls.
Phase 1 — Minimal Gateway MVP ## Helper details
- Scaffold `cmd/gateway` and `pkg/gateway/bridge`. - JWT parsing: implement a minimal helper to split the token and base64url-decode the payload; read `Namespace` field from JSON. Do not verify signature for this task. If parsing fails, return a clear error.
- Implement: - API key parsing: simple split on `:`; trim spaces; validate non-empty.
- `/v1/auth/challenge` and `/v1/auth/register` (wallet signature verification)
- Token issuance (JWT + refresh token)
- Simple `apps` CRUD in `core` DB
- Basic endpoints: `/v1/health`, `/v1/network/peers`, `/v1/pubsub/publish`, `/v1/storage/get|put`
- Basic WebSocket pubsub `/v1/pubsub/ws`
- Local in-memory rate limiter with default quotas
- Create TypeScript example client demonstrating challenge → sign → register → use token → WS pubsub.
Phase 2 — Payments & quotas ## Error messages (standardize)
- Add payment endpoints: `/v1/payments/info`, `/v1/payments/commit`, `/v1/payments/status` - Missing credentials: `access denied: API key or JWT required`
- Implement Ethereum (EVM) adapter to verify payment txs via JSON-RPC (support mainnet + testnets such as Goerli) - Namespace mismatch: `access denied: namespace mismatch`
- Add Redis-backed rate limiter and plan mapping based on subscription plan (Basic/Pro/Elite) - Client not connected: keep existing `client not connected` error.
- Implement background payment watcher to verify transactions, confirm payments, set subscription periods, and push quota updates
- Provide testnet configuration and flows so integrations can be tested without spending real ETH
Phase 3 — Production hardening & SDKs ## Acceptance criteria
- Integrate persistent Redis + RQLite `core` DB in prod config - Without credentials and `RequireAPIKey=true`, `Connect()` returns error and no operations are allowed.
- Replace in-memory limiter with Redis; add quota recalculation on stake changes - With API key `ak_abc123:myapp`, the client auto-resolves namespace `myapp`; operations succeed.
- Add JWKS endpoints, key rotation, KMS integration - With JWT containing `{ "Namespace": "myapp" }`, the client auto-resolves `myapp`; operations succeed.
- Add API key issuance (hashed) - If a caller sets `client.WithNamespace(ctx, "otherNS")` while resolved namespace is `myapp`, any operation returns `access denied: namespace mismatch`.
- Add OpenAPI spec and generate JS/Swift SDKs - PubSub topic names use the override when present (and allowed) else the resolved namespace.
- Add metrics, logging, alerting and documentation - NetworkInfo methods are also guarded and require credentials.
Phase 4 — Optional: On-chain contracts & advanced flows ## Out of scope (for this task)
- Deploy staking contract (Solana/EVM) with event emission - Protocol-level auth or verification of JWT signatures against JWKS.
- Add NFT attestation flow - ETH payments/subscriptions and tier enforcement. (Separate design/implementation.)
- (Optional) Implement direct libp2p-js path for browser-native P2P
Developer tasks — immediate actionable items ## Files to modify/add
1. Create RQLite `core` DB migration SQL and add to repo migrations (include `apps`, `nonces`, `subscriptions`, `refresh_tokens`, `api_keys`, `audit_events`, and `namespace_ownership` tables). - Modify:
2. Scaffold `cmd/gateway/main.go` with flags `--http-listen`, `--grpc`, `--tls-*`, `--redis`, `--core-db`. - `pkg/client/interface.go`
3. Implement `pkg/gateway/auth` with challenge/register handlers and Ethereum signature verification helper (for EOA flows / SIWE). - `pkg/client/client.go`
4. Implement `pkg/gateway/bridge` to call `client.NewClient(DefaultClientConfig(namespace))` and wire basic endpoints (pubsub publish, storage get/put). - `pkg/client/implementations.go`
5. Add WebSocket pubsub forwarding using `client.PubSub().Subscribe` and map to WS sessions. - `pkg/pubsub/manager.go`
6. Add Redis-based token-bucket `pkg/gateway/rate` and middleware for HTTP endpoints. - `pkg/pubsub/subscriptions.go`
7. Implement `/v1/payments/commit` Ethereum adapter skeleton (verify payment tx via JSON-RPC and support testnets like Goerli). - `pkg/pubsub/publish.go` (if exists; add override resolution there too)
8. Produce OpenAPI (YAML) for the endpoints to allow SDK generation. - `README.md`, `AI_CONTEXT.md`
9. Build example TypeScript client that performs challenge -> sign -> register -> use payments on testnet -> publish/subscribe. - Add:
10. Implement namespace enforcement middleware: - `pkg/pubsub/context.go` (if not present)
- Validate token `namespace` claim and ensure it matches any requested `namespace` parameter or infers namespace for the operation. - `pkg/client/context.go`
- Map and apply namespace prefixes to storage keys, pubsub topics, and DB table names.
- Reject attempts to access or create resources outside the token's namespace (return 403).
11. Add `core.namespace_ownership` table and enforcement logic to prevent two apps from owning the same namespace; disallow create requests for reserved/owned namespaces.
12. Implement create-resource guards:
- Ensure table/topic/key creation requests include the namespace and that the gateway applies/validates namespace prefixes before creating resources.
- Disallow non-admin DDL that can escape namespace boundaries (`ATTACH`, `DETACH`, raw file access).
13. Add unit and integration tests for multi-tenant isolation:
- Tests that verify reads/writes across namespaces are rejected.
- Tests that verify topic and storage key isolation enforcement.
14. Add audit hooks to log any cross-namespace access attempts and integrate alerts for repeated violations.
15. Update API documentation and SDKs to document the namespace requirement and show examples of correctly namespaced calls.
Notes & guidelines ## Notes
- Use separate `core` logical DB name when creating rqlite connections: `http://<host>:<port>/core` or a connection that uses a dedicated DB directory for the gateway. - Keep logs concise and avoid leaking tokens in logs. You may log the resolved namespace at `INFO` level on connect.
- Keep gateway stateless where possible: store short-lived state in Redis. Persistent state goes to `core` RQLite. - Ensure thread-safety when accessing `Client.config` fields (use existing locks if needed).
- Prefer parameterized SQL calls in gateway code when writing to `core`.
- For wallet signature verification use battle-tested crypto libs (Solana ed25519 from x/crypto, Ethereum ecrecover libs) and accept explicit `wallet_type`.
- Keep WebSocket messages compact (use base64 for binary payloads) and add per-connection subscription limits.
Open questions to finalize before coding
- Which chains to support in v1? (Solana recommended as first)
- Exact stake thresholds and confirmation counts for each chain
- JWKS key storage policy (local PEM for dev; KMS in prod)
- Redis availability & cluster sizing for rate-limiter
- Should `core` RQLite be colocated on node or run as separate RQLite node cluster? (Separate RQLite logical DB is recommended)
If you want, I can now:
- Generate the `network/TASK.md` (this file) as well as scaffolded Go handler stubs for `/v1/auth/challenge`, `/v1/auth/register`, `/v1/payments/commit` and example SQL migrations for `core` DB (including `subscriptions` table).
- Or produce an OpenAPI spec for the MVP endpoints so you can generate SDKs. I can also produce example testnet payment flows (Goerli) and a TypeScript test client that demonstrates paying on testnet and activating a subscription.
Tell me which code artifact you want next and I will produce it.

View File

@ -2,7 +2,10 @@ package client
import ( import (
"context" "context"
"encoding/base64"
"encoding/json"
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
@ -41,6 +44,9 @@ type Client struct {
connected bool connected bool
startTime time.Time startTime time.Time
mu sync.RWMutex mu sync.RWMutex
// resolvedNamespace is the namespace derived from JWT/APIKey.
resolvedNamespace string
} }
// NewClient creates a new network client // NewClient creates a new network client
@ -118,6 +124,18 @@ func (c *Client) Connect() error {
return nil return nil
} }
// Enforce credentials are present
if c.config == nil || (strings.TrimSpace(c.config.APIKey) == "" && strings.TrimSpace(c.config.JWT) == "") {
return fmt.Errorf("access denied: API key or JWT required")
}
// Derive and set namespace from provided credentials
ns, err := c.deriveNamespace()
if err != nil {
return fmt.Errorf("failed to derive namespace: %w", err)
}
c.resolvedNamespace = ns
// Create LibP2P host with optional Anyone proxy for TCP and optional QUIC disable // Create LibP2P host with optional Anyone proxy for TCP and optional QUIC disable
var opts []libp2p.Option var opts []libp2p.Option
opts = append(opts, opts = append(opts,
@ -168,7 +186,7 @@ func (c *Client) Connect() error {
// Create pubsub bridge once and store it // Create pubsub bridge once and store it
adapter := pubsub.NewClientAdapter(c.libp2pPS, c.getAppNamespace()) adapter := pubsub.NewClientAdapter(c.libp2pPS, c.getAppNamespace())
c.pubsub = &pubSubBridge{adapter: adapter} c.pubsub = &pubSubBridge{client: c, adapter: adapter}
// Create storage client with the host // Create storage client with the host
storageClient := storage.NewClient(h, c.getAppNamespace(), c.logger) storageClient := storage.NewClient(h, c.getAppNamespace(), c.logger)
@ -290,5 +308,95 @@ func (c *Client) isConnected() bool {
// getAppNamespace returns the namespace for this app // getAppNamespace returns the namespace for this app
func (c *Client) getAppNamespace() string { func (c *Client) getAppNamespace() string {
c.mu.RLock()
defer c.mu.RUnlock()
if c.resolvedNamespace != "" {
return c.resolvedNamespace
}
return c.config.AppName return c.config.AppName
} }
// requireAccess enforces that credentials are present and that any context-based namespace overrides match
func (c *Client) requireAccess(ctx context.Context) error {
cfg := c.Config()
if cfg == nil || (strings.TrimSpace(cfg.APIKey) == "" && strings.TrimSpace(cfg.JWT) == "") {
return fmt.Errorf("access denied: API key or JWT required")
}
ns := c.getAppNamespace()
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" && s != ns {
return fmt.Errorf("access denied: namespace mismatch")
}
}
if v := ctx.Value(pubsub.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" && s != ns {
return fmt.Errorf("access denied: namespace mismatch")
}
}
return nil
}
// deriveNamespace determines the namespace from JWT or API key.
func (c *Client) deriveNamespace() (string, error) {
// Prefer JWT claim {"Namespace": "..."}
if strings.TrimSpace(c.config.JWT) != "" {
ns, err := parseJWTNamespace(c.config.JWT)
if err != nil {
return "", err
}
if ns != "" {
return ns, nil
}
}
// Fallback to API key format ak_<random>:<namespace>
if strings.TrimSpace(c.config.APIKey) != "" {
ns, err := parseAPIKeyNamespace(c.config.APIKey)
if err != nil {
return "", err
}
if ns != "" {
return ns, nil
}
}
return c.config.AppName, nil
}
// parseJWTNamespace decodes base64url payload to extract Namespace claim (no signature verification)
func parseJWTNamespace(token string) (string, error) {
parts := strings.Split(token, ".")
if len(parts) < 2 {
return "", fmt.Errorf("invalid JWT format")
}
payload := parts[1]
// Decode base64url (raw, no padding)
data, err := base64.RawURLEncoding.DecodeString(payload)
if err != nil {
return "", fmt.Errorf("failed to decode JWT payload: %w", err)
}
// Minimal JSON struct
var claims struct {
Namespace string `json:"Namespace"`
}
if err := json.Unmarshal(data, &claims); err != nil {
return "", fmt.Errorf("failed to parse JWT claims: %w", err)
}
return strings.TrimSpace(claims.Namespace), nil
}
// parseAPIKeyNamespace extracts the namespace from ak_<random>:<namespace>
func parseAPIKeyNamespace(key string) (string, error) {
key = strings.TrimSpace(key)
if key == "" {
return "", fmt.Errorf("invalid API key: empty")
}
// Allow but ignore prefix ak_
parts := strings.Split(key, ":")
if len(parts) != 2 {
return "", fmt.Errorf("invalid API key format: expected ak_<random>:<namespace>")
}
ns := strings.TrimSpace(parts[1])
if ns == "" {
return "", fmt.Errorf("invalid API key: empty namespace")
}
return ns, nil
}

17
pkg/client/context.go Normal file
View File

@ -0,0 +1,17 @@
package client
import (
"context"
"git.debros.io/DeBros/network/pkg/pubsub"
"git.debros.io/DeBros/network/pkg/storage"
)
// WithNamespace applies both storage and pubsub namespace overrides to the context.
// It is a convenience helper for client callers to ensure both subsystems receive
// the same, consistent namespace override.
func WithNamespace(ctx context.Context, ns string) context.Context {
ctx = storage.WithNamespace(ctx, ns)
ctx = pubsub.WithNamespace(ctx, ns)
return ctx
}

View File

@ -61,6 +61,10 @@ func (d *DatabaseClientImpl) Query(ctx context.Context, sql string, args ...inte
return nil, err return nil, err
} }
if err := d.client.requireAccess(ctx); err != nil {
return nil, err
}
// Determine if this is a read or write operation // Determine if this is a read or write operation
isWriteOperation := d.isWriteOperation(sql) isWriteOperation := d.isWriteOperation(sql)
@ -260,6 +264,10 @@ func (d *DatabaseClientImpl) Transaction(ctx context.Context, queries []string)
return fmt.Errorf("client not connected") return fmt.Errorf("client not connected")
} }
if err := d.client.requireAccess(ctx); err != nil {
return err
}
maxRetries := 3 maxRetries := 3
var lastErr error var lastErr error
@ -298,6 +306,10 @@ func (d *DatabaseClientImpl) CreateTable(ctx context.Context, schema string) err
return err return err
} }
if err := d.client.requireAccess(ctx); err != nil {
return err
}
return d.withRetry(func(conn *gorqlite.Connection) error { return d.withRetry(func(conn *gorqlite.Connection) error {
_, err := conn.WriteOne(schema) _, err := conn.WriteOne(schema)
return err return err
@ -310,6 +322,10 @@ func (d *DatabaseClientImpl) DropTable(ctx context.Context, tableName string) er
return err return err
} }
if err := d.client.requireAccess(ctx); err != nil {
return err
}
return d.withRetry(func(conn *gorqlite.Connection) error { return d.withRetry(func(conn *gorqlite.Connection) error {
dropSQL := fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName) dropSQL := fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)
_, err := conn.WriteOne(dropSQL) _, err := conn.WriteOne(dropSQL)
@ -323,6 +339,10 @@ func (d *DatabaseClientImpl) GetSchema(ctx context.Context) (*SchemaInfo, error)
return nil, fmt.Errorf("client not connected") return nil, fmt.Errorf("client not connected")
} }
if err := d.client.requireAccess(ctx); err != nil {
return nil, err
}
// Get RQLite connection // Get RQLite connection
conn, err := d.getRQLiteConnection() conn, err := d.getRQLiteConnection()
if err != nil { if err != nil {
@ -396,6 +416,10 @@ func (s *StorageClientImpl) Get(ctx context.Context, key string) ([]byte, error)
return nil, fmt.Errorf("client not connected") return nil, fmt.Errorf("client not connected")
} }
if err := s.client.requireAccess(ctx); err != nil {
return nil, err
}
return s.storageClient.Get(ctx, key) return s.storageClient.Get(ctx, key)
} }
@ -405,6 +429,10 @@ func (s *StorageClientImpl) Put(ctx context.Context, key string, value []byte) e
return fmt.Errorf("client not connected") return fmt.Errorf("client not connected")
} }
if err := s.client.requireAccess(ctx); err != nil {
return err
}
err := s.storageClient.Put(ctx, key, value) err := s.storageClient.Put(ctx, key, value)
if err != nil { if err != nil {
return err return err
@ -419,6 +447,10 @@ func (s *StorageClientImpl) Delete(ctx context.Context, key string) error {
return fmt.Errorf("client not connected") return fmt.Errorf("client not connected")
} }
if err := s.client.requireAccess(ctx); err != nil {
return err
}
err := s.storageClient.Delete(ctx, key) err := s.storageClient.Delete(ctx, key)
if err != nil { if err != nil {
return err return err
@ -433,6 +465,10 @@ func (s *StorageClientImpl) List(ctx context.Context, prefix string, limit int)
return nil, fmt.Errorf("client not connected") return nil, fmt.Errorf("client not connected")
} }
if err := s.client.requireAccess(ctx); err != nil {
return nil, err
}
return s.storageClient.List(ctx, prefix, limit) return s.storageClient.List(ctx, prefix, limit)
} }
@ -442,6 +478,10 @@ func (s *StorageClientImpl) Exists(ctx context.Context, key string) (bool, error
return false, fmt.Errorf("client not connected") return false, fmt.Errorf("client not connected")
} }
if err := s.client.requireAccess(ctx); err != nil {
return false, err
}
return s.storageClient.Exists(ctx, key) return s.storageClient.Exists(ctx, key)
} }
@ -456,6 +496,10 @@ func (n *NetworkInfoImpl) GetPeers(ctx context.Context) ([]PeerInfo, error) {
return nil, fmt.Errorf("client not connected") return nil, fmt.Errorf("client not connected")
} }
if err := n.client.requireAccess(ctx); err != nil {
return nil, err
}
// Get peers from LibP2P host // Get peers from LibP2P host
host := n.client.host host := n.client.host
if host == nil { if host == nil {
@ -512,6 +556,10 @@ func (n *NetworkInfoImpl) GetStatus(ctx context.Context) (*NetworkStatus, error)
return nil, fmt.Errorf("client not connected") return nil, fmt.Errorf("client not connected")
} }
if err := n.client.requireAccess(ctx); err != nil {
return nil, err
}
host := n.client.host host := n.client.host
if host == nil { if host == nil {
return nil, fmt.Errorf("no host available") return nil, fmt.Errorf("no host available")
@ -551,6 +599,10 @@ func (n *NetworkInfoImpl) ConnectToPeer(ctx context.Context, peerAddr string) er
return fmt.Errorf("client not connected") return fmt.Errorf("client not connected")
} }
if err := n.client.requireAccess(ctx); err != nil {
return err
}
host := n.client.host host := n.client.host
if host == nil { if host == nil {
return fmt.Errorf("no host available") return fmt.Errorf("no host available")
@ -582,6 +634,10 @@ func (n *NetworkInfoImpl) DisconnectFromPeer(ctx context.Context, peerID string)
return fmt.Errorf("client not connected") return fmt.Errorf("client not connected")
} }
if err := n.client.requireAccess(ctx); err != nil {
return err
}
host := n.client.host host := n.client.host
if host == nil { if host == nil {
return fmt.Errorf("no host available") return fmt.Errorf("no host available")

View File

@ -129,7 +129,8 @@ type ClientConfig struct {
RetryAttempts int `json:"retry_attempts"` RetryAttempts int `json:"retry_attempts"`
RetryDelay time.Duration `json:"retry_delay"` RetryDelay time.Duration `json:"retry_delay"`
QuietMode bool `json:"quiet_mode"` // Suppress debug/info logs QuietMode bool `json:"quiet_mode"` // Suppress debug/info logs
APIKey string `json:"api_key"` // Optional API key for gateway auth (not enforced by client) APIKey string `json:"api_key"` // API key for gateway auth
JWT string `json:"jwt"` // Optional JWT bearer token
} }
// DefaultClientConfig returns a default client configuration // DefaultClientConfig returns a default client configuration
@ -148,5 +149,6 @@ func DefaultClientConfig(appName string) *ClientConfig {
RetryDelay: time.Second * 5, RetryDelay: time.Second * 5,
QuietMode: false, QuietMode: false,
APIKey: "", APIKey: "",
JWT: "",
} }
} }

View File

@ -8,10 +8,14 @@ import (
// pubSubBridge bridges between our PubSubClient interface and the pubsub package // pubSubBridge bridges between our PubSubClient interface and the pubsub package
type pubSubBridge struct { type pubSubBridge struct {
client *Client
adapter *pubsub.ClientAdapter adapter *pubsub.ClientAdapter
} }
func (p *pubSubBridge) Subscribe(ctx context.Context, topic string, handler MessageHandler) error { func (p *pubSubBridge) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
if err := p.client.requireAccess(ctx); err != nil {
return err
}
// Convert our MessageHandler to the pubsub package MessageHandler // Convert our MessageHandler to the pubsub package MessageHandler
pubsubHandler := func(topic string, data []byte) error { pubsubHandler := func(topic string, data []byte) error {
return handler(topic, data) return handler(topic, data)
@ -20,13 +24,22 @@ func (p *pubSubBridge) Subscribe(ctx context.Context, topic string, handler Mess
} }
func (p *pubSubBridge) Publish(ctx context.Context, topic string, data []byte) error { func (p *pubSubBridge) Publish(ctx context.Context, topic string, data []byte) error {
if err := p.client.requireAccess(ctx); err != nil {
return err
}
return p.adapter.Publish(ctx, topic, data) return p.adapter.Publish(ctx, topic, data)
} }
func (p *pubSubBridge) Unsubscribe(ctx context.Context, topic string) error { func (p *pubSubBridge) Unsubscribe(ctx context.Context, topic string) error {
if err := p.client.requireAccess(ctx); err != nil {
return err
}
return p.adapter.Unsubscribe(ctx, topic) return p.adapter.Unsubscribe(ctx, topic)
} }
func (p *pubSubBridge) ListTopics(ctx context.Context) ([]string, error) { func (p *pubSubBridge) ListTopics(ctx context.Context) ([]string, error) {
if err := p.client.requireAccess(ctx); err != nil {
return nil, err
}
return p.adapter.ListTopics(ctx) return p.adapter.ListTopics(ctx)
} }

16
pkg/pubsub/context.go Normal file
View File

@ -0,0 +1,16 @@
package pubsub
import "context"
// Context utilities for namespace override
// Keep type unexported and expose the key as exported constant to avoid collisions
// while still allowing other packages to use the exact key value.
type ctxKey string
// CtxKeyNamespaceOverride is the context key used to override namespace per pubsub call
const CtxKeyNamespaceOverride ctxKey = "pubsub_ns_override"
// WithNamespace returns a new context that carries a pubsub namespace override
func WithNamespace(ctx context.Context, ns string) context.Context {
return context.WithValue(ctx, CtxKeyNamespaceOverride, ns)
}

View File

@ -11,7 +11,15 @@ func (m *Manager) Publish(ctx context.Context, topic string, data []byte) error
return fmt.Errorf("pubsub not initialized") return fmt.Errorf("pubsub not initialized")
} }
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic) // Determine namespace (allow per-call override via context)
ns := m.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
namespacedTopic := fmt.Sprintf("%s.%s", ns, topic)
// Get or create topic // Get or create topic
libp2pTopic, err := m.getOrCreateTopic(namespacedTopic) libp2pTopic, err := m.getOrCreateTopic(namespacedTopic)

View File

@ -13,7 +13,14 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa
return fmt.Errorf("pubsub not initialized") return fmt.Errorf("pubsub not initialized")
} }
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic) // Determine namespace (allow per-call override via context)
ns := m.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
namespacedTopic := fmt.Sprintf("%s.%s", ns, topic)
// Check if already subscribed // Check if already subscribed
m.mu.Lock() m.mu.Lock()
@ -86,7 +93,14 @@ func (m *Manager) Unsubscribe(ctx context.Context, topic string) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic) // Determine namespace (allow per-call override via context)
ns := m.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
namespacedTopic := fmt.Sprintf("%s.%s", ns, topic)
if subscription, exists := m.subscriptions[namespacedTopic]; exists { if subscription, exists := m.subscriptions[namespacedTopic]; exists {
// Cancel the subscription context to stop the message handler goroutine // Cancel the subscription context to stop the message handler goroutine
@ -103,7 +117,14 @@ func (m *Manager) ListTopics(ctx context.Context) ([]string, error) {
defer m.mu.RUnlock() defer m.mu.RUnlock()
var topics []string var topics []string
prefix := m.namespace + "." // Determine namespace (allow per-call override via context)
ns := m.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
prefix := ns + "."
for topic := range m.subscriptions { for topic := range m.subscriptions {
if len(topic) > len(prefix) && topic[:len(prefix)] == prefix { if len(topic) > len(prefix) && topic[:len(prefix)] == prefix {