feat: implement HTTP gateway with auth, storage, and namespace isolation

This commit is contained in:
anonpenguin 2025-08-16 16:04:00 +03:00
parent 271d7bbafb
commit 5eca56cd1e
21 changed files with 2978 additions and 5 deletions

629
PLAN.md Normal file
View File

@ -0,0 +1,629 @@
# DeBros Network Gateway Implementation Plan
## Overview
This document outlines the phased implementation plan for the DeBros Network Gateway system, which provides HTTP/gRPC interfaces for non-Go clients to access network features like pub-sub, RQLite database, and storage through Ethereum wallet-based authentication and subscription models.
## Architecture Summary
- Separate `cmd/gateway` binary (not embedded in node)
- HTTP endpoints by default, optional gRPC support
- WebSocket support for pub-sub subscriptions
- Core RQLite database for gateway operational data
- Ethereum wallet-based authentication
- Multi-tenant namespace isolation
- Subscription-based payment model
---
## Phase 1: Basic Gateway Foundation (Week 1)
### Objective
Create the core gateway structure without authentication - a working HTTP proxy to the network.
### Step 1.1: Gateway Skeleton
**Files to create:**
```
cmd/gateway/main.go
pkg/gateway/config/config.go
pkg/gateway/server/server.go
```
**Implementation:**
- Basic HTTP server setup with graceful shutdown
- Configuration loading (port, network client settings)
- Health check endpoint (`/health`)
- Signal handling for SIGTERM/SIGINT
- Structured logging integration
### Step 1.2: Network Client Integration
**Files to create:**
```
pkg/gateway/client/network.go
pkg/gateway/client/pool.go
```
**Implementation:**
- Initialize network client connection using existing `pkg/client`
- Connection management and pooling
- Basic error handling and retries
- Health monitoring of network connections
### Step 1.3: Basic HTTP Handlers (No Auth)
**Files to create:**
```
pkg/gateway/handlers/health.go
pkg/gateway/handlers/storage.go
pkg/gateway/handlers/network.go
pkg/gateway/middleware/cors.go
pkg/gateway/middleware/logging.go
```
**Implementation:**
- Health check endpoint
- Basic storage GET/PUT (pass-through to network)
- Network status endpoint
- CORS middleware for web clients
- Request/response logging middleware
### Deliverables Phase 1
- [ ] Working gateway that can proxy basic requests to the network
- [ ] `/health` and `/status` endpoints functional
- [ ] Basic storage operations working without auth
- [ ] Proper error handling and logging
---
## Phase 2: Core Database & Models (Week 1-2)
### Objective
Set up the foundation database schema and models for authentication and multi-tenancy.
### Step 2.1: Database Setup
**Files to create:**
```
migrations/001_initial.sql
migrations/002_indexes.sql
pkg/gateway/db/migrations.go
```
**Implementation:**
```sql
-- Core tables
CREATE TABLE apps (id, namespace, wallet_address, created_at, updated_at);
CREATE TABLE namespaces (id, name, owner_wallet, created_at);
CREATE TABLE api_keys (id, app_id, key_hash, created_at, last_used);
CREATE TABLE audit_events (id, namespace, action, resource, timestamp);
CREATE TABLE nonces (wallet_address, nonce, expires_at);
CREATE TABLE refresh_tokens (id, app_id, token_hash, expires_at);
```
### Step 2.2: Database Access Layer
**Files to create:**
```
pkg/gateway/db/connection.go
pkg/gateway/db/models.go
pkg/gateway/db/queries.go
pkg/gateway/db/migrate.go
```
**Implementation:**
- Database connection management
- Model structs for all tables
- CRUD operations for each model
- Migration runner and version tracking
### Step 2.3: Namespace Management
**Files to create:**
```
pkg/gateway/namespace/manager.go
pkg/gateway/namespace/validator.go
pkg/gateway/namespace/errors.go
```
**Implementation:**
- Namespace CRUD operations
- Validation rules (naming, uniqueness)
- Ownership verification
- Namespace reservation system
### Deliverables Phase 2
- [ ] Database schema deployed and versioned
- [ ] Basic CRUD operations for apps and namespaces
- [ ] Migration system working
- [ ] Namespace management API
---
## Phase 3: Ethereum Wallet Authentication (Week 2)
### Objective
Implement the core Ethereum wallet-based authentication system.
### Step 3.1: Wallet Signature Verification
**Files to create:**
```
pkg/gateway/auth/ethereum.go
pkg/gateway/auth/nonce.go
pkg/gateway/auth/signature.go
```
**Implementation:**
- Message signing/verification using secp256k1
- Nonce generation and management (prevent replay attacks)
- Address recovery from signatures
- EIP-191 message formatting
### Step 3.2: JWT Token System
**Files to create:**
```
pkg/gateway/auth/jwt.go
pkg/gateway/auth/claims.go
pkg/gateway/auth/refresh.go
```
**Implementation:**
- JWT token generation with namespace claims
- Token validation middleware
- Refresh token implementation
- Token blacklisting for logout
### Step 3.3: Authentication Endpoints
**Files to create:**
```
pkg/gateway/handlers/auth.go
pkg/gateway/middleware/auth.go
```
**Endpoints:**
- `POST /v1/auth/nonce` - Get signing nonce
- `POST /v1/auth/verify` - Verify signature and get JWT
- `POST /v1/auth/refresh` - Refresh JWT token
- `POST /v1/auth/logout` - Invalidate tokens
### Deliverables Phase 3
- [ ] Working Ethereum wallet authentication
- [ ] JWT tokens with namespace claims
- [ ] Session management with refresh tokens
- [ ] Secure logout functionality
---
## Phase 4: Namespace Isolation & Security (Week 2-3)
### Objective
Implement strict multi-tenant security with complete namespace isolation.
### Step 4.1: Namespace Enforcement Middleware
**Files to create:**
```
pkg/gateway/middleware/namespace.go
pkg/gateway/middleware/ownership.go
pkg/gateway/security/validator.go
```
**Implementation:**
- Extract namespace from JWT claims
- Validate namespace ownership against database
- Inject namespace into request context
- Block cross-namespace access attempts
### Step 4.2: Resource Prefixing
**Files to create:**
```
pkg/gateway/isolation/storage.go
pkg/gateway/isolation/pubsub.go
pkg/gateway/isolation/database.go
pkg/gateway/isolation/keys.go
```
**Implementation:**
- Storage keys: `ns::<namespace>::<key>`
- PubSub topics: `<namespace>.<topic>`
- Database tables: `ns__<namespace>__tablename`
- Consistent prefixing across all resources
### Step 4.3: Secure Handlers Update
**Files to update:**
```
pkg/gateway/handlers/storage.go
pkg/gateway/handlers/pubsub.go
pkg/gateway/handlers/database.go
```
**Implementation:**
- All handlers updated to use namespace isolation
- Resource access validation
- Audit logging for all operations
### Deliverables Phase 4
- [ ] All operations namespace-isolated
- [ ] Cross-namespace access prevented and logged
- [ ] Security tests passing
- [ ] Audit trail for all resource access
---
## Phase 5: Complete API Implementation (Week 3)
### Objective
Implement all remaining REST and WebSocket endpoints.
### Step 5.1: Storage API
**Files to create/update:**
```
pkg/gateway/handlers/storage.go
pkg/gateway/api/storage.go
```
**Endpoints:**
- `GET /v1/storage/:key` - Get value
- `PUT /v1/storage/:key` - Set value
- `DELETE /v1/storage/:key` - Delete key
- `GET /v1/storage` - List keys with prefix filter
### Step 5.2: PubSub API with WebSockets
**Files to create:**
```
pkg/gateway/handlers/pubsub.go
pkg/gateway/websocket/manager.go
pkg/gateway/websocket/subscriber.go
```
**Endpoints:**
- `POST /v1/pubsub/publish` - Publish message
- `WebSocket /v1/pubsub/subscribe` - Real-time subscriptions
- `GET /v1/pubsub/topics` - List topics
- `GET /v1/pubsub/subscriptions` - List active subscriptions
### Step 5.3: Database API
**Files to create:**
```
pkg/gateway/handlers/database.go
pkg/gateway/api/database.go
```
**Endpoints:**
- `POST /v1/db/query` - Execute SELECT queries
- `POST /v1/db/execute` - Execute INSERT/UPDATE/DELETE
- `POST /v1/db/batch` - Execute multiple statements
- `GET /v1/db/tables` - List namespace tables
- `POST /v1/db/migrate` - Run schema migrations
### Deliverables Phase 5
- [ ] All CRUD operations working with namespace isolation
- [ ] WebSocket subscriptions functional and secure
- [ ] Database operations isolated per namespace
- [ ] API documentation generated
---
## Phase 6: Rate Limiting & Quotas (Week 4)
### Objective
Add usage controls, monitoring, and tier-based quotas.
### Step 6.1: Rate Limiter Implementation
**Files to create:**
```
pkg/gateway/ratelimit/limiter.go
pkg/gateway/ratelimit/middleware.go
pkg/gateway/ratelimit/storage.go
pkg/gateway/ratelimit/config.go
```
**Implementation:**
- Token bucket algorithm implementation
- Per-namespace rate limiting
- Redis backend for distributed limiting
- Configurable limits per endpoint
### Step 6.2: Usage Tracking
**Files to create:**
```
pkg/gateway/usage/tracker.go
pkg/gateway/usage/quotas.go
pkg/gateway/usage/metrics.go
pkg/gateway/usage/reporter.go
```
**Implementation:**
- Track API calls per namespace
- Monitor resource usage (storage, database queries)
- Export Prometheus metrics
- Daily/monthly usage reports
### Step 6.3: Tier Enforcement
**Files to create:**
```
pkg/gateway/middleware/tier.go
pkg/gateway/subscription/tiers.go
pkg/gateway/subscription/limits.go
```
**Tier Limits:**
- **Free**: 250 RPM, 10k requests/day
- **Basic**: 1000 RPM, 100k requests/day, 100MB storage
- **Pro**: 5000 RPM, 1M requests/day, 1GB storage
- **Elite**: Unlimited RPM, 10M requests/day, 10GB storage
### Deliverables Phase 6
- [ ] Rate limiting active and configurable
- [ ] Usage tracking in database
- [ ] Tier-based quotas enforced
- [ ] Metrics exported for monitoring
---
## Phase 7: Payment & Subscription System (Week 4-5)
### Objective
Implement the Ethereum-based payment and subscription system.
### Step 7.1: Smart Contract Integration
**Files to create:**
```
pkg/gateway/blockchain/client.go
pkg/gateway/blockchain/contracts.go
pkg/gateway/blockchain/verifier.go
pkg/gateway/blockchain/events.go
```
**Implementation:**
- Ethereum client setup (mainnet + testnet)
- Payment verification smart contracts
- Event listening for payments
- Transaction verification
### Step 7.2: Subscription Management
**Files to create:**
```
pkg/gateway/subscription/manager.go
pkg/gateway/subscription/validator.go
pkg/gateway/subscription/renewal.go
pkg/gateway/subscription/pricing.go
```
**Pricing:**
- **Basic**: 0.1 ETH/month
- **Pro**: 0.2 ETH/month
- **Elite**: 0.3 ETH/month
- **Testnet**: Free for testing
### Step 7.3: Payment Endpoints
**Files to create:**
```
pkg/gateway/handlers/payments.go
pkg/gateway/handlers/subscriptions.go
```
**Endpoints:**
- `POST /v1/payments/subscribe` - Initiate subscription
- `GET /v1/payments/status` - Check payment status
- `POST /v1/payments/verify` - Verify blockchain payment
- `GET /v1/subscriptions/current` - Get subscription details
- `POST /v1/subscriptions/cancel` - Cancel subscription
### Deliverables Phase 7
- [ ] Payment verification working on mainnet/testnet
- [ ] Subscription status tracking
- [ ] Automatic tier application based on payments
- [ ] Payment event monitoring
---
## Phase 8: Testing & Hardening (Week 5)
### Objective
Comprehensive testing, security auditing, and performance optimization.
### Step 8.1: Integration Tests
**Files to create:**
```
tests/integration/auth_test.go
tests/integration/namespace_test.go
tests/integration/api_test.go
tests/integration/payments_test.go
tests/security/isolation_test.go
```
**Test Coverage:**
- Full API test suite
- Cross-namespace security tests
- Rate limit and quota tests
- Payment flow tests
- WebSocket connection tests
### Step 8.2: Load Testing
**Files to create:**
```
tests/load/k6_scripts/
tests/load/websocket_stress.js
tests/load/api_concurrent.js
```
**Testing:**
- Concurrent user simulations
- WebSocket stress testing
- Database connection pooling tests
- Rate limiter performance tests
### Step 8.3: Security Audit
**Files to create:**
```
docs/security_audit.md
tests/security/penetration_tests.go
```
**Security Checks:**
- Input validation on all endpoints
- SQL injection prevention
- Rate limit bypass attempts
- JWT security verification
- Cross-namespace isolation verification
### Deliverables Phase 8
- [ ] 80%+ test coverage across all components
- [ ] Load test results and performance benchmarks
- [ ] Security audit report with findings
- [ ] Performance optimization recommendations
---
## Quick Start Implementation Order
For immediate progress, implement in this exact order:
### Day 1-2: Minimal Gateway
1. Create `cmd/gateway/main.go` with basic HTTP server
2. Add health check endpoint (`/health`)
3. Connect to network using existing `pkg/client`
4. Test basic connectivity
### Day 3-4: Database Foundation
1. Create migration files with core tables
2. Setup database connection and models
3. Add app registration endpoint (no auth yet)
4. Test database operations
### Day 5-7: Basic Authentication
1. Implement nonce generation and storage
2. Add Ethereum wallet signature verification
3. Create JWT token system
4. Add authentication middleware
### Week 2: Core Security Features
1. Add namespace isolation middleware
2. Implement resource prefixing
3. Update handlers for namespace isolation
4. Add basic rate limiting
### Week 3: Complete API
1. Implement all storage endpoints
2. Add WebSocket support for pub-sub
3. Complete database API
4. Add comprehensive error handling
### Week 4: Production Ready
1. Add usage tracking and quotas
2. Implement tier-based limiting
3. Add payment verification
4. Complete subscription management
### Week 5: Testing & Launch
1. Write integration tests
2. Perform security testing
3. Load testing and optimization
4. Documentation and deployment
---
## File Structure Overview
```
cmd/gateway/
├── main.go # Entry point
└── config.yaml # Configuration
pkg/gateway/
├── server/
│ ├── server.go # HTTP server setup
│ └── routes.go # Route definitions
├── config/
│ └── config.go # Configuration management
├── db/
│ ├── connection.go # Database connection
│ ├── models.go # Data models
│ ├── queries.go # SQL queries
│ └── migrations.go # Migration runner
├── auth/
│ ├── ethereum.go # Wallet authentication
│ ├── jwt.go # JWT handling
│ └── nonce.go # Nonce management
├── handlers/
│ ├── auth.go # Auth endpoints
│ ├── storage.go # Storage API
│ ├── pubsub.go # PubSub API
│ ├── database.go # Database API
│ └── payments.go # Payment API
├── middleware/
│ ├── auth.go # Authentication
│ ├── namespace.go # Namespace isolation
│ ├── ratelimit.go # Rate limiting
│ └── cors.go # CORS handling
├── isolation/
│ ├── storage.go # Storage isolation
│ ├── pubsub.go # PubSub isolation
│ └── database.go # Database isolation
├── subscription/
│ ├── manager.go # Subscription management
│ └── tiers.go # Tier definitions
├── blockchain/
│ ├── client.go # Ethereum client
│ └── verifier.go # Payment verification
└── websocket/
├── manager.go # WebSocket management
└── subscriber.go # PubSub subscriptions
migrations/
├── 001_initial.sql # Initial schema
├── 002_indexes.sql # Performance indexes
└── 003_payments.sql # Payment tables
tests/
├── integration/ # Integration tests
├── security/ # Security tests
└── load/ # Load tests
docs/
├── api.md # API documentation
├── security.md # Security guidelines
└── deployment.md # Deployment guide
```
---
## Success Metrics
### Phase 1 Success Criteria
- [ ] Gateway starts and connects to network
- [ ] Health endpoint returns 200 OK
- [ ] Basic storage operations work
### Phase 2 Success Criteria
- [ ] Database migrations run successfully
- [ ] CRUD operations work for all models
- [ ] Namespace management functional
### Phase 3 Success Criteria
- [ ] Ethereum wallet authentication works
- [ ] JWT tokens generated and validated
- [ ] Session management operational
### Phase 4 Success Criteria
- [ ] Cross-namespace access blocked
- [ ] All resources properly isolated
- [ ] Security tests pass
### Phase 5 Success Criteria
- [ ] All API endpoints functional
- [ ] WebSocket subscriptions work
- [ ] Complete feature parity with direct client
### Phase 6 Success Criteria
- [ ] Rate limiting enforced
- [ ] Usage tracking accurate
- [ ] Tier limits respected
### Phase 7 Success Criteria
- [ ] Payment verification works
- [ ] Subscription management complete
- [ ] Automatic tier upgrades/downgrades
### Phase 8 Success Criteria
- [ ] 80%+ test coverage
- [ ] Security audit passed
- [ ] Load testing completed
- [ ] Production deployment ready
---
This implementation plan provides a clear roadmap from basic gateway functionality to a production-ready, secure, multi-tenant system with Ethereum-based payments and comprehensive API coverage.

298
TASK.md Normal file
View File

@ -0,0 +1,298 @@
# DeBros Network — Gateway, Auth & Staking TASKS
This document captures the plan, endpoints, auth/staking model, data layout, security hardening, and an implementation roadmap for the new `gateway` service. It is a single-source checklist to turn the ideas discussed into working code and infra.
Goals
- Provide a standalone `cmd/gateway` binary that exposes HTTP (default) and optional gRPC to allow non-Go clients (JS/Swift/etc.) to access Database, Storage, PubSub and Network features.
- Authenticate and authorize apps via wallet-based verification and on-chain staking / NFT attestation.
- Issue short-lived access tokens (JWT) + rotating refresh tokens + optional API keys for server apps.
- Use staking or NFT ownership to grant higher rate limits and scopes.
- Keep node processes and gateway process separable for scaling, security and reliability.
- Store gateway/core metadata in a dedicated `core` RQLite database to avoid mixing runtime app data with cluster DB.
High-level architecture
- `cmd/gateway` (new binary)
- HTTP server (REST + WebSocket)
- Optional gRPC server
- Bridge layer that calls `pkg/client` (NetworkClient) to interact with the network
- Auth & staking modules, token manager, rate-limiter, background chain watcher
- `pkg/gateway` packages
- `bridge` — adapters to call `client` methods
- `http` — REST handlers & middleware
- `ws` — WebSocket pubsub broker
- `auth` — challenge, register, JWT + refresh token handling
- `payments` — payment adapters, payment verification, subscription state
- `rate` — Redis-backed token-bucket / quota manager
- `db` — gateway schema migrations / helper for `core` DB
- Persistence
- `core` RQLite database (separate from application DBs) stores apps, stakes, tokens and nonces
- Redis for rate-limiting and ephemeral session state (optional fallback to in-memory for dev)
- Payment adapters
- Ethereum (EVM) JSON-RPC adapter (support for mainnet + testnets such as Goerli)
- Abstract interface allows adding other chains later if desired
Endpoints (HTTP + WebSocket) — MVP (no admin endpoints)
- General
- GET /v1/health
- GET /v1/version
- Network
- GET /v1/network/peers
- GET /v1/network/status
- POST /v1/network/connect { multiaddr }
- POST /v1/network/disconnect { peer_id }
- Database
- POST /v1/db/query { sql, params?, timeout? } (enforce scopes)
- POST /v1/db/transaction { queries: [] }
- GET /v1/db/schema
- POST /v1/db/create-table { sql } (admin / gated)
- Storage
- GET /v1/storage/get?key=&namespace=
- POST /v1/storage/put (binary body or JSON base64) ?key=&namespace=
- DELETE /v1/storage/delete { key, namespace }
- GET /v1/storage/list?prefix=&limit=&namespace=
- GET /v1/storage/exists?key=&namespace=
- Pub/Sub
- POST /v1/pubsub/publish { topic, data(base64|raw), namespace?, ttl? }
- GET /v1/pubsub/topics?namespace=
- WS /v1/pubsub/ws (subscribe/unsubscribe/publish over WS frames)
- SSE /v1/pubsub/sse?topic=... (optional read-only)
- Auth & App onboarding
- POST /v1/auth/challenge { wallet, wallet_type, app_name, metadata? }
- Response: { challenge, expires_in }
- POST /v1/auth/register { wallet, wallet_type, challenge, signature, app_name, metadata? }
- On success: create provisional App and return client_id (status: pending or active per flow)
- POST /v1/auth/refresh { client_id, refresh_token } -> new access_token
- GET /v1/auth/whoami (protected)
- Staking (on-chain)
- POST /v1/stake/info { client_id } -> returns required stake, contract address, memo format
- POST /v1/stake/commit { client_id, chain, tx_signature } -> verify on-chain, update stake
- GET /v1/stake/status?client_id=...
- POST /v1/stake/unstake { client_id } -> returns steps and marks pending_unstake
Authentication & authorization model (recommended MVP)
- App registration:
1. Client obtains ephemeral `challenge` for a wallet.
2. Client signs `challenge` with wallet and calls `/v1/auth/register`.
3. Gateway verifies signature and creates `app` record (status pending or active).
- App activation:
- NFT path: if the wallet holds qualifying NFT(s), gateway verifies and activates the app.
- Staking path: gateway asks you to stake tokens to a staking contract with a memo bound to `client_id`. After verifying the on-chain tx, gateway activates the app and assigns a tier.
- Tokens:
- Issue short-lived JWT access tokens (e.g., 15m) signed by gateway private key (RS256 or ES256). Publish JWKS at `/.well-known/jwks.json`.
- Issue rotating refresh tokens (keep hashed in `core` DB).
- Optionally issue API keys (hashed) for server-to-server use (longer TTL, revokable).
- JWT claims:
- iss, sub (client_id), aud, exp, iat, jti
- namespace, wallet, wallet_type, scopes, stake_amount, stake_chain, tier
- Scopes:
- `storage:read`, `storage:write`, `pubsub:publish`, `pubsub:subscribe`, `db:read`, `db:write`
- Enforce scopes in middleware for each endpoint.
Payment / Subscription model
- Pricing & plans (Ethereum-based monthly payments)
- The gateway requires paid subscriptions to use the network. Example starter plans:
- Basic: 0.1 ETH / month -> default quota (e.g., 1,000 RPM)
- Pro: 0.2 ETH / month -> higher quota (e.g., 5,000 RPM)
- Elite: 0.3 ETH / month -> top quota (e.g., 50,000 RPM)
- Plans are configurable and billed per subscription period (monthly by default).
- Payment verification mode:
- Transaction-proof commit (MVP): the user pays the gateway's billing address or staking/payment contract on Ethereum. The payment transaction must include a memo/metadata field or be directed to a payment endpoint structured to identify the `client_id`. The gateway verifies the transaction on-chain (via JSON-RPC) and marks the subscription active for the paid period.
- Event-driven contract listening (optional): deploy a simple subscription contract that emits `PaymentMade(client_id, wallet, plan, amount, tx)` events — gateway listens and reconciles subscriptions automatically.
- Testnet support:
- Support Ethereum testnets (e.g., Goerli) for testing flows without spending real ETH. Gateway config must allow testnet mode and separate testnet payment address/contract.
- Billing cycle & renewal:
- When a payment is verified, set subscription validity for the plan period (e.g., 30 days). The gateway should notify (via webhook or SDK callback) before expiration and support manual or automated renewal (client submits another payment).
- If payment is missed or subscription expires, downgrade quotas to the free/default plan or suspend access depending on policy.
- Confirmation requirements:
- Require configurable confirmation counts before marking a payment as final (example: 12 confirmations for Ethereum mainnet; lower for testnet in dev).
- Refunds & dispute handling:
- Gateway should define a refund/dispute policy (manual or automated) — out of scope for MVP but planned.
Database & storage plan
- Use a separate RQLite logical DB called `core` to store gateway metadata. Rationale:
- Avoid mixing application data with gateway operational metadata.
- Easier backups / migrations for gateway-only state.
- `core` schema (sketch)
- `apps`:
- id UUID, client_id TEXT (unique), namespace TEXT, wallet_pubkey TEXT, wallet_type TEXT, scopes JSON, status TEXT, metadata JSON, created_at, updated_at
- `nonces`:
- nonce TEXT PK, wallet_pubkey TEXT, created_at, expires_at, used BOOL, ip_addr TEXT
- `subscriptions`:
- id UUID, app_id FK, chain TEXT, plan TEXT, amount_paid NUMERIC, tx_signature TEXT, confirmed BOOL, confirmations INT, period_start TIMESTAMP, period_end TIMESTAMP, auto_renew BOOL, testnet BOOL, created_at, updated_at
- `refresh_tokens`:
- jti TEXT PK, client_id FK, hashed_token TEXT, expires_at TIMESTAMP, revoked BOOL, created_at
- `api_keys`:
- id UUID, app_id, hashed_key TEXT, description, created_at, revoked BOOL
- `audit_events`:
- id UUID, app_id nullable, event_type TEXT, details JSON, created_at
- Access patterns:
- Gateway writes to `core`; background watcher verifies payments/subscriptions and writes audit events.
- When validating JWTs, check `jti` blacklist and optionally verify `apps.status` and `subscriptions` validity.
Rate-limiting architecture
- Use Redis token-bucket per `client_id` for production. Fallback to in-memory limiter for dev.
- Keyed by `client_id`; token bucket parameters derived from subscription plan or default (free) plan.
- Quota update flow:
- When subscription status changes (payment commit / expiration / renewal), background worker recalculates plan quotas and sets new token-bucket capacity in Redis.
- Middleware consumes tokens on request; return 429 when exhausted.
- Consider endpoint-specific quota (DB-write quotas smaller than read/storage).
Background workers & payment watcher
- Background service tasks:
- Payment watcher: listen to payment contract events or poll transactions; verify payments, mark confirmations, and update `subscriptions`.
- Token revocation worker: expire revoked tokens from cache; enforce `jti` blacklist.
- Quota reconciler: push new quotas to Redis after subscription changes (payment/expiration/renewal).
- Audit logger: persist critical events to `core.audit_events`.
Security hardening (key points)
- Transport: require TLS for all gateways. Support mTLS for internal connections if desired.
- Signature verification:
- Support Solana (ed25519) and Ethereum (secp256k1 SIWE/EIP-191) signature formats.
- Challenges are single-use and time-limited.
- JWT & keys:
- Use RS256/ES256 and store private keys in KMS/HSM. Publish JWKS for clients.
- Rotate keys and keep old keys in JWKS until no tokens signed by them remain.
- Token lifecycle:
- Short access TTL, rotating refresh tokens with hashed storage, jti blacklisting for revocation.
- Input validation:
- Enforce strict validation on SQL queries (parameterized), topic names, key names, sizes.
- Rate-limits + quotas:
- Enforce per-client quotas. Additional IP-based rate-limiting as fallback.
- Anti-sybil for registration:
- Rate-limit challenge requests per IP and per wallet address.
- Require payments/NFT for elevated capabilities (paid plans or verified NFT holders).
- Apply CAPTCHAs or step-up verification if suspicious activity is detected (many signups from same IP/wallet).
- Monitor behavioral signals (usage spikes, repeated failures) and automatically throttle or block abusive actors.
- Multi-tenant isolation & namespace enforcement:
- Principle: every App must be strictly namespaced. All cross-service operations (DB, Storage, Pub/Sub, Network actions) must be checked and scoped to the App's namespace as declared in the App record and embedded in JWTs.
- JWT binding: issue tokens that include a `namespace` claim. All gateway handlers must verify that any request-scoped `namespace` parameter or implied namespace (for example when creating resources) matches the token's `namespace`.
- Storage keys: internally prefix all storage operations with the namespace (e.g., `ns::<namespace>::<key>`). The gateway API must require either an explicit `namespace` parameter or infer it from the token; never accept a raw key without namespacing.
- Pub/Sub topics: require namespaced topic names (e.g., `<namespace>.<topic>`). Reject topic operations that omit or try to impersonate a different namespace. When forwarding subscriptions to the internal pubsub layer, map to namespaced topics only.
- Database isolation and table naming:
- Prefer one of these techniques (ordered by recommended deployment complexity):
1. Logical per-app RQLite DB (best isolation): create/assign a dedicated logical DB or database file for each app so tables live in the app's DB and cannot collide.
2. Table name prefixing (practical): prefix table names with namespace (e.g., `ns__<namespace>__users`) and enforce gateway-only creation to avoid collisions.
3. SQL sandboxing + query rewriting: rewrite queries to inject namespace-qualified table names or run them in a namespaced schema; disallow raw `ATTACH`/`DETACH` and other DDL that can escape namespace.
- For MVP, implement table name prefixing and forbid arbitrary DDL by non-admin apps. If you later need stronger isolation, migrate to per-app logical DBs.
- Resource creation rules:
- Only allow apps to create resources (tables, topics, storage keys) within their namespace.
- When a create request arrives, the gateway must validate:
- The token `namespace` matches the requested namespace.
- The resource name, after applying namespace prefix, does not already exist under another namespace.
- Enforce strict name validation (regex), disallowing `..`, slashes and control chars.
- Prevent namespace collisions and impersonation:
- Reserve a namespace namespace-ownership table in `core` that records `owner_app_id`, `namespace`, `created_at`, and optional `domain`/`metadata`.
- Reject any create attempt for a namespace that is already registered to another app.
- Provide an admin-only transfer mechanism for namespace ownership (manual transfer requires validation).
- Middleware & enforcement:
- Implement a centralized namespace enforcement middleware that runs before handler logic:
- Extract `namespace` from JWT.
- Compare to requested namespace (query/body/path). If mismatch, return 403.
- For internal DB calls, automatically apply namespace prefix mapping.
- Log and audit any 403 cross-namespace attempts for monitoring and alerts.
- SQL & command safety:
- Disallow dangerous SQL statements from non-admin tokens: `ATTACH`, `DETACH`, `PRAGMA` (sensitive), `ALTER` (unless permitted), and `DROP TABLE` without proper scope checks.
- Enforce parameterized queries only. Optionally maintain a whitelist of allowed DDL/DDL-like statements for higher-tier apps that request explicit permissions.
- Testing & validation:
- Add unit and integration tests that attempt cross-namespace access (DB reads/writes, storage key reads, pubsub subscriptions) and assert they are rejected.
- Add fuzz tests for topic and key naming to ensure sanitization and prefixing is robust.
- Migration & operator notes:
- Document the namespace naming convention and migration path if you need to move an app from prefixing to per-app DB later.
- Provide tooling to inspect `core.namespace_ownership` and reconcile accidental collisions or orphaned namespaces.
- Secrets management:
- Store API keys and refresh tokens hashed.
- Protect DB credentials and RQLite nodes with network controls.
- Audit & monitoring:
- Structured logs with redaction rules.
- Prometheus metrics for key events: stake commits, registration attempts, JWT issuance, quota breaches.
Operational & deployment considerations
- Separate gateway process to scale independently from nodes.
- For local/demo: provide `--embedded` or `node --enable-gateway` dev-only mode that runs gateway in-process on localhost.
- Expose Prometheus `/metrics`.
- Provide configuration flags:
- `--http-listen`, `--grpc-listen`, `--tls-cert`, `--tls-key`, `--jwks-url`, `--stake-contracts`, `--chain-rpc` endpoints, `--redis-url`, `--core-db-endpoint`
- Run gateway behind an API gateway / LB in production for TLS termination or WAF if needed.
- Use KMS for signing key management and automated cert issuance (cert-manager or cloud provider).
Implementation roadmap (milestones & tasks)
Phase 0 — Design & infra
- Finalize JWKS/token signing approach (RS256 vs ES256).
- Define staking contract interface for Solana (and EVM if planned).
- Create `core` RQLite schema SQL migrations.
Phase 1 — Minimal Gateway MVP
- Scaffold `cmd/gateway` and `pkg/gateway/bridge`.
- Implement:
- `/v1/auth/challenge` and `/v1/auth/register` (wallet signature verification)
- Token issuance (JWT + refresh token)
- Simple `apps` CRUD in `core` DB
- Basic endpoints: `/v1/health`, `/v1/network/peers`, `/v1/pubsub/publish`, `/v1/storage/get|put`
- Basic WebSocket pubsub `/v1/pubsub/ws`
- Local in-memory rate limiter with default quotas
- Create TypeScript example client demonstrating challenge → sign → register → use token → WS pubsub.
Phase 2 — Payments & quotas
- Add payment endpoints: `/v1/payments/info`, `/v1/payments/commit`, `/v1/payments/status`
- Implement Ethereum (EVM) adapter to verify payment txs via JSON-RPC (support mainnet + testnets such as Goerli)
- Add Redis-backed rate limiter and plan mapping based on subscription plan (Basic/Pro/Elite)
- Implement background payment watcher to verify transactions, confirm payments, set subscription periods, and push quota updates
- Provide testnet configuration and flows so integrations can be tested without spending real ETH
Phase 3 — Production hardening & SDKs
- Integrate persistent Redis + RQLite `core` DB in prod config
- Replace in-memory limiter with Redis; add quota recalculation on stake changes
- Add JWKS endpoints, key rotation, KMS integration
- Add API key issuance (hashed)
- Add OpenAPI spec and generate JS/Swift SDKs
- Add metrics, logging, alerting and documentation
Phase 4 — Optional: On-chain contracts & advanced flows
- Deploy staking contract (Solana/EVM) with event emission
- Add NFT attestation flow
- (Optional) Implement direct libp2p-js path for browser-native P2P
Developer tasks — immediate actionable items
1. Create RQLite `core` DB migration SQL and add to repo migrations (include `apps`, `nonces`, `subscriptions`, `refresh_tokens`, `api_keys`, `audit_events`, and `namespace_ownership` tables).
2. Scaffold `cmd/gateway/main.go` with flags `--http-listen`, `--grpc`, `--tls-*`, `--redis`, `--core-db`.
3. Implement `pkg/gateway/auth` with challenge/register handlers and Ethereum signature verification helper (for EOA flows / SIWE).
4. Implement `pkg/gateway/bridge` to call `client.NewClient(DefaultClientConfig(namespace))` and wire basic endpoints (pubsub publish, storage get/put).
5. Add WebSocket pubsub forwarding using `client.PubSub().Subscribe` and map to WS sessions.
6. Add Redis-based token-bucket `pkg/gateway/rate` and middleware for HTTP endpoints.
7. Implement `/v1/payments/commit` Ethereum adapter skeleton (verify payment tx via JSON-RPC and support testnets like Goerli).
8. Produce OpenAPI (YAML) for the endpoints to allow SDK generation.
9. Build example TypeScript client that performs challenge -> sign -> register -> use payments on testnet -> publish/subscribe.
10. Implement namespace enforcement middleware:
- Validate token `namespace` claim and ensure it matches any requested `namespace` parameter or infers namespace for the operation.
- Map and apply namespace prefixes to storage keys, pubsub topics, and DB table names.
- Reject attempts to access or create resources outside the token's namespace (return 403).
11. Add `core.namespace_ownership` table and enforcement logic to prevent two apps from owning the same namespace; disallow create requests for reserved/owned namespaces.
12. Implement create-resource guards:
- Ensure table/topic/key creation requests include the namespace and that the gateway applies/validates namespace prefixes before creating resources.
- Disallow non-admin DDL that can escape namespace boundaries (`ATTACH`, `DETACH`, raw file access).
13. Add unit and integration tests for multi-tenant isolation:
- Tests that verify reads/writes across namespaces are rejected.
- Tests that verify topic and storage key isolation enforcement.
14. Add audit hooks to log any cross-namespace access attempts and integrate alerts for repeated violations.
15. Update API documentation and SDKs to document the namespace requirement and show examples of correctly namespaced calls.
Notes & guidelines
- Use separate `core` logical DB name when creating rqlite connections: `http://<host>:<port>/core` or a connection that uses a dedicated DB directory for the gateway.
- Keep gateway stateless where possible: store short-lived state in Redis. Persistent state goes to `core` RQLite.
- Prefer parameterized SQL calls in gateway code when writing to `core`.
- For wallet signature verification use battle-tested crypto libs (Solana ed25519 from x/crypto, Ethereum ecrecover libs) and accept explicit `wallet_type`.
- Keep WebSocket messages compact (use base64 for binary payloads) and add per-connection subscription limits.
Open questions to finalize before coding
- Which chains to support in v1? (Solana recommended as first)
- Exact stake thresholds and confirmation counts for each chain
- JWKS key storage policy (local PEM for dev; KMS in prod)
- Redis availability & cluster sizing for rate-limiter
- Should `core` RQLite be colocated on node or run as separate RQLite node cluster? (Separate RQLite logical DB is recommended)
If you want, I can now:
- Generate the `network/TASK.md` (this file) as well as scaffolded Go handler stubs for `/v1/auth/challenge`, `/v1/auth/register`, `/v1/payments/commit` and example SQL migrations for `core` DB (including `subscriptions` table).
- Or produce an OpenAPI spec for the MVP endpoints so you can generate SDKs. I can also produce example testnet payment flows (Goerli) and a TypeScript test client that demonstrates paying on testnet and activating a subscription.
Tell me which code artifact you want next and I will produce it.

97
cmd/gateway/config.go Normal file
View File

@ -0,0 +1,97 @@
package main
import (
"flag"
"os"
"strings"
"git.debros.io/DeBros/network/pkg/gateway"
"git.debros.io/DeBros/network/pkg/logging"
"go.uber.org/zap"
)
// For transition, alias main.GatewayConfig to pkg/gateway.Config
// server.go will be removed; this keeps compatibility until then.
type GatewayConfig = gateway.Config
func getEnvDefault(key, def string) string {
if v := os.Getenv(key); strings.TrimSpace(v) != "" {
return v
}
return def
}
func getEnvBoolDefault(key string, def bool) bool {
v := strings.TrimSpace(os.Getenv(key))
if v == "" {
return def
}
switch strings.ToLower(v) {
case "1", "true", "t", "yes", "y", "on":
return true
case "0", "false", "f", "no", "n", "off":
return false
default:
return def
}
}
// parseGatewayConfig parses flags and environment variables into GatewayConfig.
// Priority: flags > env > defaults.
func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
addr := flag.String("addr", getEnvDefault("GATEWAY_ADDR", ":8080"), "HTTP listen address (e.g., :8080)")
ns := flag.String("namespace", getEnvDefault("GATEWAY_NAMESPACE", "default"), "Client namespace for scoping resources")
peers := flag.String("bootstrap-peers", getEnvDefault("GATEWAY_BOOTSTRAP_PEERS", ""), "Comma-separated bootstrap peers for network client")
requireAuth := flag.Bool("require-auth", getEnvBoolDefault("GATEWAY_REQUIRE_AUTH", false), "Require API key authentication for requests")
apiKeysStr := flag.String("api-keys", getEnvDefault("GATEWAY_API_KEYS", ""), "Comma-separated API keys, optionally as key:namespace")
// Do not call flag.Parse() elsewhere to avoid double-parsing
flag.Parse()
var bootstrap []string
if p := strings.TrimSpace(*peers); p != "" {
parts := strings.Split(p, ",")
for _, part := range parts {
val := strings.TrimSpace(part)
if val != "" {
bootstrap = append(bootstrap, val)
}
}
}
apiKeys := make(map[string]string)
if s := strings.TrimSpace(*apiKeysStr); s != "" {
tokens := strings.Split(s, ",")
for _, tok := range tokens {
tok = strings.TrimSpace(tok)
if tok == "" {
continue
}
key := tok
nsOverride := ""
if i := strings.Index(tok, ":"); i != -1 {
key = strings.TrimSpace(tok[:i])
nsOverride = strings.TrimSpace(tok[i+1:])
}
if key != "" {
apiKeys[key] = nsOverride
}
}
}
logger.ComponentInfo(logging.ComponentGeneral, "Loaded gateway configuration",
zap.String("addr", *addr),
zap.String("namespace", *ns),
zap.Int("bootstrap_peer_count", len(bootstrap)),
zap.Bool("require_auth", *requireAuth),
zap.Int("api_key_count", len(apiKeys)),
)
return &gateway.Config{
ListenAddr: *addr,
ClientNamespace: *ns,
BootstrapPeers: bootstrap,
RequireAuth: *requireAuth,
APIKeys: apiKeys,
}
}

68
cmd/gateway/main.go Normal file
View File

@ -0,0 +1,68 @@
package main
import (
"context"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"git.debros.io/DeBros/network/pkg/gateway"
"git.debros.io/DeBros/network/pkg/logging"
"go.uber.org/zap"
)
func setupLogger() *logging.ColoredLogger {
logger, err := logging.NewColoredLogger(logging.ComponentGeneral, true)
if err != nil {
panic(err)
}
return logger
}
func main() {
logger := setupLogger()
// Load gateway config (flags/env)
cfg := parseGatewayConfig(logger)
// Initialize gateway (connect client, prepare routes)
g, err := gateway.New(logger, cfg)
if err != nil {
logger.ComponentError(logging.ComponentGeneral, "failed to initialize gateway", zap.Error(err))
os.Exit(1)
}
defer g.Close()
server := &http.Server{
Addr: cfg.ListenAddr,
Handler: g.Routes(),
}
// Start server
go func() {
logger.ComponentInfo(logging.ComponentGeneral, "Gateway HTTP server starting",
zap.String("addr", cfg.ListenAddr),
zap.String("namespace", cfg.ClientNamespace),
zap.Int("bootstrap_peer_count", len(cfg.BootstrapPeers)),
)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.ComponentError(logging.ComponentGeneral, "HTTP server error", zap.Error(err))
os.Exit(1)
}
}()
// Graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit
logger.ComponentInfo(logging.ComponentGeneral, "Shutting down gateway HTTP server...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
logger.ComponentError(logging.ComponentGeneral, "HTTP server shutdown error", zap.Error(err))
}
logger.ComponentInfo(logging.ComponentGeneral, "Gateway shutdown complete")
}

3
go.mod
View File

@ -5,6 +5,7 @@ go 1.23.8
toolchain go1.24.1 toolchain go1.24.1
require ( require (
github.com/ethereum/go-ethereum v1.13.14
github.com/libp2p/go-libp2p v0.41.1 github.com/libp2p/go-libp2p v0.41.1
github.com/libp2p/go-libp2p-pubsub v0.14.2 github.com/libp2p/go-libp2p-pubsub v0.14.2
github.com/multiformats/go-multiaddr v0.15.0 github.com/multiformats/go-multiaddr v0.15.0
@ -17,6 +18,7 @@ require (
require ( require (
github.com/benbjohnson/clock v1.3.5 // indirect github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect
@ -34,6 +36,7 @@ require (
github.com/google/uuid v1.6.0 // indirect github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/huin/goupnp v1.3.0 // indirect github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/go-cid v0.5.0 // indirect github.com/ipfs/go-cid v0.5.0 // indirect
github.com/ipfs/go-log/v2 v2.6.0 // indirect github.com/ipfs/go-log/v2 v2.6.0 // indirect

8
go.sum
View File

@ -16,6 +16,10 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@ -46,6 +50,8 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elastic/gosigar v0.14.3 h1:xwkKwPia+hSfg9GqrCUKYdId102m9qTJIIr7egmK/uo= github.com/elastic/gosigar v0.14.3 h1:xwkKwPia+hSfg9GqrCUKYdId102m9qTJIIr7egmK/uo=
github.com/elastic/gosigar v0.14.3/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/gosigar v0.14.3/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/ethereum/go-ethereum v1.13.14 h1:EwiY3FZP94derMCIam1iW4HFVrSgIcpsu0HwTQtm6CQ=
github.com/ethereum/go-ethereum v1.13.14/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg= github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg=
github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
@ -97,6 +103,8 @@ github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:Fecb
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU=
github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/ipfs/go-cid v0.5.0 h1:goEKKhaGm0ul11IHA7I6p1GmKz8kEYniqFopaB5Otwg= github.com/ipfs/go-cid v0.5.0 h1:goEKKhaGm0ul11IHA7I6p1GmKz8kEYniqFopaB5Otwg=

View File

@ -0,0 +1,55 @@
-- DeBros Gateway - Initial database schema (SQLite/RQLite dialect)
-- This file scaffolds core tables used by the HTTP gateway for auth, observability, and namespacing.
-- Apply via your migration tooling or manual execution in RQLite.
BEGIN;
-- Tracks applied migrations (optional if your runner manages this separately)
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
);
-- Namespaces (tenant/app isolation)
CREATE TABLE IF NOT EXISTS namespaces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- API keys (basic authentication/authorization scaffold)
CREATE TABLE IF NOT EXISTS api_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL UNIQUE,
name TEXT,
namespace_id INTEGER NOT NULL,
scopes TEXT, -- comma-separated or JSON array; refine later
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_used_at TIMESTAMP,
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_api_keys_namespace ON api_keys(namespace_id);
-- Request logs (simple observability; expand with more fields later)
CREATE TABLE IF NOT EXISTS request_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
method TEXT NOT NULL,
path TEXT NOT NULL,
status_code INTEGER NOT NULL,
bytes_out INTEGER NOT NULL DEFAULT 0,
duration_ms INTEGER NOT NULL DEFAULT 0,
ip TEXT,
api_key_id INTEGER,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(api_key_id) REFERENCES api_keys(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_request_logs_api_key ON request_logs(api_key_id);
CREATE INDEX IF NOT EXISTS idx_request_logs_created_at ON request_logs(created_at);
-- Seed a default namespace for development convenience
INSERT OR IGNORE INTO namespaces(name) VALUES ('default');
-- Mark this migration as applied (optional)
INSERT OR IGNORE INTO schema_migrations(version) VALUES (1);
COMMIT;

95
migrations/002_core.sql Normal file
View File

@ -0,0 +1,95 @@
-- DeBros Gateway - Core schema (Phase 2)
-- Adds apps, nonces, subscriptions, refresh_tokens, audit_events, namespace_ownership
-- SQLite/RQLite dialect
BEGIN;
-- Apps registered within a namespace (optional public key for attestation)
CREATE TABLE IF NOT EXISTS apps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
app_id TEXT NOT NULL,
name TEXT,
public_key TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE(namespace_id, app_id),
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_apps_namespace ON apps(namespace_id);
-- Wallet nonces for challenge-response auth
CREATE TABLE IF NOT EXISTS nonces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
wallet TEXT NOT NULL,
nonce TEXT NOT NULL,
purpose TEXT,
expires_at TIMESTAMP,
used_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE(namespace_id, wallet, nonce),
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_nonces_wallet ON nonces(wallet);
CREATE INDEX IF NOT EXISTS idx_nonces_expires ON nonces(expires_at);
-- Subscriptions to topics or channels for callbacks/notifications
CREATE TABLE IF NOT EXISTS subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
app_id INTEGER,
topic TEXT NOT NULL,
endpoint TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE,
FOREIGN KEY(app_id) REFERENCES apps(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_subscriptions_ns ON subscriptions(namespace_id);
CREATE INDEX IF NOT EXISTS idx_subscriptions_topic ON subscriptions(topic);
-- Opaque refresh tokens for JWT
CREATE TABLE IF NOT EXISTS refresh_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
subject TEXT NOT NULL,
token TEXT NOT NULL UNIQUE,
audience TEXT,
expires_at TIMESTAMP,
revoked_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_refresh_subject ON refresh_tokens(subject);
CREATE INDEX IF NOT EXISTS idx_refresh_expires ON refresh_tokens(expires_at);
-- Audit events for security and observability
CREATE TABLE IF NOT EXISTS audit_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
actor TEXT,
action TEXT NOT NULL,
resource TEXT,
ip TEXT,
metadata TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_audit_ns_time ON audit_events(namespace_id, created_at);
CREATE INDEX IF NOT EXISTS idx_audit_action ON audit_events(action);
-- Namespace ownership mapping (who controls a namespace)
CREATE TABLE IF NOT EXISTS namespace_ownership (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
owner_type TEXT NOT NULL, -- e.g., 'wallet', 'api_key'
owner_id TEXT NOT NULL, -- e.g., wallet address or api key string
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE(namespace_id, owner_type, owner_id),
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_ns_owner_ns ON namespace_ownership(namespace_id);
-- Optional marker (ignored by runner)
INSERT OR IGNORE INTO schema_migrations(version) VALUES (2);
COMMIT;

View File

@ -129,6 +129,7 @@ type ClientConfig struct {
RetryAttempts int `json:"retry_attempts"` RetryAttempts int `json:"retry_attempts"`
RetryDelay time.Duration `json:"retry_delay"` RetryDelay time.Duration `json:"retry_delay"`
QuietMode bool `json:"quiet_mode"` // Suppress debug/info logs QuietMode bool `json:"quiet_mode"` // Suppress debug/info logs
APIKey string `json:"api_key"` // Optional API key for gateway auth (not enforced by client)
} }
// DefaultClientConfig returns a default client configuration // DefaultClientConfig returns a default client configuration
@ -145,5 +146,7 @@ func DefaultClientConfig(appName string) *ClientConfig {
ConnectTimeout: time.Second * 30, ConnectTimeout: time.Second * 30,
RetryAttempts: 3, RetryAttempts: 3,
RetryDelay: time.Second * 5, RetryDelay: time.Second * 5,
QuietMode: false,
APIKey: "",
} }
} }

View File

@ -0,0 +1,168 @@
package gateway
import (
"crypto/rand"
"encoding/base64"
"encoding/json"
"net/http"
"strings"
"git.debros.io/DeBros/network/pkg/storage"
)
// appsHandler implements minimal CRUD for apps within a namespace.
// Routes handled:
// - GET /v1/apps -> list
// - POST /v1/apps -> create
// - GET /v1/apps/{app_id} -> fetch
// - PUT /v1/apps/{app_id} -> update (name/public_key)
// - DELETE /v1/apps/{app_id} -> delete
func (g *Gateway) appsHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx := r.Context()
ns := g.cfg.ClientNamespace
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
if strings.TrimSpace(ns) == "" { ns = "default" }
db := g.client.Database()
nsID, err := g.resolveNamespaceID(ctx, ns)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
path := r.URL.Path
// Determine if operating on collection or single resource
if path == "/v1/apps" || path == "/v1/apps/" {
switch r.Method {
case http.MethodGet:
// List apps
res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? ORDER BY created_at DESC", nsID)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
items := make([]map[string]any, 0, res.Count)
for _, row := range res.Rows {
item := map[string]any{
"app_id": row[0],
"name": row[1],
"public_key": row[2],
"namespace": ns,
"created_at": row[3],
}
items = append(items, item)
}
writeJSON(w, http.StatusOK, map[string]any{"items": items, "count": len(items)})
return
case http.MethodPost:
// Create app with provided name/public_key
var req struct {
Name string `json:"name"`
PublicKey string `json:"public_key"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
// Generate app_id
buf := make([]byte, 12)
if _, err := rand.Read(buf); err != nil {
writeError(w, http.StatusInternalServerError, "failed to generate app id")
return
}
appID := "app_" + base64.RawURLEncoding.EncodeToString(buf)
if _, err := db.Query(ctx, "INSERT INTO apps(namespace_id, app_id, name, public_key) VALUES (?, ?, ?, ?)", nsID, appID, req.Name, req.PublicKey); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusCreated, map[string]any{
"app_id": appID,
"name": req.Name,
"public_key": req.PublicKey,
"namespace": ns,
})
return
default:
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
}
// Single resource: /v1/apps/{app_id}
if strings.HasPrefix(path, "/v1/apps/") {
appID := strings.TrimPrefix(path, "/v1/apps/")
appID = strings.TrimSpace(appID)
if appID == "" {
writeError(w, http.StatusBadRequest, "missing app_id")
return
}
switch r.Method {
case http.MethodGet:
res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? AND app_id = ? LIMIT 1", nsID, appID)
if err != nil || res == nil || res.Count == 0 {
writeError(w, http.StatusNotFound, "app not found")
return
}
row := res.Rows[0]
writeJSON(w, http.StatusOK, map[string]any{
"app_id": row[0],
"name": row[1],
"public_key": row[2],
"namespace": ns,
"created_at": row[3],
})
return
case http.MethodPut:
var req struct {
Name *string `json:"name"`
PublicKey *string `json:"public_key"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
// Build update dynamically
sets := make([]string, 0, 2)
args := make([]any, 0, 4)
if req.Name != nil {
sets = append(sets, "name = ?")
args = append(args, *req.Name)
}
if req.PublicKey != nil {
sets = append(sets, "public_key = ?")
args = append(args, *req.PublicKey)
}
if len(sets) == 0 {
writeError(w, http.StatusBadRequest, "no fields to update")
return
}
q := "UPDATE apps SET " + strings.Join(sets, ", ") + " WHERE namespace_id = ? AND app_id = ?"
args = append(args, nsID, appID)
if _, err := db.Query(ctx, q, args...); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
return
case http.MethodDelete:
if _, err := db.Query(ctx, "DELETE FROM apps WHERE namespace_id = ? AND app_id = ?", nsID, appID); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
return
default:
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
}
writeError(w, http.StatusNotFound, "not found")
}

View File

@ -0,0 +1,493 @@
package gateway
import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"net/http"
"strconv"
"strings"
"time"
"git.debros.io/DeBros/network/pkg/storage"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
)
func (g *Gateway) whoamiHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Determine namespace (may be overridden by auth layer)
ns := g.cfg.ClientNamespace
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
// Prefer JWT if present
if v := ctx.Value(ctxKeyJWT); v != nil {
if claims, ok := v.(*jwtClaims); ok && claims != nil {
writeJSON(w, http.StatusOK, map[string]any{
"authenticated": true,
"method": "jwt",
"subject": claims.Sub,
"issuer": claims.Iss,
"audience": claims.Aud,
"issued_at": claims.Iat,
"not_before": claims.Nbf,
"expires_at": claims.Exp,
"namespace": ns,
"require_auth": g.cfg != nil && g.cfg.RequireAuth,
})
return
}
}
// Fallback: API key identity
var key string
if v := ctx.Value(ctxKeyAPIKey); v != nil {
if s, ok := v.(string); ok {
key = s
}
}
writeJSON(w, http.StatusOK, map[string]any{
"authenticated": key != "",
"method": "api_key",
"api_key": key,
"namespace": ns,
"require_auth": g.cfg != nil && g.cfg.RequireAuth,
})
}
func (g *Gateway) challengeHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var req struct {
Wallet string `json:"wallet"`
Purpose string `json:"purpose"`
Namespace string `json:"namespace"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
if strings.TrimSpace(req.Wallet) == "" {
writeError(w, http.StatusBadRequest, "wallet is required")
return
}
ns := strings.TrimSpace(req.Namespace)
if ns == "" {
ns = strings.TrimSpace(g.cfg.ClientNamespace)
if ns == "" { ns = "default" }
}
// Generate a URL-safe random nonce (32 bytes)
buf := make([]byte, 32)
if _, err := rand.Read(buf); err != nil {
writeError(w, http.StatusInternalServerError, "failed to generate nonce")
return
}
nonce := base64.RawURLEncoding.EncodeToString(buf)
// Insert namespace if missing, fetch id
ctx := r.Context()
db := g.client.Database()
if _, err := db.Query(ctx, "INSERT OR IGNORE INTO namespaces(name) VALUES (?)", ns); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
nres, err := db.Query(ctx, "SELECT id FROM namespaces WHERE name = ? LIMIT 1", ns)
if err != nil || nres == nil || nres.Count == 0 || len(nres.Rows) == 0 || len(nres.Rows[0]) == 0 {
writeError(w, http.StatusInternalServerError, "failed to resolve namespace")
return
}
nsID := nres.Rows[0][0]
// Store nonce with 5 minute expiry
if _, err := db.Query(ctx,
"INSERT INTO nonces(namespace_id, wallet, nonce, purpose, expires_at) VALUES (?, ?, ?, ?, datetime('now', '+5 minutes'))",
nsID, req.Wallet, nonce, req.Purpose,
); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"wallet": req.Wallet,
"namespace": ns,
"nonce": nonce,
"purpose": req.Purpose,
"expires_at": time.Now().Add(5 * time.Minute).UTC().Format(time.RFC3339Nano),
})
}
func (g *Gateway) verifyHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var req struct {
Wallet string `json:"wallet"`
Nonce string `json:"nonce"`
Signature string `json:"signature"`
Namespace string `json:"namespace"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
if strings.TrimSpace(req.Wallet) == "" || strings.TrimSpace(req.Nonce) == "" || strings.TrimSpace(req.Signature) == "" {
writeError(w, http.StatusBadRequest, "wallet, nonce and signature are required")
return
}
ns := strings.TrimSpace(req.Namespace)
if ns == "" {
ns = strings.TrimSpace(g.cfg.ClientNamespace)
if ns == "" { ns = "default" }
}
ctx := r.Context()
db := g.client.Database()
nsID, err := g.resolveNamespaceID(ctx, ns)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
q := "SELECT id FROM nonces WHERE namespace_id = ? AND wallet = ? AND nonce = ? AND used_at IS NULL AND (expires_at IS NULL OR expires_at > datetime('now')) LIMIT 1"
nres, err := db.Query(ctx, q, nsID, req.Wallet, req.Nonce)
if err != nil || nres == nil || nres.Count == 0 {
writeError(w, http.StatusBadRequest, "invalid or expired nonce")
return
}
nonceID := nres.Rows[0][0]
// EVM personal_sign verification of the nonce
// Hash: keccak256("\x19Ethereum Signed Message:\n" + len(nonce) + nonce)
msg := []byte(req.Nonce)
prefix := []byte("\x19Ethereum Signed Message:\n" + strconv.Itoa(len(msg)))
hash := ethcrypto.Keccak256(prefix, msg)
// Decode signature (expects 65-byte r||s||v, hex with optional 0x)
sigHex := strings.TrimSpace(req.Signature)
if strings.HasPrefix(sigHex, "0x") || strings.HasPrefix(sigHex, "0X") {
sigHex = sigHex[2:]
}
sig, err := hex.DecodeString(sigHex)
if err != nil || len(sig) != 65 {
writeError(w, http.StatusBadRequest, "invalid signature format")
return
}
// Normalize V to 0/1 as expected by geth
if sig[64] >= 27 {
sig[64] -= 27
}
pub, err := ethcrypto.SigToPub(hash, sig)
if err != nil {
writeError(w, http.StatusUnauthorized, "signature recovery failed")
return
}
addr := ethcrypto.PubkeyToAddress(*pub).Hex()
want := strings.ToLower(strings.TrimPrefix(strings.TrimPrefix(req.Wallet, "0x"), "0X"))
got := strings.ToLower(strings.TrimPrefix(strings.TrimPrefix(addr, "0x"), "0X"))
if got != want {
writeError(w, http.StatusUnauthorized, "signature does not match wallet")
return
}
// Mark nonce used now (after successful verification)
if _, err := db.Query(ctx, "UPDATE nonces SET used_at = datetime('now') WHERE id = ?", nonceID); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if g.signingKey == nil {
writeError(w, http.StatusServiceUnavailable, "signing key unavailable")
return
}
// Issue access token (15m) and a refresh token (30d)
token, expUnix, err := g.generateJWT(ns, req.Wallet, 15*time.Minute)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
// create refresh token
rbuf := make([]byte, 32)
if _, err := rand.Read(rbuf); err != nil {
writeError(w, http.StatusInternalServerError, "failed to generate refresh token")
return
}
refresh := base64.RawURLEncoding.EncodeToString(rbuf)
if _, err := db.Query(ctx, "INSERT INTO refresh_tokens(namespace_id, subject, token, audience, expires_at) VALUES (?, ?, ?, ?, datetime('now', '+30 days'))", nsID, req.Wallet, refresh, "gateway"); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"access_token": token,
"token_type": "Bearer",
"expires_in": int(expUnix - time.Now().Unix()),
"refresh_token": refresh,
"subject": req.Wallet,
"namespace": ns,
"nonce": req.Nonce,
"signature_verified": true,
})
}
func (g *Gateway) registerHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var req struct {
Wallet string `json:"wallet"`
Nonce string `json:"nonce"`
Signature string `json:"signature"`
Namespace string `json:"namespace"`
Name string `json:"name"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
if strings.TrimSpace(req.Wallet) == "" || strings.TrimSpace(req.Nonce) == "" || strings.TrimSpace(req.Signature) == "" {
writeError(w, http.StatusBadRequest, "wallet, nonce and signature are required")
return
}
ns := strings.TrimSpace(req.Namespace)
if ns == "" {
ns = strings.TrimSpace(g.cfg.ClientNamespace)
if ns == "" { ns = "default" }
}
ctx := r.Context()
db := g.client.Database()
nsID, err := g.resolveNamespaceID(ctx, ns)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
// Validate nonce
q := "SELECT id FROM nonces WHERE namespace_id = ? AND wallet = ? AND nonce = ? AND used_at IS NULL AND (expires_at IS NULL OR expires_at > datetime('now')) LIMIT 1"
nres, err := db.Query(ctx, q, nsID, req.Wallet, req.Nonce)
if err != nil || nres == nil || nres.Count == 0 || len(nres.Rows) == 0 || len(nres.Rows[0]) == 0 {
writeError(w, http.StatusBadRequest, "invalid or expired nonce")
return
}
nonceID := nres.Rows[0][0]
// EVM personal_sign verification of the nonce
msg := []byte(req.Nonce)
prefix := []byte("\x19Ethereum Signed Message:\n" + strconv.Itoa(len(msg)))
hash := ethcrypto.Keccak256(prefix, msg)
// Decode signature (expects 65-byte r||s||v, hex with optional 0x)
sigHex := strings.TrimSpace(req.Signature)
if strings.HasPrefix(sigHex, "0x") || strings.HasPrefix(sigHex, "0X") {
sigHex = sigHex[2:]
}
sig, err := hex.DecodeString(sigHex)
if err != nil || len(sig) != 65 {
writeError(w, http.StatusBadRequest, "invalid signature format")
return
}
// Normalize V to 0/1 as expected by geth
if sig[64] >= 27 {
sig[64] -= 27
}
pub, err := ethcrypto.SigToPub(hash, sig)
if err != nil {
writeError(w, http.StatusUnauthorized, "signature recovery failed")
return
}
addr := ethcrypto.PubkeyToAddress(*pub).Hex()
want := strings.ToLower(strings.TrimPrefix(strings.TrimPrefix(req.Wallet, "0x"), "0X"))
got := strings.ToLower(strings.TrimPrefix(strings.TrimPrefix(addr, "0x"), "0X"))
if got != want {
writeError(w, http.StatusUnauthorized, "signature does not match wallet")
return
}
// Mark nonce used now (after successful verification)
if _, err := db.Query(ctx, "UPDATE nonces SET used_at = datetime('now') WHERE id = ?", nonceID); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
// Derive public key (uncompressed) hex
pubBytes := ethcrypto.FromECDSAPub(pub)
pubHex := "0x" + hex.EncodeToString(pubBytes)
// Generate client app_id
buf := make([]byte, 12)
if _, err := rand.Read(buf); err != nil {
writeError(w, http.StatusInternalServerError, "failed to generate app id")
return
}
appID := "app_" + base64.RawURLEncoding.EncodeToString(buf)
// Persist app
if _, err := db.Query(ctx, "INSERT INTO apps(namespace_id, app_id, name, public_key) VALUES (?, ?, ?, ?)", nsID, appID, req.Name, pubHex); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
// Record namespace ownership by wallet (best-effort)
_, _ = db.Query(ctx, "INSERT OR IGNORE INTO namespace_ownership(namespace_id, owner_type, owner_id) VALUES (?, ?, ?)", nsID, "wallet", req.Wallet)
writeJSON(w, http.StatusCreated, map[string]any{
"client_id": appID,
"app": map[string]any{
"app_id": appID,
"name": req.Name,
"public_key": pubHex,
"namespace": ns,
"wallet": strings.ToLower(strings.TrimPrefix(strings.TrimPrefix(req.Wallet, "0x"), "0X")),
},
"signature_verified": true,
})
}
func (g *Gateway) refreshHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var req struct {
RefreshToken string `json:"refresh_token"`
Namespace string `json:"namespace"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
if strings.TrimSpace(req.RefreshToken) == "" {
writeError(w, http.StatusBadRequest, "refresh_token is required")
return
}
ns := strings.TrimSpace(req.Namespace)
if ns == "" {
ns = strings.TrimSpace(g.cfg.ClientNamespace)
if ns == "" { ns = "default" }
}
ctx := r.Context()
db := g.client.Database()
nsID, err := g.resolveNamespaceID(ctx, ns)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
q := "SELECT subject FROM refresh_tokens WHERE namespace_id = ? AND token = ? AND revoked_at IS NULL AND (expires_at IS NULL OR expires_at > datetime('now')) LIMIT 1"
rres, err := db.Query(ctx, q, nsID, req.RefreshToken)
if err != nil || rres == nil || rres.Count == 0 {
writeError(w, http.StatusUnauthorized, "invalid or expired refresh token")
return
}
subject := ""
if len(rres.Rows) > 0 && len(rres.Rows[0]) > 0 {
if s, ok := rres.Rows[0][0].(string); ok {
subject = s
} else {
// fallback: format via json
b, _ := json.Marshal(rres.Rows[0][0])
_ = json.Unmarshal(b, &subject)
}
}
if g.signingKey == nil {
writeError(w, http.StatusServiceUnavailable, "signing key unavailable")
return
}
token, expUnix, err := g.generateJWT(ns, subject, 15*time.Minute)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"access_token": token,
"token_type": "Bearer",
"expires_in": int(expUnix - time.Now().Unix()),
"refresh_token": req.RefreshToken,
"subject": subject,
"namespace": ns,
})
}
// logoutHandler revokes refresh tokens. If a refresh_token is provided, it will
// be revoked. If all=true is provided (and the request is authenticated via JWT),
// all tokens for the JWT subject within the namespace are revoked.
func (g *Gateway) logoutHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var req struct {
RefreshToken string `json:"refresh_token"`
Namespace string `json:"namespace"`
All bool `json:"all"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
ns := strings.TrimSpace(req.Namespace)
if ns == "" {
ns = strings.TrimSpace(g.cfg.ClientNamespace)
if ns == "" { ns = "default" }
}
ctx := r.Context()
db := g.client.Database()
nsID, err := g.resolveNamespaceID(ctx, ns)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if strings.TrimSpace(req.RefreshToken) != "" {
// Revoke specific token
if _, err := db.Query(ctx, "UPDATE refresh_tokens SET revoked_at = datetime('now') WHERE namespace_id = ? AND token = ? AND revoked_at IS NULL", nsID, req.RefreshToken); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "revoked": 1})
return
}
if req.All {
// Require JWT to identify subject
var subject string
if v := ctx.Value(ctxKeyJWT); v != nil {
if claims, ok := v.(*jwtClaims); ok && claims != nil {
subject = strings.TrimSpace(claims.Sub)
}
}
if subject == "" {
writeError(w, http.StatusUnauthorized, "jwt required for all=true")
return
}
if _, err := db.Query(ctx, "UPDATE refresh_tokens SET revoked_at = datetime('now') WHERE namespace_id = ? AND subject = ? AND revoked_at IS NULL", nsID, subject); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "revoked": "all"})
return
}
writeError(w, http.StatusBadRequest, "nothing to revoke: provide refresh_token or all=true")
}

56
pkg/gateway/db_helpers.go Normal file
View File

@ -0,0 +1,56 @@
package gateway
import (
"context"
"strings"
)
func (g *Gateway) resolveNamespaceID(ctx context.Context, ns string) (interface{}, error) {
db := g.client.Database()
if _, err := db.Query(ctx, "INSERT OR IGNORE INTO namespaces(name) VALUES (?)", ns); err != nil {
return nil, err
}
res, err := db.Query(ctx, "SELECT id FROM namespaces WHERE name = ? LIMIT 1", ns)
if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 {
return nil, err
}
return res.Rows[0][0], nil
}
func (g *Gateway) seedConfiguredAPIKeys(ctx context.Context) error {
db := g.client.Database()
for key, nsOverride := range g.cfg.APIKeys {
ns := strings.TrimSpace(nsOverride)
if ns == "" {
ns = strings.TrimSpace(g.cfg.ClientNamespace)
if ns == "" {
ns = "default"
}
}
// Ensure namespace exists
if _, err := db.Query(ctx, "INSERT OR IGNORE INTO namespaces(name) VALUES (?)", ns); err != nil {
return err
}
// Lookup namespace id
nres, err := db.Query(ctx, "SELECT id FROM namespaces WHERE name = ? LIMIT 1", ns)
if err != nil {
return err
}
var nsID interface{}
if nres != nil && nres.Count > 0 && len(nres.Rows) > 0 && len(nres.Rows[0]) > 0 {
nsID = nres.Rows[0][0]
} else {
// Should not happen, but guard
continue
}
// Upsert API key
if _, err := db.Query(ctx, "INSERT OR IGNORE INTO api_keys(key, name, namespace_id) VALUES (?, ?, ?)", key, "", nsID); err != nil {
return err
}
// Record namespace ownership for API key (best-effort)
_, _ = db.Query(ctx, "INSERT OR IGNORE INTO namespace_ownership(namespace_id, owner_type, owner_id) VALUES (?, 'api_key', ?)", nsID, key)
}
return nil
}

85
pkg/gateway/gateway.go Normal file
View File

@ -0,0 +1,85 @@
package gateway
import (
"context"
"crypto/rand"
"crypto/rsa"
"strconv"
"time"
"git.debros.io/DeBros/network/pkg/client"
"git.debros.io/DeBros/network/pkg/logging"
"go.uber.org/zap"
)
// Config holds configuration for the gateway server
type Config struct {
ListenAddr string
ClientNamespace string
BootstrapPeers []string
RequireAuth bool
APIKeys map[string]string // key -> optional namespace override
}
type Gateway struct {
logger *logging.ColoredLogger
cfg *Config
client client.NetworkClient
startedAt time.Time
signingKey *rsa.PrivateKey
keyID string
}
// New creates and initializes a new Gateway instance
func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
// Build client config from gateway cfg
cliCfg := client.DefaultClientConfig(cfg.ClientNamespace)
if len(cfg.BootstrapPeers) > 0 {
cliCfg.BootstrapPeers = cfg.BootstrapPeers
}
c, err := client.NewClient(cliCfg)
if err != nil {
logger.ComponentError(logging.ComponentClient, "failed to create network client", zap.Error(err))
return nil, err
}
if err := c.Connect(); err != nil {
logger.ComponentError(logging.ComponentClient, "failed to connect network client", zap.Error(err))
return nil, err
}
logger.ComponentInfo(logging.ComponentClient, "Network client connected",
zap.String("namespace", cliCfg.AppName),
zap.Int("bootstrap_peer_count", len(cliCfg.BootstrapPeers)),
)
gw := &Gateway{
logger: logger,
cfg: cfg,
client: c,
startedAt: time.Now(),
}
// Generate local RSA signing key for JWKS/JWT (ephemeral for now)
if key, err := rsa.GenerateKey(rand.Reader, 2048); err == nil {
gw.signingKey = key
gw.keyID = "gw-" + strconv.FormatInt(time.Now().Unix(), 10)
} else {
logger.ComponentWarn(logging.ComponentGeneral, "failed to generate RSA key; jwks will be empty", zap.Error(err))
}
// Seed configured API keys into DB (best-effort)
_ = gw.seedConfiguredAPIKeys(context.Background())
return gw, nil
}
// Close disconnects the gateway client
func (g *Gateway) Close() {
if g.client != nil {
if err := g.client.Disconnect(); err != nil {
g.logger.ComponentWarn(logging.ComponentClient, "error during client disconnect", zap.Error(err))
}
}
}

View File

@ -0,0 +1,35 @@
package gateway
import (
"encoding/json"
"net/http"
)
type statusResponseWriter struct {
http.ResponseWriter
status int
bytes int
}
func (w *statusResponseWriter) WriteHeader(code int) {
w.status = code
w.ResponseWriter.WriteHeader(code)
}
func (w *statusResponseWriter) Write(b []byte) (int, error) {
n, err := w.ResponseWriter.Write(b)
w.bytes += n
return n, err
}
// writeJSON writes JSON with status code
func writeJSON(w http.ResponseWriter, code int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(v)
}
// writeError writes a standardized JSON error
func writeError(w http.ResponseWriter, code int, msg string) {
writeJSON(w, code, map[string]any{"error": msg})
}

157
pkg/gateway/jwt.go Normal file
View File

@ -0,0 +1,157 @@
package gateway
import (
"crypto"
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"net/http"
"strings"
"time"
)
func (g *Gateway) jwksHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if g.signingKey == nil {
_ = json.NewEncoder(w).Encode(map[string]any{"keys": []any{}})
return
}
pub := g.signingKey.Public().(*rsa.PublicKey)
n := pub.N.Bytes()
// Encode exponent as big-endian bytes
eVal := pub.E
eb := make([]byte, 0)
for eVal > 0 {
eb = append([]byte{byte(eVal & 0xff)}, eb...)
eVal >>= 8
}
if len(eb) == 0 {
eb = []byte{0}
}
jwk := map[string]string{
"kty": "RSA",
"use": "sig",
"alg": "RS256",
"kid": g.keyID,
"n": base64.RawURLEncoding.EncodeToString(n),
"e": base64.RawURLEncoding.EncodeToString(eb),
}
_ = json.NewEncoder(w).Encode(map[string]any{"keys": []any{jwk}})
}
// Internal types for JWT handling
type jwtHeader struct {
Alg string `json:"alg"`
Typ string `json:"typ"`
Kid string `json:"kid"`
}
type jwtClaims struct {
Iss string `json:"iss"`
Sub string `json:"sub"`
Aud string `json:"aud"`
Iat int64 `json:"iat"`
Nbf int64 `json:"nbf"`
Exp int64 `json:"exp"`
Namespace string `json:"namespace"`
}
// parseAndVerifyJWT verifies an RS256 JWT created by this gateway and returns claims
func (g *Gateway) parseAndVerifyJWT(token string) (*jwtClaims, error) {
if g.signingKey == nil {
return nil, errors.New("signing key unavailable")
}
parts := strings.Split(token, ".")
if len(parts) != 3 {
return nil, errors.New("invalid token format")
}
hb, err := base64.RawURLEncoding.DecodeString(parts[0])
if err != nil {
return nil, errors.New("invalid header encoding")
}
pb, err := base64.RawURLEncoding.DecodeString(parts[1])
if err != nil {
return nil, errors.New("invalid payload encoding")
}
sb, err := base64.RawURLEncoding.DecodeString(parts[2])
if err != nil {
return nil, errors.New("invalid signature encoding")
}
var header jwtHeader
if err := json.Unmarshal(hb, &header); err != nil {
return nil, errors.New("invalid header json")
}
if header.Alg != "RS256" {
return nil, errors.New("unsupported alg")
}
// Verify signature
signingInput := parts[0] + "." + parts[1]
sum := sha256.Sum256([]byte(signingInput))
pub := g.signingKey.Public().(*rsa.PublicKey)
if err := rsa.VerifyPKCS1v15(pub, crypto.SHA256, sum[:], sb); err != nil {
return nil, errors.New("invalid signature")
}
// Parse claims
var claims jwtClaims
if err := json.Unmarshal(pb, &claims); err != nil {
return nil, errors.New("invalid claims json")
}
// Validate issuer
if claims.Iss != "debros-gateway" {
return nil, errors.New("invalid issuer")
}
// Validate registered claims
now := time.Now().Unix()
// allow small clock skew ±60s
const skew = int64(60)
if claims.Nbf != 0 && now+skew < claims.Nbf {
return nil, errors.New("token not yet valid")
}
if claims.Exp != 0 && now-skew > claims.Exp {
return nil, errors.New("token expired")
}
if claims.Iat != 0 && claims.Iat-skew > now {
return nil, errors.New("invalid iat")
}
if claims.Aud != "gateway" {
return nil, errors.New("invalid audience")
}
return &claims, nil
}
func (g *Gateway) generateJWT(ns, subject string, ttl time.Duration) (string, int64, error) {
if g.signingKey == nil {
return "", 0, errors.New("signing key unavailable")
}
header := map[string]string{
"alg": "RS256",
"typ": "JWT",
"kid": g.keyID,
}
hb, _ := json.Marshal(header)
now := time.Now().UTC()
exp := now.Add(ttl)
payload := map[string]any{
"iss": "debros-gateway",
"sub": subject,
"aud": "gateway",
"iat": now.Unix(),
"nbf": now.Unix(),
"exp": exp.Unix(),
"namespace": ns,
}
pb, _ := json.Marshal(payload)
hb64 := base64.RawURLEncoding.EncodeToString(hb)
pb64 := base64.RawURLEncoding.EncodeToString(pb)
signingInput := hb64 + "." + pb64
sum := sha256.Sum256([]byte(signingInput))
sig, err := rsa.SignPKCS1v15(rand.Reader, g.signingKey, crypto.SHA256, sum[:])
if err != nil {
return "", 0, err
}
sb64 := base64.RawURLEncoding.EncodeToString(sig)
return signingInput + "." + sb64, exp.Unix(), nil
}

340
pkg/gateway/middleware.go Normal file
View File

@ -0,0 +1,340 @@
package gateway
import (
"context"
"net"
"net/http"
"strconv"
"strings"
"time"
"git.debros.io/DeBros/network/pkg/logging"
"git.debros.io/DeBros/network/pkg/storage"
"go.uber.org/zap"
)
// context keys for request-scoped auth metadata (private to package)
type contextKey string
const (
ctxKeyAPIKey contextKey = "api_key"
ctxKeyJWT contextKey = "jwt_claims"
)
// withMiddleware adds CORS and logging middleware
func (g *Gateway) withMiddleware(next http.Handler) http.Handler {
// Order: logging (outermost) -> CORS -> auth -> handler
// Add authorization layer after auth to enforce namespace ownership
return g.loggingMiddleware(g.corsMiddleware(g.authMiddleware(g.authorizationMiddleware(next))))
}
// loggingMiddleware logs basic request info and duration
func (g *Gateway) loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
srw := &statusResponseWriter{ResponseWriter: w, status: http.StatusOK}
next.ServeHTTP(srw, r)
dur := time.Since(start)
g.logger.ComponentInfo(logging.ComponentGeneral, "request",
zap.String("method", r.Method),
zap.String("path", r.URL.Path),
zap.Int("status", srw.status),
zap.Int("bytes", srw.bytes),
zap.String("duration", dur.String()),
)
// Persist request log asynchronously (best-effort)
go g.persistRequestLog(r, srw, dur)
})
}
// authMiddleware enforces auth when enabled via config.
// Accepts:
// - Authorization: Bearer <JWT> (RS256 issued by this gateway)
// - Authorization: Bearer <API key> or ApiKey <API key>
// - X-API-Key: <API key>
func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// If auth not required, pass through.
if g.cfg == nil || !g.cfg.RequireAuth {
next.ServeHTTP(w, r)
return
}
// Allow preflight without auth
if r.Method == http.MethodOptions {
next.ServeHTTP(w, r)
return
}
// Allow public endpoints without auth
if isPublicPath(r.URL.Path) {
next.ServeHTTP(w, r)
return
}
// 1) Try JWT Bearer first if Authorization looks like one
if auth := r.Header.Get("Authorization"); auth != "" {
lower := strings.ToLower(auth)
if strings.HasPrefix(lower, "bearer ") {
tok := strings.TrimSpace(auth[len("Bearer "):])
if strings.Count(tok, ".") == 2 {
if claims, err := g.parseAndVerifyJWT(tok); err == nil {
// Attach JWT claims and namespace to context
ctx := context.WithValue(r.Context(), ctxKeyJWT, claims)
if ns := strings.TrimSpace(claims.Namespace); ns != "" {
ctx = storage.WithNamespace(ctx, ns)
}
next.ServeHTTP(w, r.WithContext(ctx))
return
}
// If it looked like a JWT but failed verification, fall through to API key check
}
}
}
// 2) Fallback to API key
key := extractAPIKey(r)
if key == "" {
w.Header().Set("WWW-Authenticate", "Bearer realm=\"gateway\", charset=\"UTF-8\"")
writeError(w, http.StatusUnauthorized, "missing API key")
return
}
// Validate key
nsOverride, ok := g.cfg.APIKeys[key]
if !ok {
w.Header().Set("WWW-Authenticate", "Bearer error=\"invalid_token\"")
writeError(w, http.StatusUnauthorized, "invalid API key")
return
}
// Attach auth metadata to context for downstream use
ctx := context.WithValue(r.Context(), ctxKeyAPIKey, key)
if ns := strings.TrimSpace(nsOverride); ns != "" {
ctx = storage.WithNamespace(ctx, ns)
}
next.ServeHTTP(w, r.WithContext(ctx))
})
}
// extractAPIKey extracts API key from Authorization or X-API-Key
func extractAPIKey(r *http.Request) string {
// Prefer Authorization header
auth := r.Header.Get("Authorization")
if auth != "" {
// Support "Bearer <token>" and "ApiKey <token>"
lower := strings.ToLower(auth)
if strings.HasPrefix(lower, "bearer ") {
return strings.TrimSpace(auth[len("Bearer "):])
}
if strings.HasPrefix(lower, "apikey ") {
return strings.TrimSpace(auth[len("ApiKey "):])
}
// If header has no scheme, treat the whole value as token (lenient for dev)
if !strings.Contains(auth, " ") {
return strings.TrimSpace(auth)
}
}
// Fallback header
if v := strings.TrimSpace(r.Header.Get("X-API-Key")); v != "" {
return v
}
return ""
}
// isPublicPath returns true for routes that should be accessible without API key auth
func isPublicPath(p string) bool {
switch p {
case "/health", "/v1/health", "/status", "/v1/status", "/v1/auth/jwks", "/v1/auth/challenge", "/v1/auth/verify", "/v1/auth/register", "/v1/auth/refresh", "/v1/auth/logout":
return true
default:
return false
}
}
// authorizationMiddleware enforces that the authenticated actor owns the namespace
// for certain protected paths (e.g., apps CRUD and storage APIs).
func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Skip if auth not required or for public/OPTIONS paths
if g.cfg == nil || !g.cfg.RequireAuth || r.Method == http.MethodOptions || isPublicPath(r.URL.Path) {
next.ServeHTTP(w, r)
return
}
// Exempt whoami from ownership enforcement so users can inspect their session
if r.URL.Path == "/v1/auth/whoami" {
next.ServeHTTP(w, r)
return
}
// Only enforce for specific resource paths
if !requiresNamespaceOwnership(r.URL.Path) {
next.ServeHTTP(w, r)
return
}
// Determine namespace from context
ctx := r.Context()
ns := ""
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok {
ns = strings.TrimSpace(s)
}
}
if ns == "" && g.cfg != nil {
ns = strings.TrimSpace(g.cfg.ClientNamespace)
}
if ns == "" {
writeError(w, http.StatusForbidden, "namespace not resolved")
return
}
// Identify actor from context
ownerType := ""
ownerID := ""
if v := ctx.Value(ctxKeyJWT); v != nil {
if claims, ok := v.(*jwtClaims); ok && claims != nil && strings.TrimSpace(claims.Sub) != "" {
ownerType = "wallet"
ownerID = strings.TrimSpace(claims.Sub)
}
}
if ownerType == "" && ownerID == "" {
if v := ctx.Value(ctxKeyAPIKey); v != nil {
if s, ok := v.(string); ok && strings.TrimSpace(s) != "" {
ownerType = "api_key"
ownerID = strings.TrimSpace(s)
}
}
}
if ownerType == "" || ownerID == "" {
writeError(w, http.StatusForbidden, "missing identity")
return
}
// Check ownership in DB
db := g.client.Database()
// Ensure namespace exists and get id
if _, err := db.Query(ctx, "INSERT OR IGNORE INTO namespaces(name) VALUES (?)", ns); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
nres, err := db.Query(ctx, "SELECT id FROM namespaces WHERE name = ? LIMIT 1", ns)
if err != nil || nres == nil || nres.Count == 0 || len(nres.Rows) == 0 || len(nres.Rows[0]) == 0 {
writeError(w, http.StatusForbidden, "namespace not found")
return
}
nsID := nres.Rows[0][0]
q := "SELECT 1 FROM namespace_ownership WHERE namespace_id = ? AND owner_type = ? AND owner_id = ? LIMIT 1"
res, err := db.Query(ctx, q, nsID, ownerType, ownerID)
if err != nil || res == nil || res.Count == 0 {
writeError(w, http.StatusForbidden, "forbidden: not an owner of namespace")
return
}
next.ServeHTTP(w, r)
})
}
// requiresNamespaceOwnership returns true if the path should be guarded by
// namespace ownership checks.
func requiresNamespaceOwnership(p string) bool {
if p == "/storage" || p == "/v1/storage" {
return true
}
if p == "/v1/apps" || strings.HasPrefix(p, "/v1/apps/") {
return true
}
return false
}
// corsMiddleware applies permissive CORS headers suitable for early development
func (g *Gateway) corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, DELETE, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-API-Key")
w.Header().Set("Access-Control-Max-Age", strconv.Itoa(600))
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
next.ServeHTTP(w, r)
})
}
// persistRequestLog writes request metadata to the database (best-effort)
func (g *Gateway) persistRequestLog(r *http.Request, srw *statusResponseWriter, dur time.Duration) {
if g.client == nil {
return
}
// Use a short timeout to avoid blocking shutdowns
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
db := g.client.Database()
// Resolve API key ID if available
var apiKeyID interface{} = nil
if v := r.Context().Value(ctxKeyAPIKey); v != nil {
if key, ok := v.(string); ok && key != "" {
if res, err := db.Query(ctx, "SELECT id FROM api_keys WHERE key = ? LIMIT 1", key); err == nil {
if res != nil && res.Count > 0 && len(res.Rows) > 0 && len(res.Rows[0]) > 0 {
switch idv := res.Rows[0][0].(type) {
case int64:
apiKeyID = idv
case float64:
apiKeyID = int64(idv)
case int:
apiKeyID = int64(idv)
case string:
// best effort parse
if n, err := strconv.Atoi(idv); err == nil {
apiKeyID = int64(n)
}
}
}
}
}
}
ip := getClientIP(r)
// Insert the log row
_, _ = db.Query(ctx,
"INSERT INTO request_logs (method, path, status_code, bytes_out, duration_ms, ip, api_key_id) VALUES (?, ?, ?, ?, ?, ?, ?)",
r.Method,
r.URL.Path,
srw.status,
srw.bytes,
dur.Milliseconds(),
ip,
apiKeyID,
)
// Update last_used_at for the API key if present
if apiKeyID != nil {
_, _ = db.Query(ctx, "UPDATE api_keys SET last_used_at = CURRENT_TIMESTAMP WHERE id = ?", apiKeyID)
}
}
// getClientIP extracts the client IP from headers or RemoteAddr
func getClientIP(r *http.Request) string {
// X-Forwarded-For may contain a list of IPs, take the first
if xff := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); xff != "" {
parts := strings.Split(xff, ",")
if len(parts) > 0 {
return strings.TrimSpace(parts[0])
}
}
if xr := strings.TrimSpace(r.Header.Get("X-Real-IP")); xr != "" {
return xr
}
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return r.RemoteAddr
}
return host
}

152
pkg/gateway/migrate.go Normal file
View File

@ -0,0 +1,152 @@
package gateway
import (
"context"
"errors"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"git.debros.io/DeBros/network/pkg/logging"
"go.uber.org/zap"
)
var errNoMigrationsFound = errors.New("no migrations found")
func (g *Gateway) applyAutoMigrations(ctx context.Context) error {
if g.client == nil {
return nil
}
db := g.client.Database()
stmts := []string{
// namespaces
"CREATE TABLE IF NOT EXISTS namespaces (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t name TEXT NOT NULL UNIQUE,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP\n)",
// api_keys
"CREATE TABLE IF NOT EXISTS api_keys (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t key TEXT NOT NULL UNIQUE,\n\t name TEXT,\n\t namespace_id INTEGER NOT NULL,\n\t scopes TEXT,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t last_used_at TIMESTAMP,\n\t FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE\n)",
"CREATE INDEX IF NOT EXISTS idx_api_keys_namespace ON api_keys(namespace_id)",
// request_logs
"CREATE TABLE IF NOT EXISTS request_logs (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t method TEXT NOT NULL,\n\t path TEXT NOT NULL,\n\t status_code INTEGER NOT NULL,\n\t bytes_out INTEGER NOT NULL DEFAULT 0,\n\t duration_ms INTEGER NOT NULL DEFAULT 0,\n\t ip TEXT,\n\t api_key_id INTEGER,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t FOREIGN KEY(api_key_id) REFERENCES api_keys(id) ON DELETE SET NULL\n)",
"CREATE INDEX IF NOT EXISTS idx_request_logs_api_key ON request_logs(api_key_id)",
"CREATE INDEX IF NOT EXISTS idx_request_logs_created_at ON request_logs(created_at)",
// seed default namespace
"INSERT OR IGNORE INTO namespaces(name) VALUES ('default')",
}
for _, s := range stmts {
if _, err := db.Query(ctx, s); err != nil {
return err
}
}
return nil
}
func (g *Gateway) applyMigrations(ctx context.Context) error {
if g.client == nil {
return nil
}
db := g.client.Database()
// Ensure schema_migrations exists first
if _, err := db.Query(ctx, "CREATE TABLE IF NOT EXISTS schema_migrations (\n\tversion INTEGER PRIMARY KEY,\n\tapplied_at TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))\n)"); err != nil {
return err
}
// Locate migrations directory relative to CWD
migDir := "migrations"
if fi, err := os.Stat(migDir); err != nil || !fi.IsDir() {
return errNoMigrationsFound
}
entries, err := os.ReadDir(migDir)
if err != nil {
return err
}
type mig struct{ ver int; path string }
migrations := make([]mig, 0)
for _, e := range entries {
if e.IsDir() { continue }
name := e.Name()
if !strings.HasSuffix(strings.ToLower(name), ".sql") { continue }
if ver, ok := parseMigrationVersion(name); ok {
migrations = append(migrations, mig{ver: ver, path: filepath.Join(migDir, name)})
}
}
if len(migrations) == 0 {
return errNoMigrationsFound
}
sort.Slice(migrations, func(i, j int) bool { return migrations[i].ver < migrations[j].ver })
// Helper to check if version applied
isApplied := func(ctx context.Context, v int) (bool, error) {
res, err := db.Query(ctx, "SELECT 1 FROM schema_migrations WHERE version = ? LIMIT 1", v)
if err != nil { return false, err }
return res != nil && res.Count > 0, nil
}
for _, m := range migrations {
applied, err := isApplied(ctx, m.ver)
if err != nil { return err }
if applied {
continue
}
// Read and split SQL file into statements
content, err := os.ReadFile(m.path)
if err != nil { return err }
stmts := splitSQLStatements(string(content))
for _, s := range stmts {
if s == "" { continue }
if _, err := db.Query(ctx, s); err != nil {
return err
}
}
// Mark as applied
if _, err := db.Query(ctx, "INSERT OR IGNORE INTO schema_migrations(version) VALUES (?)", m.ver); err != nil {
return err
}
g.logger.ComponentInfo(logging.ComponentDatabase, "applied migration", zap.Int("version", m.ver), zap.String("file", m.path))
}
return nil
}
func parseMigrationVersion(name string) (int, bool) {
i := 0
for i < len(name) && name[i] >= '0' && name[i] <= '9' {
i++
}
if i == 0 { return 0, false }
v, err := strconv.Atoi(name[:i])
if err != nil { return 0, false }
return v, true
}
func splitSQLStatements(sqlText string) []string {
lines := strings.Split(sqlText, "\n")
cleaned := make([]string, 0, len(lines))
for _, ln := range lines {
s := strings.TrimSpace(ln)
if s == "" { continue }
if strings.HasPrefix(s, "--") { continue }
upper := strings.ToUpper(s)
if upper == "BEGIN;" || upper == "COMMIT;" || upper == "BEGIN" || upper == "COMMIT" {
continue
}
if strings.HasPrefix(upper, "INSERT") && strings.Contains(upper, "SCHEMA_MIGRATIONS") {
// ignore in-file migration markers
continue
}
cleaned = append(cleaned, s)
}
// Join and split by ';'
joined := strings.Join(cleaned, "\n")
parts := strings.Split(joined, ";")
out := make([]string, 0, len(parts))
for _, p := range parts {
sp := strings.TrimSpace(p)
if sp == "" { continue }
out = append(out, sp)
}
return out
}

34
pkg/gateway/routes.go Normal file
View File

@ -0,0 +1,34 @@
package gateway
import "net/http"
// Routes returns the http.Handler with all routes and middleware configured
func (g *Gateway) Routes() http.Handler {
mux := http.NewServeMux()
// root and v1 health/status
mux.HandleFunc("/health", g.healthHandler)
mux.HandleFunc("/status", g.statusHandler)
mux.HandleFunc("/v1/health", g.healthHandler)
mux.HandleFunc("/v1/status", g.statusHandler)
// auth endpoints
mux.HandleFunc("/v1/auth/jwks", g.jwksHandler)
mux.HandleFunc("/v1/auth/challenge", g.challengeHandler)
mux.HandleFunc("/v1/auth/verify", g.verifyHandler)
mux.HandleFunc("/v1/auth/register", g.registerHandler)
mux.HandleFunc("/v1/auth/refresh", g.refreshHandler)
mux.HandleFunc("/v1/auth/logout", g.logoutHandler)
mux.HandleFunc("/v1/auth/whoami", g.whoamiHandler)
// apps CRUD
mux.HandleFunc("/v1/apps", g.appsHandler)
mux.HandleFunc("/v1/apps/", g.appsHandler)
// storage and network
mux.HandleFunc("/v1/storage", g.storageHandler)
mux.HandleFunc("/v1/network/status", g.networkStatusHandler)
mux.HandleFunc("/v1/network/peers", g.networkPeersHandler)
return g.withMiddleware(mux)
}

View File

@ -0,0 +1,69 @@
package gateway
import (
"encoding/json"
"net/http"
"time"
"git.debros.io/DeBros/network/pkg/client"
"git.debros.io/DeBros/network/pkg/logging"
"go.uber.org/zap"
)
// healthResponse is the JSON structure used by healthHandler
type healthResponse struct {
Status string `json:"status"`
StartedAt time.Time `json:"started_at"`
Uptime string `json:"uptime"`
}
func (g *Gateway) healthHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
server := healthResponse{
Status: "ok",
StartedAt: g.startedAt,
Uptime: time.Since(g.startedAt).String(),
}
var clientHealth *client.HealthStatus
if g.client != nil {
if h, err := g.client.Health(); err == nil {
clientHealth = h
} else {
g.logger.ComponentWarn(logging.ComponentClient, "failed to fetch client health", zap.Error(err))
}
}
resp := struct {
Status string `json:"status"`
Server healthResponse `json:"server"`
Client *client.HealthStatus `json:"client"`
}{
Status: "ok",
Server: server,
Client: clientHealth,
}
_ = json.NewEncoder(w).Encode(resp)
}
// statusHandler aggregates server uptime and network status
func (g *Gateway) statusHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx := r.Context()
status, err := g.client.Network().GetStatus(ctx)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"server": map[string]any{
"started_at": g.startedAt,
"uptime": time.Since(g.startedAt).String(),
},
"network": status,
})
}

View File

@ -0,0 +1,87 @@
package gateway
import (
"io"
"net/http"
)
func (g *Gateway) storageHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
key := r.URL.Query().Get("key")
if key == "" {
writeError(w, http.StatusBadRequest, "missing 'key' query parameter")
return
}
ctx := r.Context()
switch r.Method {
case http.MethodGet:
val, err := g.client.Storage().Get(ctx, key)
if err != nil {
writeError(w, http.StatusNotFound, err.Error())
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(val)
return
case http.MethodPut:
defer r.Body.Close()
b, err := io.ReadAll(r.Body)
if err != nil {
writeError(w, http.StatusBadRequest, "failed to read body")
return
}
if err := g.client.Storage().Put(ctx, key, b); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusCreated, map[string]any{
"status": "ok",
"key": key,
"size": len(b),
})
return
case http.MethodOptions:
w.WriteHeader(http.StatusNoContent)
return
default:
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
}
func (g *Gateway) networkStatusHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx := r.Context()
status, err := g.client.Network().GetStatus(ctx)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, status)
}
func (g *Gateway) networkPeersHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx := r.Context()
peers, err := g.client.Network().GetPeers(ctx)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, peers)
}

View File

@ -19,6 +19,17 @@ type Client struct {
namespace string namespace string
} }
// Context utilities for namespace override
type ctxKey string
// CtxKeyNamespaceOverride is the context key used to override namespace per request
const CtxKeyNamespaceOverride ctxKey = "storage_ns_override"
// WithNamespace returns a new context that carries a storage namespace override
func WithNamespace(ctx context.Context, ns string) context.Context {
return context.WithValue(ctx, CtxKeyNamespaceOverride, ns)
}
// NewClient creates a new storage client // NewClient creates a new storage client
func NewClient(h host.Host, namespace string, logger *zap.Logger) *Client { func NewClient(h host.Host, namespace string, logger *zap.Logger) *Client {
return &Client{ return &Client{
@ -30,11 +41,17 @@ func NewClient(h host.Host, namespace string, logger *zap.Logger) *Client {
// Put stores a key-value pair in the distributed storage // Put stores a key-value pair in the distributed storage
func (c *Client) Put(ctx context.Context, key string, value []byte) error { func (c *Client) Put(ctx context.Context, key string, value []byte) error {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{ request := &StorageRequest{
Type: MessageTypePut, Type: MessageTypePut,
Key: key, Key: key,
Value: value, Value: value,
Namespace: c.namespace, Namespace: ns,
} }
return c.sendRequest(ctx, request) return c.sendRequest(ctx, request)
@ -42,10 +59,16 @@ func (c *Client) Put(ctx context.Context, key string, value []byte) error {
// Get retrieves a value by key from the distributed storage // Get retrieves a value by key from the distributed storage
func (c *Client) Get(ctx context.Context, key string) ([]byte, error) { func (c *Client) Get(ctx context.Context, key string) ([]byte, error) {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{ request := &StorageRequest{
Type: MessageTypeGet, Type: MessageTypeGet,
Key: key, Key: key,
Namespace: c.namespace, Namespace: ns,
} }
response, err := c.sendRequestWithResponse(ctx, request) response, err := c.sendRequestWithResponse(ctx, request)
@ -62,10 +85,16 @@ func (c *Client) Get(ctx context.Context, key string) ([]byte, error) {
// Delete removes a key from the distributed storage // Delete removes a key from the distributed storage
func (c *Client) Delete(ctx context.Context, key string) error { func (c *Client) Delete(ctx context.Context, key string) error {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{ request := &StorageRequest{
Type: MessageTypeDelete, Type: MessageTypeDelete,
Key: key, Key: key,
Namespace: c.namespace, Namespace: ns,
} }
return c.sendRequest(ctx, request) return c.sendRequest(ctx, request)
@ -73,11 +102,17 @@ func (c *Client) Delete(ctx context.Context, key string) error {
// List returns keys with a given prefix // List returns keys with a given prefix
func (c *Client) List(ctx context.Context, prefix string, limit int) ([]string, error) { func (c *Client) List(ctx context.Context, prefix string, limit int) ([]string, error) {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{ request := &StorageRequest{
Type: MessageTypeList, Type: MessageTypeList,
Prefix: prefix, Prefix: prefix,
Limit: limit, Limit: limit,
Namespace: c.namespace, Namespace: ns,
} }
response, err := c.sendRequestWithResponse(ctx, request) response, err := c.sendRequestWithResponse(ctx, request)
@ -94,10 +129,16 @@ func (c *Client) List(ctx context.Context, prefix string, limit int) ([]string,
// Exists checks if a key exists in the distributed storage // Exists checks if a key exists in the distributed storage
func (c *Client) Exists(ctx context.Context, key string) (bool, error) { func (c *Client) Exists(ctx context.Context, key string) (bool, error) {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{ request := &StorageRequest{
Type: MessageTypeExists, Type: MessageTypeExists,
Key: key, Key: key,
Namespace: c.namespace, Namespace: ns,
} }
response, err := c.sendRequestWithResponse(ctx, request) response, err := c.sendRequestWithResponse(ctx, request)