Merge pull request 'feat: implement HTTP gateway with auth, storage, and namespace isolation' (#5) from gateway into main

Reviewed-on: #5
This commit is contained in:
anonpenguin 2025-08-20 09:56:09 +00:00
commit 8e5b91dee0
37 changed files with 4960 additions and 25 deletions

View File

@ -9,6 +9,7 @@
- [Configuration System](#configuration-system)
- [Node vs Client Roles](#node-vs-client-roles)
- [Network Protocol & Data Flow](#network-protocol--data-flow)
- [Gateway Service](#gateway-service)
- [Build & Development](#build--development)
- [API Reference](#api-reference)
- [Troubleshooting](#troubleshooting)
@ -109,6 +110,7 @@ network/
### 4. **Database Layer (`pkg/database/`)**
- **RQLite:** Distributed SQLite with Raft consensus, automatic leader election, and failover.
- **Client API:** SQL queries, transactions, schema management.
- **Migration System:** Robust database migration handling with automatic schema versioning, SQL statement processing, and error recovery. Supports complex SQL files with comments and multiple statements.
### 5. **Storage System (`pkg/storage/`)**
- **Distributed KV:** Namespace-isolated, CRUD operations, prefix queries, replication.
@ -179,6 +181,108 @@ network/
---
## Gateway Service
The Gateway provides an HTTP(S)/WebSocket surface over the network client with strict namespace enforcement.
- **Run:**
```bash
make run-gateway
# Env overrides: GATEWAY_ADDR, GATEWAY_NAMESPACE, GATEWAY_BOOTSTRAP_PEERS,
# GATEWAY_REQUIRE_AUTH, GATEWAY_API_KEYS
```
- **Enhanced Authentication System:** The gateway features an improved authentication system with automatic wallet detection and multi-wallet support:
- **Automatic Authentication:** No manual auth command required - authentication happens automatically when needed
- **Multi-Wallet Support:** Users can manage multiple wallet credentials seamlessly
- **JWT Authentication:** Issued by this gateway; JWKS available at `GET /v1/auth/jwks` or `/.well-known/jwks.json`
- **API Key Support:** Via `Authorization: Bearer <key>` or `X-API-Key`, optionally mapped to a namespace
- **Wallet Verification:** Uses Ethereum EIP-191 `personal_sign` with enhanced flow:
- `POST /v1/auth/challenge` returns `{nonce}` (public endpoint with internal auth context)
- `POST /v1/auth/verify` expects `{wallet, nonce, signature}` with 65-byte r||s||v hex (0x allowed)
- `v` normalized (27/28 or 0/1), address match is case-insensitive
- Nonce is marked used only after successful verification
- Supports automatic wallet switching and credential persistence
- **Namespace Enforcement:** Storage and PubSub are internally prefixed `ns::<namespace>::...`. Ownership of namespace is enforced by middleware for routes under `/v1/storage*`, `/v1/apps*`, and `/v1/pubsub*`.
### Endpoints
- Health/Version
- `GET /health`, `GET /v1/health`
- `GET /v1/status`
- `GET /v1/version``{version, commit, build_time, started_at, uptime}`
- JWKS
- `GET /v1/auth/jwks`
- `GET /.well-known/jwks.json`
- Auth (Enhanced Multi-Wallet System)
- `POST /v1/auth/challenge` (public endpoint, generates wallet challenge)
- `POST /v1/auth/verify` (public endpoint, verifies wallet signature)
- `POST /v1/auth/register` (public endpoint, wallet registration)
- `POST /v1/auth/refresh` (public endpoint, token refresh)
- `POST /v1/auth/logout` (clears authentication state)
- `GET /v1/auth/whoami` (returns current auth status)
- `POST /v1/auth/api-key` (generates API keys for authenticated users)
- Storage
- `POST /v1/storage/get`, `POST /v1/storage/put`, `POST /v1/storage/delete`
- `GET /v1/storage/list?prefix=...`, `GET /v1/storage/exists?key=...`
- Network
- `GET /v1/network/status`, `GET /v1/network/peers`
- `POST /v1/network/connect`, `POST /v1/network/disconnect`
### PubSub
- WebSocket
- `GET /v1/pubsub/ws?topic=<topic>`
- Server sends messages as binary frames; 30s ping keepalive.
- Client text/binary frames are published to the same namespaced topic.
- REST
- `POST /v1/pubsub/publish` → body `{topic, data_base64}``{status:"ok"}`
- `GET /v1/pubsub/topics``{topics:["<topic>", ...]}` (names trimmed to caller namespace)
### Authentication Improvements
The gateway authentication system has been significantly enhanced with the following features:
- **Database Migration System:** Robust SQL migration handling with proper statement processing and error handling
- **Automatic Wallet Detection:** CLI automatically detects and manages wallet credentials without manual auth commands
- **Multi-Wallet Management:** Support for multiple wallet credentials with automatic switching
- **Enhanced User Experience:** Streamlined authentication flow with credential persistence
- **Internal Auth Context:** Public authentication endpoints use internal auth context to prevent circular dependencies
- **Improved Error Handling:** Better error messages and debugging information for authentication issues
Security note: CORS and WS origin checks are permissive for development; harden for production. The enhanced authentication system maintains security while improving accessibility and user experience.
### Database Migration System
The gateway includes an enhanced database migration system with the following features:
- **Automatic Schema Management:** Database schema is automatically initialized and updated using migration files
- **Robust SQL Processing:** Handles complex SQL files with comments, multiple statements, and proper statement termination
- **Version Control:** Tracks migration versions to prevent duplicate execution and ensure proper upgrade paths
- **Error Recovery:** Comprehensive error handling with detailed logging for debugging migration issues
- **Transaction Safety:** Each migration runs in a transaction to ensure atomicity and data integrity
- **SQL File Support:** Supports standard SQL migration files with `.sql` extension in the `migrations/` directory
**Migration File Structure:**
```
migrations/
├── 001_initial_schema.sql # Initial database setup
├── 002_add_auth_tables.sql # Authentication tables
├── 003_add_indexes.sql # Performance indexes
└── ... # Additional migrations
```
The migration system automatically processes SQL statements, handles comments, and ensures proper execution order during gateway startup.
---
## Build & Development
### **Prerequisites**
@ -192,6 +296,7 @@ network/
make build # Build all executables
make test # Run tests
make run-node # Start node (auto-detects bootstrap vs regular)
make run-gateway # Start HTTP gateway (env overrides supported)
```
### **Development Workflow**
@ -248,10 +353,24 @@ peers, err := client.Network().GetPeers(ctx)
- **Message Delivery Failures:** Verify topic names, subscription status, and network connectivity.
- **High Memory Usage:** Unsubscribe from topics when done, monitor connection pool size.
### **Authentication Issues**
- **Wallet Connection Failed:** Check wallet signature format (65-byte r||s||v hex), ensure nonce matches exactly, verify wallet address case-insensitivity.
- **JWT Token Expired:** Use refresh endpoint or re-authenticate with wallet.
- **API Key Invalid:** Verify key format and namespace mapping in gateway configuration.
- **Multi-Wallet Conflicts:** Clear credential cache and re-authenticate: `rm -rf ~/.debros/credentials`
- **Circular Auth Dependencies:** Ensure public auth endpoints use internal auth context.
### **Database Migration Issues**
- **Migration Failures:** Check SQL syntax, ensure proper statement termination, review migration logs.
- **Version Conflicts:** Verify migration file naming and sequential order.
- **Incomplete Migrations:** Check for transaction rollbacks and database locks.
### **Debugging**
- Enable debug logging: `export LOG_LEVEL=debug`
- Check service logs: `sudo journalctl -u debros-node.service -f`
- Use CLI for health and peer checks: `./bin/network-cli health`, `./bin/network-cli peers`
- Check gateway logs: `sudo journalctl -u debros-gateway.service -f`
- Test authentication flow: `./bin/network-cli storage put test-key test-value`
---

View File

@ -3,7 +3,7 @@
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
VERSION := 0.19.0-beta
VERSION := 0.34.0-beta
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
@ -14,6 +14,8 @@ build: deps
@mkdir -p bin
go build -ldflags "$(LDFLAGS)" -o bin/node ./cmd/node
go build -ldflags "$(LDFLAGS)" -o bin/network-cli cmd/cli/main.go
# Inject gateway build metadata via pkg path variables
go build -ldflags "$(LDFLAGS) -X 'git.debros.io/DeBros/network/pkg/gateway.BuildVersion=$(VERSION)' -X 'git.debros.io/DeBros/network/pkg/gateway.BuildCommit=$(COMMIT)' -X 'git.debros.io/DeBros/network/pkg/gateway.BuildTime=$(DATE)'" -o bin/gateway ./cmd/gateway
@echo "Build complete! Run ./bin/network-cli version"
# Clean build artifacts
@ -45,6 +47,22 @@ run-node3:
@echo "Starting regular node3 with config..."
go run ./cmd/node --config configs/node.yaml
# Run gateway HTTP server
# Usage examples:
# make run-gateway # uses defaults (:8080, namespace=default)
# GATEWAY_ADDR=":8081" make run-gateway # override listen addr via env
# GATEWAY_NAMESPACE=myapp make run-gateway # set namespace
# GATEWAY_BOOTSTRAP_PEERS="/ip4/127.0.0.1/tcp/4001/p2p/<ID>" make run-gateway
# GATEWAY_REQUIRE_AUTH=1 GATEWAY_API_KEYS="key1:ns1,key2:ns2" make run-gateway
run-gateway:
@echo "Starting gateway HTTP server..."
GATEWAY_ADDR=$(or $(ADDR),$(GATEWAY_ADDR)) \
GATEWAY_NAMESPACE=$(or $(NAMESPACE),$(GATEWAY_NAMESPACE)) \
GATEWAY_BOOTSTRAP_PEERS=$(GATEWAY_BOOTSTRAP_PEERS) \
GATEWAY_REQUIRE_AUTH=$(GATEWAY_REQUIRE_AUTH) \
GATEWAY_API_KEYS=$(GATEWAY_API_KEYS) \
go run ./cmd/gateway
# Run basic usage example
run-example:
@echo "Running basic usage example..."
@ -170,6 +188,7 @@ help:
@echo " run-node - Start bootstrap node"
@echo " run-node2 - Start second node (requires JOINADDR, optional HTTP/RAFT/P2P)"
@echo " run-node3 - Start third node (requires JOINADDR, optional HTTP/RAFT/P2P)"
@echo " run-gateway - Start HTTP gateway (flags via env: GATEWAY_ADDR, GATEWAY_NAMESPACE, GATEWAY_BOOTSTRAP_PEERS, GATEWAY_REQUIRE_AUTH, GATEWAY_API_KEYS)"
@echo " run-example - Run usage example"
@echo " run-cli - Run network CLI help"
@echo " show-bootstrap - Show example bootstrap usage with flags"

226
README.md
View File

@ -13,6 +13,7 @@ A robust, decentralized peer-to-peer network built in Go, providing distributed
- [Deployment & Installation](#deployment--installation)
- [Configuration](#configuration)
- [CLI Usage](#cli-usage)
- [HTTP Gateway](#http-gateway)
- [Development](#development)
- [Troubleshooting](#troubleshooting)
- [License](#license)
@ -316,9 +317,191 @@ logging:
--disable-anonrc # Disable anonymous routing (Tor/SOCKS5)
```
### Authentication
The CLI features an enhanced authentication system with automatic wallet detection and multi-wallet support:
- **Automatic Authentication:** No manual auth commands required - authentication happens automatically when operations need credentials
- **Multi-Wallet Management:** Seamlessly switch between multiple wallet credentials
- **Persistent Sessions:** Wallet credentials are automatically saved and restored between sessions
- **Enhanced User Experience:** Streamlined authentication flow with better error handling and user feedback
When using operations that require authentication (storage, database, pubsub), the CLI will automatically:
1. Check for existing valid credentials
2. Prompt for wallet authentication if needed
3. Handle signature verification
4. Persist credentials for future use
**Example with automatic authentication:**
```bash
# First time - will prompt for wallet authentication
./bin/network-cli storage put user:123 "John Doe"
# Subsequent calls - uses saved credentials automatically
./bin/network-cli storage get user:123
./bin/network-cli pubsub publish notifications "Hello World"
```
---
## HTTP Gateway
The DeBros Network includes a powerful HTTP/WebSocket gateway that provides a modern REST API and WebSocket interface over the P2P network, featuring an enhanced authentication system with multi-wallet support.
### Quick Start
```bash
make run-gateway
# Or manually:
go run ./cmd/gateway
```
### Configuration
The gateway can be configured via environment variables:
```bash
# Basic Configuration
export GATEWAY_ADDR="0.0.0.0:8080"
export GATEWAY_NAMESPACE="my-app"
export GATEWAY_BOOTSTRAP_PEERS="/ip4/127.0.0.1/tcp/4001/p2p/YOUR_PEER_ID"
# Authentication Configuration
export GATEWAY_REQUIRE_AUTH=true
export GATEWAY_API_KEYS="key1:namespace1,key2:namespace2"
```
### Enhanced Authentication System
The gateway features a significantly improved authentication system with the following capabilities:
#### Key Features
- **Automatic Authentication:** No manual auth commands required - authentication happens automatically when needed
- **Multi-Wallet Support:** Seamlessly manage multiple wallet credentials with automatic switching
- **Persistent Sessions:** Wallet credentials are automatically saved and restored
- **Enhanced User Experience:** Streamlined authentication flow with better error handling
#### Authentication Methods
**Wallet-Based Authentication (Ethereum EIP-191)**
- Uses `personal_sign` for secure wallet verification
- Supports multiple wallets with automatic detection
- Addresses are case-insensitive with normalized signature handling
**JWT Tokens**
- Issued by the gateway with configurable expiration
- JWKS endpoints available at `/v1/auth/jwks` and `/.well-known/jwks.json`
- Automatic refresh capability
**API Keys**
- Support for pre-configured API keys via `Authorization: Bearer <key>` or `X-API-Key` headers
- Optional namespace mapping for multi-tenant applications
### API Endpoints
#### Health & Status
```http
GET /health # Basic health check
GET /v1/health # Detailed health status
GET /v1/status # Network status
GET /v1/version # Version information
```
#### Authentication (Public Endpoints)
```http
POST /v1/auth/challenge # Generate wallet challenge
POST /v1/auth/verify # Verify wallet signature
POST /v1/auth/register # Register new wallet
POST /v1/auth/refresh # Refresh JWT token
POST /v1/auth/logout # Clear authentication
GET /v1/auth/whoami # Current auth status
POST /v1/auth/api-key # Generate API key (authenticated)
```
#### Storage Operations
```http
POST /v1/storage/get # Retrieve data
POST /v1/storage/put # Store data
POST /v1/storage/delete # Delete data
GET /v1/storage/list # List keys with optional prefix
GET /v1/storage/exists # Check key existence
```
#### Network Operations
```http
GET /v1/network/status # Network status
GET /v1/network/peers # Connected peers
POST /v1/network/connect # Connect to peer
POST /v1/network/disconnect # Disconnect from peer
```
#### Pub/Sub Messaging
**WebSocket Interface**
```http
GET /v1/pubsub/ws?topic=<topic> # WebSocket connection for real-time messaging
```
**REST Interface**
```http
POST /v1/pubsub/publish # Publish message to topic
GET /v1/pubsub/topics # List active topics
```
### Security Features
- **Namespace Enforcement:** All operations are automatically prefixed with namespace for isolation
- **CORS Support:** Configurable CORS policies (permissive for development, configurable for production)
- **Transport Security:** All network communications use Noise/TLS encryption
- **Authentication Middleware:** Flexible authentication with support for multiple credential types
### Usage Examples
#### Wallet Authentication Flow
```bash
# 1. Get challenge (automatic)
curl -X POST http://localhost:8080/v1/auth/challenge
# 2. Sign challenge with wallet (handled by client)
# 3. Verify signature (automatic)
curl -X POST http://localhost:8080/v1/auth/verify \
-H "Content-Type: application/json" \
-d '{"wallet":"0x...","nonce":"...","signature":"0x..."}'
```
#### Storage Operations
```bash
# Store data
curl -X POST http://localhost:8080/v1/storage/put \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{"key":"user:123","value":"eyJuYW1lIjoiSm9obiJ9"}'
# Retrieve data
curl -X POST http://localhost:8080/v1/storage/get \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{"key":"user:123"}'
```
#### Real-time Messaging
```javascript
// WebSocket connection
const ws = new WebSocket('ws://localhost:8080/v1/pubsub/ws?topic=chat');
ws.onmessage = (event) => {
console.log('Received:', event.data);
};
// Send message
ws.send('Hello, network!');
```
---
## Development
</text>
### Project Structure
@ -380,6 +563,34 @@ scripts/test-multinode.sh
- **Symptoms:** Memory usage grows continuously
- **Solutions:** Unsubscribe when done, monitor connection pool, review message retention.
#### Authentication Issues
- **Symptoms:** `Authentication failed`, `Invalid wallet signature`, `JWT token expired`
- **Solutions:**
- Check wallet signature format (65-byte r||s||v hex)
- Ensure nonce matches exactly during wallet verification
- Verify wallet address case-insensitivity
- Use refresh endpoint or re-authenticate for expired tokens
- Clear credential cache if multi-wallet conflicts occur: `rm -rf ~/.debros/credentials`
#### Gateway Issues
- **Symptoms:** `Gateway connection refused`, `CORS errors`, `WebSocket disconnections`
- **Solutions:**
- Verify gateway is running and accessible on configured port
- Check CORS configuration for web applications
- Ensure proper authentication headers for protected endpoints
- Verify namespace configuration and enforcement
#### Database Migration Issues
- **Symptoms:** `Migration failed`, `SQL syntax error`, `Version conflict`
- **Solutions:**
- Check SQL syntax in migration files
- Ensure proper statement termination
- Verify migration file naming and sequential order
- Review migration logs for transaction rollbacks
### Debugging & Health Checks
```bash
@ -389,12 +600,27 @@ export LOG_LEVEL=debug
./bin/network-cli query "SELECT 1"
./bin/network-cli pubsub publish test "hello"
./bin/network-cli pubsub subscribe test 10s
# Test authentication flow
./bin/network-cli storage put test-key test-value
# Gateway health checks
curl http://localhost:8080/health
curl http://localhost:8080/v1/status
```
### Service Logs
```bash
# Node service logs
sudo journalctl -u debros-node.service --since "1 hour ago"
# Gateway service logs (if running as service)
sudo journalctl -u debros-gateway.service --since "1 hour ago"
# Application logs
tail -f ./logs/gateway.log
tail -f ./logs/node.log
```
---

135
TASK.md Normal file
View File

@ -0,0 +1,135 @@
# Task: Enforce API Key/JWT and Namespace in Go Client (Auto-Resolve Namespace) and Guard All Operations
Owner: To be assigned
Status: Ready to implement
## Objective
Implement strict client-side access enforcement in the Go client (`pkg/client`) so that:
- An API key or JWT is required by default to use the client.
- The client auto-resolves the namespace from the provided API key or JWT without requiring callers to pass the namespace per call.
- Per-call namespace overrides via context are still allowed for compatibility, but must match the resolved namespace; otherwise, deny the call.
- All operations (Storage, PubSub, Database/RQLite, and NetworkInfo) are guarded and return access errors when unauthenticated or namespace-mismatched.
- No backward compatibility guarantees required.
Note: This is client-side enforcement for now. Protocol-level auth/ACL for libp2p can be added later.
## High-level behavior
- `ClientConfig.RequireAPIKey` defaults to true. If true and neither `APIKey` nor `JWT` is present, `Connect()` fails.
- Namespace is automatically derived:
- From JWT: parse claims and read `Namespace` claim (no network roundtrip). Verification of signature is not required for this task; parsing is enough to derive namespace. Optionally, add a TODO hook for future verification against JWKS if provided.
- From API key: the namespace must be embedded in the key using a documented format (below). The client parses it locally and derives the namespace without any remote calls.
- All calls check that any provided per-call namespace override matches the derived namespace, else return an “access denied: namespace mismatch” error.
- All modules are guarded: Database (RQLite), Storage, PubSub, and NetworkInfo.
## API key and JWT formats
- JWT: RS256 token with claim `Namespace` (string). We will parse claims (unverified) to obtain `Namespace`.
- API key: change to an encoded format that includes the namespace so the client can parse locally. Options (pick one and implement consistently):
- Option A (dotted): `ak_<random>.<namespace>`
- Option B (colon): `ak_<random>:<namespace>`
- Option C (base64 JSON): base64url of `{ "kid": "...", "ns": "<namespace>" }` prefixed by `ak_`
For simplicity and readability, choose Option B: `ak_<random>:<namespace>`.
- Parsing rules:
- If `APIKey` contains a single colon, split and use the right side as `namespace` (trim spaces). If empty -> error.
- If more than one colon or invalid format -> error.
## Changes to implement
### 1) Client configuration and types
- File: `pkg/client/interface.go`
- Extend `ClientConfig`:
- `Namespace string` // optional; if empty, auto-derived from API key or JWT; if still empty, fallback to `AppName`.
- `RequireAPIKey bool` // default true; when true, require either `APIKey` or `JWT`.
- `JWT string` // optional bearer token; used for namespace derivation and future protocol auth.
- Update `DefaultClientConfig(appName string)` to set:
- `RequireAPIKey: true`
- `Namespace: ""` (meaning auto)
### 2) Namespace resolution and access gating
- File: `pkg/client/client.go`
- At construction or `Connect()` time:
- Implement `deriveNamespace()`:
- If `config.Namespace != ""`, use it.
- Else if `config.JWT != ""`, parse JWT claims (unverified) and read `Namespace` claim.
- Else if `config.APIKey != ""`, parse `ak_<random>:<namespace>` and extract namespace.
- Else use `config.AppName`.
- Store the resolved namespace back into `config.Namespace`.
- Enforce presence of credentials:
- If `config.RequireAPIKey` is true AND both `config.APIKey` and `config.JWT` are empty -> return error `access denied: API key or JWT required`.
- Add `func (c *Client) requireAccess(ctx context.Context) error` that:
- If `RequireAPIKey` and both `APIKey` and `JWT` are empty -> error `access denied: credentials required`.
- Resolve per-call namespace override from context (via storage/pubsub helpers below). If present and `override != c.config.Namespace` -> error `access denied: namespace mismatch`.
### 3) Guard all operations
- File: `pkg/client/implementations.go`
- At the start of each public method, call `client.requireAccess(ctx)` and return the error if any.
- DatabaseClientImpl: `Query`, `Transaction`, `CreateTable`, `DropTable`, `GetSchema`.
- StorageClientImpl: `Get`, `Put`, `Delete`, `List`, `Exists`.
- NetworkInfoImpl: `GetPeers`, `GetStatus`, `ConnectToPeer`, `DisconnectFromPeer`.
- For Storage operations, ensure we propagate the effective namespace:
- If override present and equals `config.Namespace`, pass that context through; else use `storage.WithNamespace(ctx, config.Namespace)`.
### 4) PubSub context-based namespace override (parity with Storage)
- Files: `pkg/pubsub/*`
- Add:
- `type ctxKey string`
- `const CtxKeyNamespaceOverride ctxKey = "pubsub_ns_override"`
- `func WithNamespace(ctx context.Context, ns string) context.Context`
- Update topic naming in `manager.go` and `subscriptions.go`/`publish.go`:
- Before computing `namespacedTopic`, check for ctx override; if present and non-empty, use it; else fall back to `m.namespace`.
### 5) Client context helper
- New file: `pkg/client/context.go`
- Add `func WithNamespace(ctx context.Context, ns string) context.Context` that applies both storage and pubsub overrides by chaining:
- `ctx = storage.WithNamespace(ctx, ns)`
- `ctx = pubsub.WithNamespace(ctx, ns)`
- return `ctx`
### 6) Documentation updates
- Files: `README.md`, `AI_CONTEXT.md`
- Document the new client auth behavior:
- An API key or JWT is required by default (`RequireAPIKey=true`).
- Namespace auto-derived from token:
- JWT claim `Namespace`.
- API key format `ak_<random>:<namespace>`.
- Per-call override via `client.WithNamespace(ctx, ns)` allowed but must match derived namespace.
- All modules (Storage, PubSub, Database, NetworkInfo) are guarded.
- Provide usage examples for constructing `ClientConfig` with API key or JWT and making calls.
## Helper details
- JWT parsing: implement a minimal helper to split the token and base64url-decode the payload; read `Namespace` field from JSON. Do not verify signature for this task. If parsing fails, return a clear error.
- API key parsing: simple split on `:`; trim spaces; validate non-empty.
## Error messages (standardize)
- Missing credentials: `access denied: API key or JWT required`
- Namespace mismatch: `access denied: namespace mismatch`
- Client not connected: keep existing `client not connected` error.
## Acceptance criteria
- Without credentials and `RequireAPIKey=true`, `Connect()` returns error and no operations are allowed.
- With API key `ak_abc123:myapp`, the client auto-resolves namespace `myapp`; operations succeed.
- With JWT containing `{ "Namespace": "myapp" }`, the client auto-resolves `myapp`; operations succeed.
- If a caller sets `client.WithNamespace(ctx, "otherNS")` while resolved namespace is `myapp`, any operation returns `access denied: namespace mismatch`.
- PubSub topic names use the override when present (and allowed) else the resolved namespace.
- NetworkInfo methods are also guarded and require credentials.
## Out of scope (for this task)
- Protocol-level auth or verification of JWT signatures against JWKS.
- ETH payments/subscriptions and tier enforcement. (Separate design/implementation.)
## Files to modify/add
- Modify:
- `pkg/client/interface.go`
- `pkg/client/client.go`
- `pkg/client/implementations.go`
- `pkg/pubsub/manager.go`
- `pkg/pubsub/subscriptions.go`
- `pkg/pubsub/publish.go` (if exists; add override resolution there too)
- `README.md`, `AI_CONTEXT.md`
- Add:
- `pkg/pubsub/context.go` (if not present)
- `pkg/client/context.go`
## Notes
- Keep logs concise and avoid leaking tokens in logs. You may log the resolved namespace at `INFO` level on connect.
- Ensure thread-safety when accessing `Client.config` fields (use existing locks if needed).

View File

@ -5,12 +5,15 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"log"
"os"
"os/exec"
"strconv"
"strings"
"time"
"git.debros.io/DeBros/network/pkg/anyoneproxy"
"git.debros.io/DeBros/network/pkg/auth"
"git.debros.io/DeBros/network/pkg/client"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
@ -83,6 +86,7 @@ func main() {
handlePeerID()
case "help", "--help", "-h":
showHelp()
default:
fmt.Fprintf(os.Stderr, "Unknown command: %s\n", command)
showHelp()
@ -185,6 +189,9 @@ func handleStatus() {
}
func handleQuery(sql string) {
// Ensure user is authenticated
_ = ensureAuthenticated()
client, err := createClient()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create client: %v\n", err)
@ -214,6 +221,9 @@ func handleStorage(args []string) {
os.Exit(1)
}
// Ensure user is authenticated
_ = ensureAuthenticated()
client, err := createClient()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create client: %v\n", err)
@ -283,6 +293,9 @@ func handlePubSub(args []string) {
os.Exit(1)
}
// Ensure user is authenticated
_ = ensureAuthenticated()
client, err := createClient()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create client: %v\n", err)
@ -358,6 +371,44 @@ func handlePubSub(args []string) {
}
}
// ensureAuthenticated ensures the user has valid credentials for the gateway
// Returns the credentials or exits the program on failure
func ensureAuthenticated() *auth.Credentials {
gatewayURL := auth.GetDefaultGatewayURL()
credentials, err := auth.GetOrPromptForCredentials(gatewayURL)
if err != nil {
fmt.Fprintf(os.Stderr, "Authentication failed: %v\n", err)
os.Exit(1)
}
return credentials
}
func openBrowser(target string) error {
cmds := [][]string{
{"xdg-open", target},
{"open", target},
{"cmd", "/c", "start", target},
}
for _, c := range cmds {
cmd := exec.Command(c[0], c[1:]...)
if err := cmd.Start(); err == nil {
return nil
}
}
log.Printf("Please open %s manually", target)
return nil
}
// getenvDefault returns env var or default if empty/undefined.
func getenvDefault(key, def string) string {
if v := strings.TrimSpace(os.Getenv(key)); v != "" {
return v
}
return def
}
func handleConnect(peerAddr string) {
client, err := createClient()
if err != nil {
@ -447,6 +498,39 @@ func handlePeerID() {
func createClient() (client.NetworkClient, error) {
config := client.DefaultClientConfig("network-cli")
// Check for existing credentials
creds, err := auth.GetValidCredentials()
if err != nil {
// No valid credentials found, trigger authentication flow
fmt.Printf("🔐 Authentication required for DeBros Network CLI\n")
fmt.Printf("💡 This will open your browser to authenticate with your wallet\n")
gatewayURL := auth.GetDefaultGatewayURL()
fmt.Printf("🌐 Gateway: %s\n\n", gatewayURL)
// Perform wallet authentication
newCreds, authErr := auth.PerformWalletAuthentication(gatewayURL)
if authErr != nil {
return nil, fmt.Errorf("authentication failed: %w", authErr)
}
// Save credentials
if saveErr := auth.SaveCredentialsForDefaultGateway(newCreds); saveErr != nil {
fmt.Printf("⚠️ Warning: failed to save credentials: %v\n", saveErr)
} else {
fmt.Printf("💾 Credentials saved to ~/.debros/credentials.json\n")
}
creds = newCreds
}
// Configure client with API key
config.APIKey = creds.APIKey
// Update last used time
creds.UpdateLastUsed()
auth.SaveCredentialsForDefaultGateway(creds) // Best effort save
networkClient, err := client.NewClient(config)
if err != nil {
return nil, err
@ -513,25 +597,31 @@ func isPrintableText(s string) bool {
func showHelp() {
fmt.Printf("Network CLI - Distributed P2P Network Management Tool\n\n")
fmt.Printf("Usage: network-cli <command> [args...]\n\n")
fmt.Printf("🔐 Authentication: Commands requiring authentication will automatically prompt for wallet connection.\n\n")
fmt.Printf("Commands:\n")
fmt.Printf(" health - Check network health\n")
fmt.Printf(" peers - List connected peers\n")
fmt.Printf(" status - Show network status\n")
fmt.Printf(" peer-id - Show this node's peer ID\n")
fmt.Printf(" query <sql> - Execute database query\n")
fmt.Printf(" storage get <key> - Get value from storage\n")
fmt.Printf(" storage put <key> <value> - Store value in storage\n")
fmt.Printf(" storage list [prefix] - List storage keys\n")
fmt.Printf(" pubsub publish <topic> <msg> - Publish message\n")
fmt.Printf(" pubsub subscribe <topic> [duration] - Subscribe to topic\n")
fmt.Printf(" pubsub topics - List topics\n")
fmt.Printf(" query <sql> 🔐 Execute database query\n")
fmt.Printf(" storage get <key> 🔐 Get value from storage\n")
fmt.Printf(" storage put <key> <value> 🔐 Store value in storage\n")
fmt.Printf(" storage list [prefix] 🔐 List storage keys\n")
fmt.Printf(" pubsub publish <topic> <msg> 🔐 Publish message\n")
fmt.Printf(" pubsub subscribe <topic> [duration] 🔐 Subscribe to topic\n")
fmt.Printf(" pubsub topics 🔐 List topics\n")
fmt.Printf(" connect <peer_address> - Connect to peer\n")
fmt.Printf(" help - Show this help\n\n")
fmt.Printf("Global Flags:\n")
fmt.Printf(" -b, --bootstrap <addr> - Bootstrap peer address (default: /ip4/127.0.0.1/tcp/4001)\n")
fmt.Printf(" -f, --format <format> - Output format: table, json (default: table)\n")
fmt.Printf(" -t, --timeout <duration> - Operation timeout (default: 30s)\n")
fmt.Printf(" --production - Connect to production bootstrap peers\n\n")
fmt.Printf("Authentication:\n")
fmt.Printf(" Commands marked with 🔐 will automatically prompt for wallet authentication\n")
fmt.Printf(" if no valid credentials are found. You can manage multiple wallets and\n")
fmt.Printf(" choose between them during the authentication flow.\n\n")
fmt.Printf("Examples:\n")
fmt.Printf(" network-cli health\n")
fmt.Printf(" network-cli peer-id\n")

71
cmd/gateway/config.go Normal file
View File

@ -0,0 +1,71 @@
package main
import (
"flag"
"os"
"strings"
"git.debros.io/DeBros/network/pkg/gateway"
"git.debros.io/DeBros/network/pkg/logging"
"go.uber.org/zap"
)
// For transition, alias main.GatewayConfig to pkg/gateway.Config
// server.go will be removed; this keeps compatibility until then.
type GatewayConfig = gateway.Config
func getEnvDefault(key, def string) string {
if v := os.Getenv(key); strings.TrimSpace(v) != "" {
return v
}
return def
}
func getEnvBoolDefault(key string, def bool) bool {
v := strings.TrimSpace(os.Getenv(key))
if v == "" {
return def
}
switch strings.ToLower(v) {
case "1", "true", "t", "yes", "y", "on":
return true
case "0", "false", "f", "no", "n", "off":
return false
default:
return def
}
}
// parseGatewayConfig parses flags and environment variables into GatewayConfig.
// Priority: flags > env > defaults.
func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
addr := flag.String("addr", getEnvDefault("GATEWAY_ADDR", ":8080"), "HTTP listen address (e.g., :8080)")
ns := flag.String("namespace", getEnvDefault("GATEWAY_NAMESPACE", "default"), "Client namespace for scoping resources")
peers := flag.String("bootstrap-peers", getEnvDefault("GATEWAY_BOOTSTRAP_PEERS", ""), "Comma-separated bootstrap peers for network client")
// Do not call flag.Parse() elsewhere to avoid double-parsing
flag.Parse()
var bootstrap []string
if p := strings.TrimSpace(*peers); p != "" {
parts := strings.Split(p, ",")
for _, part := range parts {
val := strings.TrimSpace(part)
if val != "" {
bootstrap = append(bootstrap, val)
}
}
}
logger.ComponentInfo(logging.ComponentGeneral, "Loaded gateway configuration",
zap.String("addr", *addr),
zap.String("namespace", *ns),
zap.Int("bootstrap_peer_count", len(bootstrap)),
)
return &gateway.Config{
ListenAddr: *addr,
ClientNamespace: *ns,
BootstrapPeers: bootstrap,
}
}

102
cmd/gateway/main.go Normal file
View File

@ -0,0 +1,102 @@
package main
import (
"context"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"git.debros.io/DeBros/network/pkg/gateway"
"git.debros.io/DeBros/network/pkg/logging"
"go.uber.org/zap"
)
func setupLogger() *logging.ColoredLogger {
logger, err := logging.NewColoredLogger(logging.ComponentGeneral, true)
if err != nil {
panic(err)
}
return logger
}
func main() {
logger := setupLogger()
// Load gateway config (flags/env)
cfg := parseGatewayConfig(logger)
logger.ComponentInfo(logging.ComponentGeneral, "Starting gateway initialization...")
// Initialize gateway (connect client, prepare routes)
gw, err := gateway.New(logger, cfg)
if err != nil {
logger.ComponentError(logging.ComponentGeneral, "failed to initialize gateway", zap.Error(err))
os.Exit(1)
}
defer gw.Close()
logger.ComponentInfo(logging.ComponentGeneral, "Gateway initialization completed successfully")
logger.ComponentInfo(logging.ComponentGeneral, "Creating HTTP server and routes...")
server := &http.Server{
Addr: cfg.ListenAddr,
Handler: gw.Routes(),
}
// Try to bind listener explicitly so binding failures are visible immediately.
logger.ComponentInfo(logging.ComponentGeneral, "Gateway HTTP server starting",
zap.String("addr", cfg.ListenAddr),
zap.String("namespace", cfg.ClientNamespace),
zap.Int("bootstrap_peer_count", len(cfg.BootstrapPeers)),
)
logger.ComponentInfo(logging.ComponentGeneral, "Attempting to bind HTTP listener...")
ln, err := net.Listen("tcp", cfg.ListenAddr)
if err != nil {
logger.ComponentError(logging.ComponentGeneral, "failed to bind HTTP listen address", zap.Error(err))
// exit because server cannot function without a listener
os.Exit(1)
}
logger.ComponentInfo(logging.ComponentGeneral, "HTTP listener bound", zap.String("listen_addr", ln.Addr().String()))
// Serve in a goroutine so we can handle graceful shutdown on signals.
serveErrCh := make(chan error, 1)
go func() {
if err := server.Serve(ln); err != nil && err != http.ErrServerClosed {
serveErrCh <- err
return
}
serveErrCh <- nil
}()
// Wait for termination signal or server error
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
select {
case sig := <-quit:
logger.ComponentInfo(logging.ComponentGeneral, "shutdown signal received", zap.String("signal", sig.String()))
case err := <-serveErrCh:
if err != nil {
logger.ComponentError(logging.ComponentGeneral, "HTTP server error", zap.Error(err))
// continue to shutdown path so we close resources cleanly
} else {
logger.ComponentInfo(logging.ComponentGeneral, "HTTP server exited normally")
}
}
logger.ComponentInfo(logging.ComponentGeneral, "Shutting down gateway HTTP server...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
logger.ComponentError(logging.ComponentGeneral, "HTTP server shutdown error", zap.Error(err))
} else {
logger.ComponentInfo(logging.ComponentGeneral, "Gateway shutdown complete")
}
}

5
go.mod
View File

@ -5,6 +5,8 @@ go 1.23.8
toolchain go1.24.1
require (
github.com/ethereum/go-ethereum v1.13.14
github.com/gorilla/websocket v1.5.3
github.com/libp2p/go-libp2p v0.41.1
github.com/libp2p/go-libp2p-pubsub v0.14.2
github.com/multiformats/go-multiaddr v0.15.0
@ -17,6 +19,7 @@ require (
require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
@ -32,8 +35,8 @@ require (
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20250208200701-d0013a598941 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/go-cid v0.5.0 // indirect
github.com/ipfs/go-log/v2 v2.6.0 // indirect

8
go.sum
View File

@ -16,6 +16,10 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@ -46,6 +50,8 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/elastic/gosigar v0.14.3 h1:xwkKwPia+hSfg9GqrCUKYdId102m9qTJIIr7egmK/uo=
github.com/elastic/gosigar v0.14.3/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/ethereum/go-ethereum v1.13.14 h1:EwiY3FZP94derMCIam1iW4HFVrSgIcpsu0HwTQtm6CQ=
github.com/ethereum/go-ethereum v1.13.14/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg=
github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
@ -97,6 +103,8 @@ github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:Fecb
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU=
github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/ipfs/go-cid v0.5.0 h1:goEKKhaGm0ul11IHA7I6p1GmKz8kEYniqFopaB5Otwg=

View File

@ -0,0 +1,55 @@
-- DeBros Gateway - Initial database schema (SQLite/RQLite dialect)
-- This file scaffolds core tables used by the HTTP gateway for auth, observability, and namespacing.
-- Apply via your migration tooling or manual execution in RQLite.
BEGIN;
-- Tracks applied migrations (optional if your runner manages this separately)
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))
);
-- Namespaces (tenant/app isolation)
CREATE TABLE IF NOT EXISTS namespaces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- API keys (basic authentication/authorization scaffold)
CREATE TABLE IF NOT EXISTS api_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL UNIQUE,
name TEXT,
namespace_id INTEGER NOT NULL,
scopes TEXT, -- comma-separated or JSON array; refine later
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_used_at TIMESTAMP,
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_api_keys_namespace ON api_keys(namespace_id);
-- Request logs (simple observability; expand with more fields later)
CREATE TABLE IF NOT EXISTS request_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
method TEXT NOT NULL,
path TEXT NOT NULL,
status_code INTEGER NOT NULL,
bytes_out INTEGER NOT NULL DEFAULT 0,
duration_ms INTEGER NOT NULL DEFAULT 0,
ip TEXT,
api_key_id INTEGER,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(api_key_id) REFERENCES api_keys(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_request_logs_api_key ON request_logs(api_key_id);
CREATE INDEX IF NOT EXISTS idx_request_logs_created_at ON request_logs(created_at);
-- Seed a default namespace for development convenience
INSERT OR IGNORE INTO namespaces(name) VALUES ('default');
-- Mark this migration as applied (optional)
INSERT OR IGNORE INTO schema_migrations(version) VALUES (1);
COMMIT;

95
migrations/002_core.sql Normal file
View File

@ -0,0 +1,95 @@
-- DeBros Gateway - Core schema (Phase 2)
-- Adds apps, nonces, subscriptions, refresh_tokens, audit_events, namespace_ownership
-- SQLite/RQLite dialect
BEGIN;
-- Apps registered within a namespace (optional public key for attestation)
CREATE TABLE IF NOT EXISTS apps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
app_id TEXT NOT NULL,
name TEXT,
public_key TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE(namespace_id, app_id),
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_apps_namespace ON apps(namespace_id);
-- Wallet nonces for challenge-response auth
CREATE TABLE IF NOT EXISTS nonces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
wallet TEXT NOT NULL,
nonce TEXT NOT NULL,
purpose TEXT,
expires_at TIMESTAMP,
used_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE(namespace_id, wallet, nonce),
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_nonces_wallet ON nonces(wallet);
CREATE INDEX IF NOT EXISTS idx_nonces_expires ON nonces(expires_at);
-- Subscriptions to topics or channels for callbacks/notifications
CREATE TABLE IF NOT EXISTS subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
app_id INTEGER,
topic TEXT NOT NULL,
endpoint TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE,
FOREIGN KEY(app_id) REFERENCES apps(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_subscriptions_ns ON subscriptions(namespace_id);
CREATE INDEX IF NOT EXISTS idx_subscriptions_topic ON subscriptions(topic);
-- Opaque refresh tokens for JWT
CREATE TABLE IF NOT EXISTS refresh_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
subject TEXT NOT NULL,
token TEXT NOT NULL UNIQUE,
audience TEXT,
expires_at TIMESTAMP,
revoked_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_refresh_subject ON refresh_tokens(subject);
CREATE INDEX IF NOT EXISTS idx_refresh_expires ON refresh_tokens(expires_at);
-- Audit events for security and observability
CREATE TABLE IF NOT EXISTS audit_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
actor TEXT,
action TEXT NOT NULL,
resource TEXT,
ip TEXT,
metadata TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_audit_ns_time ON audit_events(namespace_id, created_at);
CREATE INDEX IF NOT EXISTS idx_audit_action ON audit_events(action);
-- Namespace ownership mapping (who controls a namespace)
CREATE TABLE IF NOT EXISTS namespace_ownership (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
owner_type TEXT NOT NULL, -- e.g., 'wallet', 'api_key'
owner_id TEXT NOT NULL, -- e.g., wallet address or api key string
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE(namespace_id, owner_type, owner_id),
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_ns_owner_ns ON namespace_ownership(namespace_id);
-- Optional marker (ignored by runner)
INSERT OR IGNORE INTO schema_migrations(version) VALUES (2);
COMMIT;

View File

@ -0,0 +1,21 @@
-- DeBros Gateway - Wallet to API Key linkage (Phase 3)
-- Ensures one API key per (namespace, wallet) and enables lookup
BEGIN;
CREATE TABLE IF NOT EXISTS wallet_api_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
namespace_id INTEGER NOT NULL,
wallet TEXT NOT NULL,
api_key_id INTEGER NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE(namespace_id, wallet),
FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE,
FOREIGN KEY(api_key_id) REFERENCES api_keys(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_wallet_api_keys_ns ON wallet_api_keys(namespace_id);
INSERT OR IGNORE INTO schema_migrations(version) VALUES (3);
COMMIT;

234
pkg/auth/credentials.go Normal file
View File

@ -0,0 +1,234 @@
package auth
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
)
// Credentials represents authentication credentials for a specific gateway
type Credentials struct {
APIKey string `json:"api_key"`
RefreshToken string `json:"refresh_token,omitempty"`
Namespace string `json:"namespace"`
UserID string `json:"user_id,omitempty"`
Wallet string `json:"wallet,omitempty"`
ExpiresAt time.Time `json:"expires_at,omitempty"`
IssuedAt time.Time `json:"issued_at"`
LastUsedAt time.Time `json:"last_used_at,omitempty"`
Plan string `json:"plan,omitempty"`
}
// CredentialStore manages credentials for multiple gateways
type CredentialStore struct {
Gateways map[string]*Credentials `json:"gateways"`
Version string `json:"version"`
}
// GetCredentialsPath returns the path to the credentials file
func GetCredentialsPath() (string, error) {
homeDir, err := os.UserHomeDir()
if err != nil {
return "", fmt.Errorf("failed to get home directory: %w", err)
}
debrosDir := filepath.Join(homeDir, ".debros")
if err := os.MkdirAll(debrosDir, 0700); err != nil {
return "", fmt.Errorf("failed to create .debros directory: %w", err)
}
return filepath.Join(debrosDir, "credentials.json"), nil
}
// LoadCredentials loads credentials from ~/.debros/credentials.json
func LoadCredentials() (*CredentialStore, error) {
credPath, err := GetCredentialsPath()
if err != nil {
return nil, err
}
// If file doesn't exist, return empty store
if _, err := os.Stat(credPath); os.IsNotExist(err) {
return &CredentialStore{
Gateways: make(map[string]*Credentials),
Version: "1.0",
}, nil
}
data, err := os.ReadFile(credPath)
if err != nil {
return nil, fmt.Errorf("failed to read credentials file: %w", err)
}
var store CredentialStore
if err := json.Unmarshal(data, &store); err != nil {
return nil, fmt.Errorf("failed to parse credentials file: %w", err)
}
// Initialize gateways map if nil
if store.Gateways == nil {
store.Gateways = make(map[string]*Credentials)
}
// Set version if empty
if store.Version == "" {
store.Version = "1.0"
}
return &store, nil
}
// SaveCredentials saves credentials to ~/.debros/credentials.json
func (store *CredentialStore) SaveCredentials() error {
credPath, err := GetCredentialsPath()
if err != nil {
return err
}
// Ensure version is set
if store.Version == "" {
store.Version = "1.0"
}
data, err := json.MarshalIndent(store, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal credentials: %w", err)
}
// Write with restricted permissions (readable only by owner)
if err := os.WriteFile(credPath, data, 0600); err != nil {
return fmt.Errorf("failed to write credentials file: %w", err)
}
return nil
}
// GetCredentialsForGateway returns credentials for a specific gateway URL
func (store *CredentialStore) GetCredentialsForGateway(gatewayURL string) (*Credentials, bool) {
creds, exists := store.Gateways[gatewayURL]
if !exists || creds == nil {
return nil, false
}
// Check if credentials are expired (if expiration is set)
if !creds.ExpiresAt.IsZero() && time.Now().After(creds.ExpiresAt) {
return nil, false
}
return creds, true
}
// SetCredentialsForGateway stores credentials for a specific gateway URL
func (store *CredentialStore) SetCredentialsForGateway(gatewayURL string, creds *Credentials) {
if store.Gateways == nil {
store.Gateways = make(map[string]*Credentials)
}
// Update last used time
creds.LastUsedAt = time.Now()
store.Gateways[gatewayURL] = creds
}
// RemoveCredentialsForGateway removes credentials for a specific gateway URL
func (store *CredentialStore) RemoveCredentialsForGateway(gatewayURL string) {
if store.Gateways != nil {
delete(store.Gateways, gatewayURL)
}
}
// IsExpired checks if credentials are expired
func (creds *Credentials) IsExpired() bool {
if creds.ExpiresAt.IsZero() {
return false // No expiration set
}
return time.Now().After(creds.ExpiresAt)
}
// IsValid checks if credentials are valid (not empty and not expired)
func (creds *Credentials) IsValid() bool {
if creds == nil {
return false
}
if creds.APIKey == "" {
return false
}
return !creds.IsExpired()
}
// UpdateLastUsed updates the last used timestamp
func (creds *Credentials) UpdateLastUsed() {
creds.LastUsedAt = time.Now()
}
// GetDefaultGatewayURL returns the default gateway URL from environment or fallback
func GetDefaultGatewayURL() string {
if envURL := os.Getenv("DEBROS_GATEWAY_URL"); envURL != "" {
return envURL
}
if envURL := os.Getenv("DEBROS_GATEWAY"); envURL != "" {
return envURL
}
return "http://localhost:8005"
}
// HasValidCredentials checks if there are valid credentials for the default gateway
func HasValidCredentials() (bool, error) {
store, err := LoadCredentials()
if err != nil {
return false, err
}
gatewayURL := GetDefaultGatewayURL()
creds, exists := store.GetCredentialsForGateway(gatewayURL)
return exists && creds.IsValid(), nil
}
// GetValidCredentials returns valid credentials for the default gateway
func GetValidCredentials() (*Credentials, error) {
store, err := LoadCredentials()
if err != nil {
return nil, err
}
gatewayURL := GetDefaultGatewayURL()
creds, exists := store.GetCredentialsForGateway(gatewayURL)
if !exists {
return nil, fmt.Errorf("no credentials found for gateway %s", gatewayURL)
}
if !creds.IsValid() {
return nil, fmt.Errorf("credentials for gateway %s are expired or invalid", gatewayURL)
}
return creds, nil
}
// SaveCredentialsForDefaultGateway saves credentials for the default gateway
func SaveCredentialsForDefaultGateway(creds *Credentials) error {
store, err := LoadCredentials()
if err != nil {
return err
}
gatewayURL := GetDefaultGatewayURL()
store.SetCredentialsForGateway(gatewayURL, creds)
return store.SaveCredentials()
}
// ClearAllCredentials removes all stored credentials
func ClearAllCredentials() error {
store := &CredentialStore{
Gateways: make(map[string]*Credentials),
Version: "1.0",
}
return store.SaveCredentials()
}

395
pkg/auth/enhanced_auth.go Normal file
View File

@ -0,0 +1,395 @@
package auth
import (
"bufio"
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
)
// EnhancedCredentialStore manages multiple credentials per gateway
type EnhancedCredentialStore struct {
Gateways map[string]*GatewayCredentials `json:"gateways"`
Version string `json:"version"`
}
// GatewayCredentials holds multiple credentials for a single gateway
type GatewayCredentials struct {
Credentials []*Credentials `json:"credentials"`
DefaultIndex int `json:"default_index"`
LastUsedIndex int `json:"last_used_index"`
}
// AuthChoice represents user's choice during authentication
type AuthChoice int
const (
AuthChoiceUseCredential AuthChoice = iota
AuthChoiceAddCredential
AuthChoiceLogout
AuthChoiceExit
)
// LoadEnhancedCredentials loads the enhanced credential store
func LoadEnhancedCredentials() (*EnhancedCredentialStore, error) {
credPath, err := GetCredentialsPath()
if err != nil {
return nil, err
}
// If file doesn't exist, return empty store
if _, err := os.Stat(credPath); os.IsNotExist(err) {
return &EnhancedCredentialStore{
Gateways: make(map[string]*GatewayCredentials),
Version: "2.0",
}, nil
}
data, err := os.ReadFile(credPath)
if err != nil {
return nil, fmt.Errorf("failed to read credentials file: %w", err)
}
// Try to parse as enhanced store first
var enhancedStore EnhancedCredentialStore
if err := json.Unmarshal(data, &enhancedStore); err == nil && enhancedStore.Version == "2.0" {
// Initialize maps if nil
if enhancedStore.Gateways == nil {
enhancedStore.Gateways = make(map[string]*GatewayCredentials)
}
return &enhancedStore, nil
}
// Fall back to old format and migrate
var oldStore CredentialStore
if err := json.Unmarshal(data, &oldStore); err != nil {
return nil, fmt.Errorf("failed to parse credentials file: %w", err)
}
// Migrate old format to new
enhancedStore = EnhancedCredentialStore{
Gateways: make(map[string]*GatewayCredentials),
Version: "2.0",
}
for gatewayURL, creds := range oldStore.Gateways {
if creds != nil {
enhancedStore.Gateways[gatewayURL] = &GatewayCredentials{
Credentials: []*Credentials{creds},
DefaultIndex: 0,
LastUsedIndex: 0,
}
}
}
return &enhancedStore, nil
}
// Save saves the enhanced credential store
func (store *EnhancedCredentialStore) Save() error {
credPath, err := GetCredentialsPath()
if err != nil {
return err
}
if store.Version == "" {
store.Version = "2.0"
}
data, err := json.MarshalIndent(store, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal credentials: %w", err)
}
return os.WriteFile(credPath, data, 0600)
}
// AddCredential adds a new credential for the gateway
func (store *EnhancedCredentialStore) AddCredential(gatewayURL string, creds *Credentials) {
if store.Gateways == nil {
store.Gateways = make(map[string]*GatewayCredentials)
}
gatewayCredentials := store.Gateways[gatewayURL]
if gatewayCredentials == nil {
gatewayCredentials = &GatewayCredentials{
Credentials: []*Credentials{},
DefaultIndex: 0,
LastUsedIndex: 0,
}
store.Gateways[gatewayURL] = gatewayCredentials
}
// Check if credential already exists (by wallet address)
for i, existing := range gatewayCredentials.Credentials {
if strings.EqualFold(existing.Wallet, creds.Wallet) {
// Update existing credential
gatewayCredentials.Credentials[i] = creds
return
}
}
// Add new credential
gatewayCredentials.Credentials = append(gatewayCredentials.Credentials, creds)
}
// GetDefaultCredential returns the default credential for a gateway
func (store *EnhancedCredentialStore) GetDefaultCredential(gatewayURL string) *Credentials {
gatewayCredentials := store.Gateways[gatewayURL]
if gatewayCredentials == nil || len(gatewayCredentials.Credentials) == 0 {
return nil
}
// Ensure default index is valid
if gatewayCredentials.DefaultIndex < 0 || gatewayCredentials.DefaultIndex >= len(gatewayCredentials.Credentials) {
gatewayCredentials.DefaultIndex = 0
}
return gatewayCredentials.Credentials[gatewayCredentials.DefaultIndex]
}
// SetDefaultCredential sets the default credential by index
func (store *EnhancedCredentialStore) SetDefaultCredential(gatewayURL string, index int) bool {
gatewayCredentials := store.Gateways[gatewayURL]
if gatewayCredentials == nil || index < 0 || index >= len(gatewayCredentials.Credentials) {
return false
}
gatewayCredentials.DefaultIndex = index
gatewayCredentials.LastUsedIndex = index
return true
}
// ClearAllCredentials removes all credentials
func (store *EnhancedCredentialStore) ClearAllCredentials() {
store.Gateways = make(map[string]*GatewayCredentials)
}
// DisplayCredentialMenu shows the interactive credential selection menu
func (store *EnhancedCredentialStore) DisplayCredentialMenu(gatewayURL string) (AuthChoice, int, error) {
gatewayCredentials := store.Gateways[gatewayURL]
if gatewayCredentials == nil || len(gatewayCredentials.Credentials) == 0 {
fmt.Println("\n🔐 No credentials found. Choose an option:")
fmt.Println("1. Authenticate with new wallet")
fmt.Println("2. Exit")
fmt.Print("Choose (1-2): ")
choice, err := readUserChoice(2)
if err != nil {
return AuthChoiceExit, -1, err
}
switch choice {
case 1:
return AuthChoiceAddCredential, -1, nil
case 2:
return AuthChoiceExit, -1, nil
default:
return AuthChoiceExit, -1, fmt.Errorf("invalid choice")
}
}
fmt.Printf("\n🔐 Multiple wallets available for %s:\n", gatewayURL)
// Display credentials
for i, creds := range gatewayCredentials.Credentials {
defaultMark := ""
if i == gatewayCredentials.DefaultIndex {
defaultMark = " (default)"
}
// Format wallet address for display
displayAddr := creds.Wallet
if len(displayAddr) > 10 {
displayAddr = displayAddr[:6] + "..." + displayAddr[len(displayAddr)-4:]
}
statusEmoji := "✅"
if !creds.IsValid() {
statusEmoji = "❌"
}
planInfo := ""
if creds.Plan != "" {
planInfo = fmt.Sprintf(" (%s)", creds.Plan)
}
fmt.Printf("%d. %s %s%s%s\n", i+1, statusEmoji, displayAddr, planInfo, defaultMark)
}
fmt.Printf("%d. Add new wallet\n", len(gatewayCredentials.Credentials)+1)
fmt.Printf("%d. Logout (clear all credentials)\n", len(gatewayCredentials.Credentials)+2)
fmt.Printf("%d. Exit\n", len(gatewayCredentials.Credentials)+3)
maxChoice := len(gatewayCredentials.Credentials) + 3
fmt.Printf("Choose (1-%d): ", maxChoice)
choice, err := readUserChoice(maxChoice)
if err != nil {
return AuthChoiceExit, -1, err
}
if choice <= len(gatewayCredentials.Credentials) {
// User selected a credential
return AuthChoiceUseCredential, choice - 1, nil
} else if choice == len(gatewayCredentials.Credentials)+1 {
// Add new credential
return AuthChoiceAddCredential, -1, nil
} else if choice == len(gatewayCredentials.Credentials)+2 {
// Logout
return AuthChoiceLogout, -1, nil
} else {
// Exit
return AuthChoiceExit, -1, nil
}
}
// readUserChoice reads and validates user input
func readUserChoice(maxChoice int) (int, error) {
reader := bufio.NewReader(os.Stdin)
input, err := reader.ReadString('\n')
if err != nil {
return 0, fmt.Errorf("failed to read input: %w", err)
}
choiceStr := strings.TrimSpace(input)
choice, err := strconv.Atoi(choiceStr)
if err != nil {
return 0, fmt.Errorf("invalid input: please enter a number")
}
if choice < 1 || choice > maxChoice {
return 0, fmt.Errorf("invalid choice: please enter a number between 1 and %d", maxChoice)
}
return choice, nil
}
// GetOrPromptForCredentials handles the complete authentication flow
func GetOrPromptForCredentials(gatewayURL string) (*Credentials, error) {
store, err := LoadEnhancedCredentials()
if err != nil {
return nil, fmt.Errorf("failed to load credential store: %w", err)
}
// Check if we have a valid default credential
defaultCreds := store.GetDefaultCredential(gatewayURL)
if defaultCreds != nil && defaultCreds.IsValid() {
// Update last used time
defaultCreds.UpdateLastUsed()
if err := store.Save(); err != nil {
// Log warning but don't fail
fmt.Fprintf(os.Stderr, "Warning: failed to update last used time: %v\n", err)
}
return defaultCreds, nil
}
// Need to prompt user for credential selection
for {
choice, credIndex, err := store.DisplayCredentialMenu(gatewayURL)
if err != nil {
return nil, fmt.Errorf("menu selection failed: %w", err)
}
switch choice {
case AuthChoiceUseCredential:
gatewayCredentials := store.Gateways[gatewayURL]
if gatewayCredentials == nil || credIndex < 0 || credIndex >= len(gatewayCredentials.Credentials) {
fmt.Println("❌ Invalid credential selection")
continue
}
selectedCreds := gatewayCredentials.Credentials[credIndex]
if !selectedCreds.IsValid() {
fmt.Println("❌ Selected credentials are invalid or expired")
continue
}
// Update default and last used
store.SetDefaultCredential(gatewayURL, credIndex)
selectedCreds.UpdateLastUsed()
if err := store.Save(); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to save credentials: %v\n", err)
}
return selectedCreds, nil
case AuthChoiceAddCredential:
fmt.Println("\n🌐 Opening browser for wallet authentication...")
newCreds, err := PerformWalletAuthentication(gatewayURL)
if err != nil {
fmt.Printf("❌ Authentication failed: %v\n", err)
continue
}
// Add the new credential
store.AddCredential(gatewayURL, newCreds)
// Set as default if it's the first credential
gatewayCredentials := store.Gateways[gatewayURL]
if gatewayCredentials != nil && len(gatewayCredentials.Credentials) == 1 {
store.SetDefaultCredential(gatewayURL, 0)
}
if err := store.Save(); err != nil {
return nil, fmt.Errorf("failed to save new credentials: %w", err)
}
fmt.Printf("✅ Wallet %s added successfully\n", newCreds.Wallet)
return newCreds, nil
case AuthChoiceLogout:
store.ClearAllCredentials()
if err := store.Save(); err != nil {
return nil, fmt.Errorf("failed to clear credentials: %w", err)
}
fmt.Println("✅ All credentials cleared")
continue
case AuthChoiceExit:
return nil, fmt.Errorf("authentication cancelled by user")
default:
fmt.Println("❌ Invalid choice")
continue
}
}
}
// HasValidEnhancedCredentials checks if there are valid credentials for the default gateway
func HasValidEnhancedCredentials() (bool, error) {
store, err := LoadEnhancedCredentials()
if err != nil {
return false, err
}
gatewayURL := GetDefaultGatewayURL()
defaultCreds := store.GetDefaultCredential(gatewayURL)
return defaultCreds != nil && defaultCreds.IsValid(), nil
}
// GetValidEnhancedCredentials returns valid credentials for the default gateway
func GetValidEnhancedCredentials() (*Credentials, error) {
store, err := LoadEnhancedCredentials()
if err != nil {
return nil, err
}
gatewayURL := GetDefaultGatewayURL()
defaultCreds := store.GetDefaultCredential(gatewayURL)
if defaultCreds == nil {
return nil, fmt.Errorf("no credentials found for gateway %s", gatewayURL)
}
if !defaultCreds.IsValid() {
return nil, fmt.Errorf("credentials for gateway %s are expired or invalid", gatewayURL)
}
return defaultCreds, nil
}

310
pkg/auth/wallet.go Normal file
View File

@ -0,0 +1,310 @@
package auth
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"net"
"net/http"
"net/url"
"os/exec"
"runtime"
"strings"
"sync"
"time"
)
// WalletAuthResult represents the result of wallet authentication
type WalletAuthResult struct {
APIKey string `json:"api_key"`
RefreshToken string `json:"refresh_token,omitempty"`
Namespace string `json:"namespace"`
Wallet string `json:"wallet"`
Plan string `json:"plan,omitempty"`
ExpiresAt string `json:"expires_at,omitempty"`
}
// AuthServer handles the local HTTP server for receiving auth callbacks
type AuthServer struct {
server *http.Server
listener net.Listener
result chan WalletAuthResult
err chan error
mu sync.Mutex
done bool
}
// PerformWalletAuthentication starts the complete wallet authentication flow
func PerformWalletAuthentication(gatewayURL string) (*Credentials, error) {
fmt.Printf("🔐 Starting wallet authentication for gateway: %s\n", gatewayURL)
// Start local callback server
authServer, err := NewAuthServer()
if err != nil {
return nil, fmt.Errorf("failed to start auth server: %w", err)
}
defer authServer.Close()
callbackURL := fmt.Sprintf("http://localhost:%d/callback", authServer.GetPort())
fmt.Printf("📡 Authentication server started on port %d\n", authServer.GetPort())
// Open browser to gateway auth page
authURL := fmt.Sprintf("%s/v1/auth/login?callback=%s", gatewayURL, url.QueryEscape(callbackURL))
fmt.Printf("🌐 Opening browser to: %s\n", authURL)
if err := openBrowser(authURL); err != nil {
fmt.Printf("⚠️ Failed to open browser automatically: %v\n", err)
fmt.Printf("📋 Please manually open this URL in your browser:\n%s\n", authURL)
}
fmt.Println("⏳ Waiting for authentication to complete...")
fmt.Println("💡 Complete the wallet signature in your browser, then return here.")
// Wait for authentication result with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
select {
case result := <-authServer.result:
fmt.Println("✅ Authentication successful!")
return convertAuthResult(result), nil
case err := <-authServer.err:
return nil, fmt.Errorf("authentication failed: %w", err)
case <-ctx.Done():
return nil, fmt.Errorf("authentication timed out after 5 minutes")
}
}
// NewAuthServer creates a new authentication callback server
func NewAuthServer() (*AuthServer, error) {
// Listen on random available port
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("failed to create listener: %w", err)
}
authServer := &AuthServer{
listener: listener,
result: make(chan WalletAuthResult, 1),
err: make(chan error, 1),
}
mux := http.NewServeMux()
mux.HandleFunc("/callback", authServer.handleCallback)
mux.HandleFunc("/health", authServer.handleHealth)
authServer.server = &http.Server{
Handler: mux,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
}
// Start server in background
go func() {
if err := authServer.server.Serve(listener); err != nil && err != http.ErrServerClosed {
authServer.err <- fmt.Errorf("auth server error: %w", err)
}
}()
return authServer, nil
}
// GetPort returns the port the server is listening on
func (as *AuthServer) GetPort() int {
return as.listener.Addr().(*net.TCPAddr).Port
}
// Close shuts down the authentication server
func (as *AuthServer) Close() error {
as.mu.Lock()
defer as.mu.Unlock()
if as.done {
return nil
}
as.done = true
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return as.server.Shutdown(ctx)
}
// handleCallback processes the authentication callback from the gateway
func (as *AuthServer) handleCallback(w http.ResponseWriter, r *http.Request) {
as.mu.Lock()
if as.done {
as.mu.Unlock()
return
}
as.mu.Unlock()
// Parse query parameters
query := r.URL.Query()
// Check for error
if errMsg := query.Get("error"); errMsg != "" {
as.err <- fmt.Errorf("authentication error: %s", errMsg)
http.Error(w, "Authentication failed", http.StatusBadRequest)
return
}
// Extract authentication result
result := WalletAuthResult{
APIKey: query.Get("api_key"),
RefreshToken: query.Get("refresh_token"),
Namespace: query.Get("namespace"),
Wallet: query.Get("wallet"),
Plan: query.Get("plan"),
ExpiresAt: query.Get("expires_at"),
}
// Validate required fields
if result.APIKey == "" || result.Namespace == "" {
as.err <- fmt.Errorf("incomplete authentication response: missing api_key or namespace")
http.Error(w, "Incomplete authentication response", http.StatusBadRequest)
return
}
// Send success response to browser
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `
<!DOCTYPE html>
<html>
<head>
<title>Authentication Successful</title>
<style>
body { font-family: Arial, sans-serif; text-align: center; padding: 50px; background: #f5f5f5; }
.container { background: white; padding: 30px; border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); max-width: 500px; margin: 0 auto; }
.success { color: #4CAF50; font-size: 48px; margin-bottom: 20px; }
.details { background: #f8f9fa; padding: 20px; border-radius: 5px; margin: 20px 0; text-align: left; }
.key { font-family: monospace; background: #e9ecef; padding: 10px; border-radius: 3px; word-break: break-all; }
</style>
</head>
<body>
<div class="container">
<div class="success"></div>
<h1>Authentication Successful!</h1>
<p>You have successfully authenticated with your wallet.</p>
<div class="details">
<h3>🔑 Your Credentials:</h3>
<p><strong>API Key:</strong></p>
<div class="key">%s</div>
<p><strong>Namespace:</strong> %s</p>
<p><strong>Wallet:</strong> %s</p>
%s
</div>
<p>Your credentials have been saved securely to <code>~/.debros/credentials.json</code></p>
<p><strong>You can now close this browser window and return to your terminal.</strong></p>
</div>
</body>
</html>`,
result.APIKey,
result.Namespace,
result.Wallet,
func() string {
if result.Plan != "" {
return fmt.Sprintf("<p><strong>Plan:</strong> %s</p>", result.Plan)
}
return ""
}(),
)
// Send result to waiting goroutine
select {
case as.result <- result:
// Success
default:
// Channel full, ignore
}
}
// handleHealth provides a simple health check endpoint
func (as *AuthServer) handleHealth(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]string{
"status": "ok",
"server": "debros-auth-callback",
})
}
// convertAuthResult converts WalletAuthResult to Credentials
func convertAuthResult(result WalletAuthResult) *Credentials {
creds := &Credentials{
APIKey: result.APIKey,
Namespace: result.Namespace,
UserID: result.Wallet,
Wallet: result.Wallet,
IssuedAt: time.Now(),
Plan: result.Plan,
}
// Set refresh token if provided
if result.RefreshToken != "" {
creds.RefreshToken = result.RefreshToken
}
// Parse expiration if provided
if result.ExpiresAt != "" {
if expTime, err := time.Parse(time.RFC3339, result.ExpiresAt); err == nil {
creds.ExpiresAt = expTime
}
}
return creds
}
// openBrowser opens the default browser to the specified URL
func openBrowser(url string) error {
var cmd string
var args []string
switch runtime.GOOS {
case "windows":
cmd = "cmd"
args = []string{"/c", "start"}
case "darwin":
cmd = "open"
default: // "linux", "freebsd", "openbsd", "netbsd"
cmd = "xdg-open"
}
args = append(args, url)
return exec.Command(cmd, args...).Start()
}
// GenerateRandomString generates a cryptographically secure random string
func GenerateRandomString(length int) (string, error) {
bytes := make([]byte, length)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes)[:length], nil
}
// ValidateWalletAddress validates that a wallet address is properly formatted
func ValidateWalletAddress(address string) bool {
// Remove 0x prefix if present
addr := strings.TrimPrefix(strings.ToLower(address), "0x")
// Check length (Ethereum addresses are 40 hex characters)
if len(addr) != 40 {
return false
}
// Check if all characters are hex
_, err := hex.DecodeString(addr)
return err == nil
}
// FormatWalletAddress formats a wallet address consistently
func FormatWalletAddress(address string) string {
addr := strings.TrimPrefix(strings.ToLower(address), "0x")
return "0x" + addr
}

View File

@ -2,7 +2,10 @@ package client
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
@ -41,6 +44,9 @@ type Client struct {
connected bool
startTime time.Time
mu sync.RWMutex
// resolvedNamespace is the namespace derived from JWT/APIKey.
resolvedNamespace string
}
// NewClient creates a new network client
@ -118,6 +124,13 @@ func (c *Client) Connect() error {
return nil
}
// Derive and set namespace from provided credentials
ns, err := c.deriveNamespace()
if err != nil {
return fmt.Errorf("failed to derive namespace: %w", err)
}
c.resolvedNamespace = ns
// Create LibP2P host with optional Anyone proxy for TCP and optional QUIC disable
var opts []libp2p.Option
opts = append(opts,
@ -153,6 +166,8 @@ func (c *Client) Connect() error {
zap.Strings("listen_addrs", addrStrs),
)
c.logger.Info("Creating GossipSub...")
// Create LibP2P GossipSub with PeerExchange enabled (gossip-based peer exchange).
// Peer exchange helps propagate peer addresses via pubsub gossip and is enabled
// globally so discovery works without Anchat-specific branches.
@ -165,17 +180,39 @@ func (c *Client) Connect() error {
return fmt.Errorf("failed to create pubsub: %w", err)
}
c.libp2pPS = ps
c.logger.Info("GossipSub created successfully")
// Create pubsub bridge once and store it
adapter := pubsub.NewClientAdapter(c.libp2pPS, c.getAppNamespace())
c.pubsub = &pubSubBridge{adapter: adapter}
c.logger.Info("Creating pubsub bridge...")
// Create storage client with the host
storageClient := storage.NewClient(h, c.getAppNamespace(), c.logger)
c.logger.Info("Getting app namespace for pubsub...")
// Access namespace directly to avoid deadlock (we already hold c.mu.Lock())
var namespace string
if c.resolvedNamespace != "" {
namespace = c.resolvedNamespace
} else {
namespace = c.config.AppName
}
c.logger.Info("App namespace retrieved", zap.String("namespace", namespace))
c.logger.Info("Calling pubsub.NewClientAdapter...")
adapter := pubsub.NewClientAdapter(c.libp2pPS, namespace)
c.logger.Info("pubsub.NewClientAdapter completed successfully")
c.logger.Info("Creating pubSubBridge...")
c.pubsub = &pubSubBridge{client: c, adapter: adapter}
c.logger.Info("Pubsub bridge created successfully")
c.logger.Info("Creating storage client...")
// Create storage client with the host (use namespace directly to avoid deadlock)
storageClient := storage.NewClient(h, namespace, c.logger)
c.storage = &StorageClientImpl{
client: c,
storageClient: storageClient,
}
c.logger.Info("Storage client created successfully")
c.logger.Info("Starting bootstrap peer connections...")
// Connect to bootstrap peers FIRST
ctx, cancel := context.WithTimeout(context.Background(), c.config.ConnectTimeout)
@ -183,6 +220,7 @@ func (c *Client) Connect() error {
bootstrapPeersConnected := 0
for _, bootstrapAddr := range c.config.BootstrapPeers {
c.logger.Info("Attempting to connect to bootstrap peer", zap.String("addr", bootstrapAddr))
if err := c.connectToBootstrap(ctx, bootstrapAddr); err != nil {
c.logger.Warn("Failed to connect to bootstrap peer",
zap.String("addr", bootstrapAddr),
@ -190,12 +228,17 @@ func (c *Client) Connect() error {
continue
}
bootstrapPeersConnected++
c.logger.Info("Successfully connected to bootstrap peer", zap.String("addr", bootstrapAddr))
}
if bootstrapPeersConnected == 0 {
c.logger.Warn("No bootstrap peers connected, continuing anyway")
} else {
c.logger.Info("Bootstrap peer connections completed", zap.Int("connected_count", bootstrapPeersConnected))
}
c.logger.Info("Adding bootstrap peers to peerstore...")
// Add bootstrap peers to peerstore so we can connect to them later
for _, bootstrapAddr := range c.config.BootstrapPeers {
if ma, err := multiaddr.NewMultiaddr(bootstrapAddr); err == nil {
@ -206,6 +249,9 @@ func (c *Client) Connect() error {
}
}
}
c.logger.Info("Bootstrap peers added to peerstore")
c.logger.Info("Starting connection monitoring...")
// Client is a lightweight P2P participant - no discovery needed
// We only connect to known bootstrap peers and let nodes handle discovery
@ -213,10 +259,14 @@ func (c *Client) Connect() error {
// Start minimal connection monitoring
c.startConnectionMonitoring()
c.logger.Info("Connection monitoring started")
c.logger.Info("Setting connected state...")
c.connected = true
c.logger.Info("Connected state set to true")
c.logger.Info("Client connected", zap.String("namespace", c.getAppNamespace()))
c.logger.Info("Client connected", zap.String("namespace", namespace))
return nil
}
@ -290,5 +340,100 @@ func (c *Client) isConnected() bool {
// getAppNamespace returns the namespace for this app
func (c *Client) getAppNamespace() string {
c.mu.RLock()
defer c.mu.RUnlock()
if c.resolvedNamespace != "" {
return c.resolvedNamespace
}
return c.config.AppName
}
// requireAccess enforces that credentials are present and that any context-based namespace overrides match
func (c *Client) requireAccess(ctx context.Context) error {
// Allow internal system operations to bypass authentication
if IsInternalContext(ctx) {
return nil
}
cfg := c.Config()
if cfg == nil || (strings.TrimSpace(cfg.APIKey) == "" && strings.TrimSpace(cfg.JWT) == "") {
return fmt.Errorf("access denied: API key or JWT required")
}
ns := c.getAppNamespace()
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" && s != ns {
return fmt.Errorf("access denied: namespace mismatch")
}
}
if v := ctx.Value(pubsub.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" && s != ns {
return fmt.Errorf("access denied: namespace mismatch")
}
}
return nil
}
// deriveNamespace determines the namespace from JWT or API key.
func (c *Client) deriveNamespace() (string, error) {
// Prefer JWT claim {"Namespace": "..."}
if strings.TrimSpace(c.config.JWT) != "" {
ns, err := parseJWTNamespace(c.config.JWT)
if err != nil {
return "", err
}
if ns != "" {
return ns, nil
}
}
// Fallback to API key format ak_<random>:<namespace>
if strings.TrimSpace(c.config.APIKey) != "" {
ns, err := parseAPIKeyNamespace(c.config.APIKey)
if err != nil {
return "", err
}
if ns != "" {
return ns, nil
}
}
return c.config.AppName, nil
}
// parseJWTNamespace decodes base64url payload to extract Namespace claim (no signature verification)
func parseJWTNamespace(token string) (string, error) {
parts := strings.Split(token, ".")
if len(parts) < 2 {
return "", fmt.Errorf("invalid JWT format")
}
payload := parts[1]
// Decode base64url (raw, no padding)
data, err := base64.RawURLEncoding.DecodeString(payload)
if err != nil {
return "", fmt.Errorf("failed to decode JWT payload: %w", err)
}
// Minimal JSON struct
var claims struct {
Namespace string `json:"Namespace"`
}
if err := json.Unmarshal(data, &claims); err != nil {
return "", fmt.Errorf("failed to parse JWT claims: %w", err)
}
return strings.TrimSpace(claims.Namespace), nil
}
// parseAPIKeyNamespace extracts the namespace from ak_<random>:<namespace>
func parseAPIKeyNamespace(key string) (string, error) {
key = strings.TrimSpace(key)
if key == "" {
return "", fmt.Errorf("invalid API key: empty")
}
// Allow but ignore prefix ak_
parts := strings.Split(key, ":")
if len(parts) != 2 {
return "", fmt.Errorf("invalid API key format: expected ak_<random>:<namespace>")
}
ns := strings.TrimSpace(parts[1])
if ns == "" {
return "", fmt.Errorf("invalid API key: empty namespace")
}
return ns, nil
}

41
pkg/client/context.go Normal file
View File

@ -0,0 +1,41 @@
package client
import (
"context"
"git.debros.io/DeBros/network/pkg/pubsub"
"git.debros.io/DeBros/network/pkg/storage"
)
// contextKey for internal operations
type contextKey string
const (
// ctxKeyInternal marks contexts for internal system operations that bypass auth
ctxKeyInternal contextKey = "internal_operation"
)
// WithNamespace applies both storage and pubsub namespace overrides to the context.
// It is a convenience helper for client callers to ensure both subsystems receive
// the same, consistent namespace override.
func WithNamespace(ctx context.Context, ns string) context.Context {
ctx = storage.WithNamespace(ctx, ns)
ctx = pubsub.WithNamespace(ctx, ns)
return ctx
}
// WithInternalAuth creates a context that bypasses authentication for internal system operations.
// This should only be used by the system itself (migrations, internal tasks, etc.)
func WithInternalAuth(ctx context.Context) context.Context {
return context.WithValue(ctx, ctxKeyInternal, true)
}
// IsInternalContext checks if a context is marked for internal operations
func IsInternalContext(ctx context.Context) bool {
if v := ctx.Value(ctxKeyInternal); v != nil {
if internal, ok := v.(bool); ok {
return internal
}
}
return false
}

View File

@ -61,6 +61,10 @@ func (d *DatabaseClientImpl) Query(ctx context.Context, sql string, args ...inte
return nil, err
}
if err := d.client.requireAccess(ctx); err != nil {
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
// Determine if this is a read or write operation
isWriteOperation := d.isWriteOperation(sql)
@ -260,6 +264,10 @@ func (d *DatabaseClientImpl) Transaction(ctx context.Context, queries []string)
return fmt.Errorf("client not connected")
}
if err := d.client.requireAccess(ctx); err != nil {
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
maxRetries := 3
var lastErr error
@ -298,6 +306,10 @@ func (d *DatabaseClientImpl) CreateTable(ctx context.Context, schema string) err
return err
}
if err := d.client.requireAccess(ctx); err != nil {
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
return d.withRetry(func(conn *gorqlite.Connection) error {
_, err := conn.WriteOne(schema)
return err
@ -323,6 +335,10 @@ func (d *DatabaseClientImpl) GetSchema(ctx context.Context) (*SchemaInfo, error)
return nil, fmt.Errorf("client not connected")
}
if err := d.client.requireAccess(ctx); err != nil {
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
// Get RQLite connection
conn, err := d.getRQLiteConnection()
if err != nil {
@ -396,6 +412,10 @@ func (s *StorageClientImpl) Get(ctx context.Context, key string) ([]byte, error)
return nil, fmt.Errorf("client not connected")
}
if err := s.client.requireAccess(ctx); err != nil {
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
return s.storageClient.Get(ctx, key)
}
@ -405,6 +425,10 @@ func (s *StorageClientImpl) Put(ctx context.Context, key string, value []byte) e
return fmt.Errorf("client not connected")
}
if err := s.client.requireAccess(ctx); err != nil {
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
err := s.storageClient.Put(ctx, key, value)
if err != nil {
return err
@ -419,6 +443,10 @@ func (s *StorageClientImpl) Delete(ctx context.Context, key string) error {
return fmt.Errorf("client not connected")
}
if err := s.client.requireAccess(ctx); err != nil {
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
err := s.storageClient.Delete(ctx, key)
if err != nil {
return err
@ -433,6 +461,10 @@ func (s *StorageClientImpl) List(ctx context.Context, prefix string, limit int)
return nil, fmt.Errorf("client not connected")
}
if err := s.client.requireAccess(ctx); err != nil {
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
return s.storageClient.List(ctx, prefix, limit)
}
@ -442,6 +474,10 @@ func (s *StorageClientImpl) Exists(ctx context.Context, key string) (bool, error
return false, fmt.Errorf("client not connected")
}
if err := s.client.requireAccess(ctx); err != nil {
return false, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
return s.storageClient.Exists(ctx, key)
}
@ -456,6 +492,10 @@ func (n *NetworkInfoImpl) GetPeers(ctx context.Context) ([]PeerInfo, error) {
return nil, fmt.Errorf("client not connected")
}
if err := n.client.requireAccess(ctx); err != nil {
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
// Get peers from LibP2P host
host := n.client.host
if host == nil {
@ -512,6 +552,10 @@ func (n *NetworkInfoImpl) GetStatus(ctx context.Context) (*NetworkStatus, error)
return nil, fmt.Errorf("client not connected")
}
if err := n.client.requireAccess(ctx); err != nil {
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
host := n.client.host
if host == nil {
return nil, fmt.Errorf("no host available")
@ -551,6 +595,10 @@ func (n *NetworkInfoImpl) ConnectToPeer(ctx context.Context, peerAddr string) er
return fmt.Errorf("client not connected")
}
if err := n.client.requireAccess(ctx); err != nil {
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
host := n.client.host
if host == nil {
return fmt.Errorf("no host available")
@ -582,6 +630,10 @@ func (n *NetworkInfoImpl) DisconnectFromPeer(ctx context.Context, peerID string)
return fmt.Errorf("client not connected")
}
if err := n.client.requireAccess(ctx); err != nil {
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
host := n.client.host
if host == nil {
return fmt.Errorf("no host available")

View File

@ -129,6 +129,8 @@ type ClientConfig struct {
RetryAttempts int `json:"retry_attempts"`
RetryDelay time.Duration `json:"retry_delay"`
QuietMode bool `json:"quiet_mode"` // Suppress debug/info logs
APIKey string `json:"api_key"` // API key for gateway auth
JWT string `json:"jwt"` // Optional JWT bearer token
}
// DefaultClientConfig returns a default client configuration
@ -145,5 +147,8 @@ func DefaultClientConfig(appName string) *ClientConfig {
ConnectTimeout: time.Second * 30,
RetryAttempts: 3,
RetryDelay: time.Second * 5,
QuietMode: false,
APIKey: "",
JWT: "",
}
}

View File

@ -2,16 +2,21 @@ package client
import (
"context"
"fmt"
"git.debros.io/DeBros/network/pkg/pubsub"
)
// pubSubBridge bridges between our PubSubClient interface and the pubsub package
type pubSubBridge struct {
client *Client
adapter *pubsub.ClientAdapter
}
func (p *pubSubBridge) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
if err := p.client.requireAccess(ctx); err != nil {
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
// Convert our MessageHandler to the pubsub package MessageHandler
pubsubHandler := func(topic string, data []byte) error {
return handler(topic, data)
@ -20,13 +25,22 @@ func (p *pubSubBridge) Subscribe(ctx context.Context, topic string, handler Mess
}
func (p *pubSubBridge) Publish(ctx context.Context, topic string, data []byte) error {
if err := p.client.requireAccess(ctx); err != nil {
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
return p.adapter.Publish(ctx, topic, data)
}
func (p *pubSubBridge) Unsubscribe(ctx context.Context, topic string) error {
if err := p.client.requireAccess(ctx); err != nil {
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
return p.adapter.Unsubscribe(ctx, topic)
}
func (p *pubSubBridge) ListTopics(ctx context.Context) ([]string, error) {
if err := p.client.requireAccess(ctx); err != nil {
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
}
return p.adapter.ListTopics(ctx)
}

View File

@ -110,7 +110,7 @@ func DefaultConfig() *Config {
},
Discovery: DiscoveryConfig{
BootstrapPeers: []string{
"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWRjaa3STPr2PDVai1eqZ2KEc942sbJpxcd42qSAc1P9A2",
"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWGqqR8bxgmYsYrGYMKnUWwZUCpioLmA3H37ggRDnAiFa7",
},
BootstrapPort: 4001, // Default LibP2P port
DiscoveryInterval: time.Second * 15, // Back to 15 seconds for testing

View File

@ -0,0 +1,168 @@
package gateway
import (
"crypto/rand"
"encoding/base64"
"encoding/json"
"net/http"
"strings"
"git.debros.io/DeBros/network/pkg/storage"
)
// appsHandler implements minimal CRUD for apps within a namespace.
// Routes handled:
// - GET /v1/apps -> list
// - POST /v1/apps -> create
// - GET /v1/apps/{app_id} -> fetch
// - PUT /v1/apps/{app_id} -> update (name/public_key)
// - DELETE /v1/apps/{app_id} -> delete
func (g *Gateway) appsHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx := r.Context()
ns := g.cfg.ClientNamespace
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
if strings.TrimSpace(ns) == "" { ns = "default" }
db := g.client.Database()
nsID, err := g.resolveNamespaceID(ctx, ns)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
path := r.URL.Path
// Determine if operating on collection or single resource
if path == "/v1/apps" || path == "/v1/apps/" {
switch r.Method {
case http.MethodGet:
// List apps
res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? ORDER BY created_at DESC", nsID)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
items := make([]map[string]any, 0, res.Count)
for _, row := range res.Rows {
item := map[string]any{
"app_id": row[0],
"name": row[1],
"public_key": row[2],
"namespace": ns,
"created_at": row[3],
}
items = append(items, item)
}
writeJSON(w, http.StatusOK, map[string]any{"items": items, "count": len(items)})
return
case http.MethodPost:
// Create app with provided name/public_key
var req struct {
Name string `json:"name"`
PublicKey string `json:"public_key"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
// Generate app_id
buf := make([]byte, 12)
if _, err := rand.Read(buf); err != nil {
writeError(w, http.StatusInternalServerError, "failed to generate app id")
return
}
appID := "app_" + base64.RawURLEncoding.EncodeToString(buf)
if _, err := db.Query(ctx, "INSERT INTO apps(namespace_id, app_id, name, public_key) VALUES (?, ?, ?, ?)", nsID, appID, req.Name, req.PublicKey); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusCreated, map[string]any{
"app_id": appID,
"name": req.Name,
"public_key": req.PublicKey,
"namespace": ns,
})
return
default:
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
}
// Single resource: /v1/apps/{app_id}
if strings.HasPrefix(path, "/v1/apps/") {
appID := strings.TrimPrefix(path, "/v1/apps/")
appID = strings.TrimSpace(appID)
if appID == "" {
writeError(w, http.StatusBadRequest, "missing app_id")
return
}
switch r.Method {
case http.MethodGet:
res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? AND app_id = ? LIMIT 1", nsID, appID)
if err != nil || res == nil || res.Count == 0 {
writeError(w, http.StatusNotFound, "app not found")
return
}
row := res.Rows[0]
writeJSON(w, http.StatusOK, map[string]any{
"app_id": row[0],
"name": row[1],
"public_key": row[2],
"namespace": ns,
"created_at": row[3],
})
return
case http.MethodPut:
var req struct {
Name *string `json:"name"`
PublicKey *string `json:"public_key"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
// Build update dynamically
sets := make([]string, 0, 2)
args := make([]any, 0, 4)
if req.Name != nil {
sets = append(sets, "name = ?")
args = append(args, *req.Name)
}
if req.PublicKey != nil {
sets = append(sets, "public_key = ?")
args = append(args, *req.PublicKey)
}
if len(sets) == 0 {
writeError(w, http.StatusBadRequest, "no fields to update")
return
}
q := "UPDATE apps SET " + strings.Join(sets, ", ") + " WHERE namespace_id = ? AND app_id = ?"
args = append(args, nsID, appID)
if _, err := db.Query(ctx, q, args...); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
return
case http.MethodDelete:
if _, err := db.Query(ctx, "DELETE FROM apps WHERE namespace_id = ? AND app_id = ?", nsID, appID); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
return
default:
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
}
writeError(w, http.StatusNotFound, "not found")
}

1028
pkg/gateway/auth_handlers.go Normal file

File diff suppressed because it is too large Load Diff

23
pkg/gateway/db_helpers.go Normal file
View File

@ -0,0 +1,23 @@
package gateway
import (
"context"
"git.debros.io/DeBros/network/pkg/client"
)
func (g *Gateway) resolveNamespaceID(ctx context.Context, ns string) (interface{}, error) {
// Use internal context to bypass authentication for system operations
internalCtx := client.WithInternalAuth(ctx)
db := g.client.Database()
if _, err := db.Query(internalCtx, "INSERT OR IGNORE INTO namespaces(name) VALUES (?)", ns); err != nil {
return nil, err
}
res, err := db.Query(internalCtx, "SELECT id FROM namespaces WHERE name = ? LIMIT 1", ns)
if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 {
return nil, err
}
return res.Rows[0][0], nil
}
// Deprecated: seeding API keys from config is removed.

142
pkg/gateway/gateway.go Normal file
View File

@ -0,0 +1,142 @@
package gateway
import (
"context"
"crypto/rand"
"crypto/rsa"
"net/http"
"strconv"
"time"
"git.debros.io/DeBros/network/pkg/client"
"git.debros.io/DeBros/network/pkg/logging"
"go.uber.org/zap"
)
// Config holds configuration for the gateway server
type Config struct {
ListenAddr string
ClientNamespace string
BootstrapPeers []string
}
type Gateway struct {
logger *logging.ColoredLogger
cfg *Config
client client.NetworkClient
startedAt time.Time
signingKey *rsa.PrivateKey
keyID string
}
// New creates and initializes a new Gateway instance
func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
logger.ComponentInfo(logging.ComponentGeneral, "Building client config...")
// Build client config from gateway cfg
cliCfg := client.DefaultClientConfig(cfg.ClientNamespace)
if len(cfg.BootstrapPeers) > 0 {
cliCfg.BootstrapPeers = cfg.BootstrapPeers
}
logger.ComponentInfo(logging.ComponentGeneral, "Creating network client...")
c, err := client.NewClient(cliCfg)
if err != nil {
logger.ComponentError(logging.ComponentClient, "failed to create network client", zap.Error(err))
return nil, err
}
logger.ComponentInfo(logging.ComponentGeneral, "Connecting network client...")
if err := c.Connect(); err != nil {
logger.ComponentError(logging.ComponentClient, "failed to connect network client", zap.Error(err))
return nil, err
}
logger.ComponentInfo(logging.ComponentClient, "Network client connected",
zap.String("namespace", cliCfg.AppName),
zap.Int("bootstrap_peer_count", len(cliCfg.BootstrapPeers)),
)
logger.ComponentInfo(logging.ComponentGeneral, "Creating gateway instance...")
gw := &Gateway{
logger: logger,
cfg: cfg,
client: c,
startedAt: time.Now(),
}
logger.ComponentInfo(logging.ComponentGeneral, "Generating RSA signing key...")
// Generate local RSA signing key for JWKS/JWT (ephemeral for now)
if key, err := rsa.GenerateKey(rand.Reader, 2048); err == nil {
gw.signingKey = key
gw.keyID = "gw-" + strconv.FormatInt(time.Now().Unix(), 10)
logger.ComponentInfo(logging.ComponentGeneral, "RSA key generated successfully")
} else {
logger.ComponentWarn(logging.ComponentGeneral, "failed to generate RSA key; jwks will be empty", zap.Error(err))
}
logger.ComponentInfo(logging.ComponentGeneral, "Starting database migrations goroutine...")
// Non-blocking DB migrations: probe RQLite; if reachable, apply migrations asynchronously
go func() {
if gw.probeRQLiteReachable(3 * time.Second) {
internalCtx := gw.withInternalAuth(context.Background())
if err := gw.applyMigrations(internalCtx); err != nil {
if err == errNoMigrationsFound {
if err2 := gw.applyAutoMigrations(internalCtx); err2 != nil {
logger.ComponentWarn(logging.ComponentDatabase, "auto migrations failed", zap.Error(err2))
} else {
logger.ComponentInfo(logging.ComponentDatabase, "auto migrations applied")
}
} else {
logger.ComponentWarn(logging.ComponentDatabase, "migrations failed", zap.Error(err))
}
} else {
logger.ComponentInfo(logging.ComponentDatabase, "migrations applied")
}
} else {
logger.ComponentWarn(logging.ComponentDatabase, "RQLite not reachable; skipping migrations for now")
}
}()
logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed, returning...")
return gw, nil
}
// withInternalAuth creates a context for internal gateway operations that bypass authentication
func (g *Gateway) withInternalAuth(ctx context.Context) context.Context {
return client.WithInternalAuth(ctx)
}
// probeRQLiteReachable performs a quick GET /status against candidate endpoints with a short timeout.
func (g *Gateway) probeRQLiteReachable(timeout time.Duration) bool {
endpoints := client.DefaultDatabaseEndpoints()
httpClient := &http.Client{Timeout: timeout}
for _, ep := range endpoints {
url := ep
if url == "" {
continue
}
if url[len(url)-1] == '/' {
url = url[:len(url)-1]
}
reqURL := url + "/status"
resp, err := httpClient.Get(reqURL)
if err != nil {
continue
}
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return true
}
}
return false
}
// Close disconnects the gateway client
func (g *Gateway) Close() {
if g.client != nil {
if err := g.client.Disconnect(); err != nil {
g.logger.ComponentWarn(logging.ComponentClient, "error during client disconnect", zap.Error(err))
}
}
}

View File

@ -0,0 +1,35 @@
package gateway
import (
"encoding/json"
"net/http"
)
type statusResponseWriter struct {
http.ResponseWriter
status int
bytes int
}
func (w *statusResponseWriter) WriteHeader(code int) {
w.status = code
w.ResponseWriter.WriteHeader(code)
}
func (w *statusResponseWriter) Write(b []byte) (int, error) {
n, err := w.ResponseWriter.Write(b)
w.bytes += n
return n, err
}
// writeJSON writes JSON with status code
func writeJSON(w http.ResponseWriter, code int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(v)
}
// writeError writes a standardized JSON error
func writeError(w http.ResponseWriter, code int, msg string) {
writeJSON(w, code, map[string]any{"error": msg})
}

157
pkg/gateway/jwt.go Normal file
View File

@ -0,0 +1,157 @@
package gateway
import (
"crypto"
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"net/http"
"strings"
"time"
)
func (g *Gateway) jwksHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if g.signingKey == nil {
_ = json.NewEncoder(w).Encode(map[string]any{"keys": []any{}})
return
}
pub := g.signingKey.Public().(*rsa.PublicKey)
n := pub.N.Bytes()
// Encode exponent as big-endian bytes
eVal := pub.E
eb := make([]byte, 0)
for eVal > 0 {
eb = append([]byte{byte(eVal & 0xff)}, eb...)
eVal >>= 8
}
if len(eb) == 0 {
eb = []byte{0}
}
jwk := map[string]string{
"kty": "RSA",
"use": "sig",
"alg": "RS256",
"kid": g.keyID,
"n": base64.RawURLEncoding.EncodeToString(n),
"e": base64.RawURLEncoding.EncodeToString(eb),
}
_ = json.NewEncoder(w).Encode(map[string]any{"keys": []any{jwk}})
}
// Internal types for JWT handling
type jwtHeader struct {
Alg string `json:"alg"`
Typ string `json:"typ"`
Kid string `json:"kid"`
}
type jwtClaims struct {
Iss string `json:"iss"`
Sub string `json:"sub"`
Aud string `json:"aud"`
Iat int64 `json:"iat"`
Nbf int64 `json:"nbf"`
Exp int64 `json:"exp"`
Namespace string `json:"namespace"`
}
// parseAndVerifyJWT verifies an RS256 JWT created by this gateway and returns claims
func (g *Gateway) parseAndVerifyJWT(token string) (*jwtClaims, error) {
if g.signingKey == nil {
return nil, errors.New("signing key unavailable")
}
parts := strings.Split(token, ".")
if len(parts) != 3 {
return nil, errors.New("invalid token format")
}
hb, err := base64.RawURLEncoding.DecodeString(parts[0])
if err != nil {
return nil, errors.New("invalid header encoding")
}
pb, err := base64.RawURLEncoding.DecodeString(parts[1])
if err != nil {
return nil, errors.New("invalid payload encoding")
}
sb, err := base64.RawURLEncoding.DecodeString(parts[2])
if err != nil {
return nil, errors.New("invalid signature encoding")
}
var header jwtHeader
if err := json.Unmarshal(hb, &header); err != nil {
return nil, errors.New("invalid header json")
}
if header.Alg != "RS256" {
return nil, errors.New("unsupported alg")
}
// Verify signature
signingInput := parts[0] + "." + parts[1]
sum := sha256.Sum256([]byte(signingInput))
pub := g.signingKey.Public().(*rsa.PublicKey)
if err := rsa.VerifyPKCS1v15(pub, crypto.SHA256, sum[:], sb); err != nil {
return nil, errors.New("invalid signature")
}
// Parse claims
var claims jwtClaims
if err := json.Unmarshal(pb, &claims); err != nil {
return nil, errors.New("invalid claims json")
}
// Validate issuer
if claims.Iss != "debros-gateway" {
return nil, errors.New("invalid issuer")
}
// Validate registered claims
now := time.Now().Unix()
// allow small clock skew ±60s
const skew = int64(60)
if claims.Nbf != 0 && now+skew < claims.Nbf {
return nil, errors.New("token not yet valid")
}
if claims.Exp != 0 && now-skew > claims.Exp {
return nil, errors.New("token expired")
}
if claims.Iat != 0 && claims.Iat-skew > now {
return nil, errors.New("invalid iat")
}
if claims.Aud != "gateway" {
return nil, errors.New("invalid audience")
}
return &claims, nil
}
func (g *Gateway) generateJWT(ns, subject string, ttl time.Duration) (string, int64, error) {
if g.signingKey == nil {
return "", 0, errors.New("signing key unavailable")
}
header := map[string]string{
"alg": "RS256",
"typ": "JWT",
"kid": g.keyID,
}
hb, _ := json.Marshal(header)
now := time.Now().UTC()
exp := now.Add(ttl)
payload := map[string]any{
"iss": "debros-gateway",
"sub": subject,
"aud": "gateway",
"iat": now.Unix(),
"nbf": now.Unix(),
"exp": exp.Unix(),
"namespace": ns,
}
pb, _ := json.Marshal(payload)
hb64 := base64.RawURLEncoding.EncodeToString(hb)
pb64 := base64.RawURLEncoding.EncodeToString(pb)
signingInput := hb64 + "." + pb64
sum := sha256.Sum256([]byte(signingInput))
sig, err := rsa.SignPKCS1v15(rand.Reader, g.signingKey, crypto.SHA256, sum[:])
if err != nil {
return "", 0, err
}
sb64 := base64.RawURLEncoding.EncodeToString(sig)
return signingInput + "." + sb64, exp.Unix(), nil
}

354
pkg/gateway/middleware.go Normal file
View File

@ -0,0 +1,354 @@
package gateway
import (
"context"
"encoding/json"
"net"
"net/http"
"strconv"
"strings"
"time"
"git.debros.io/DeBros/network/pkg/logging"
"git.debros.io/DeBros/network/pkg/storage"
"go.uber.org/zap"
)
// context keys for request-scoped auth metadata (private to package)
type contextKey string
const (
ctxKeyAPIKey contextKey = "api_key"
ctxKeyJWT contextKey = "jwt_claims"
)
// withMiddleware adds CORS and logging middleware
func (g *Gateway) withMiddleware(next http.Handler) http.Handler {
// Order: logging (outermost) -> CORS -> auth -> handler
// Add authorization layer after auth to enforce namespace ownership
return g.loggingMiddleware(g.corsMiddleware(g.authMiddleware(g.authorizationMiddleware(next))))
}
// loggingMiddleware logs basic request info and duration
func (g *Gateway) loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
srw := &statusResponseWriter{ResponseWriter: w, status: http.StatusOK}
next.ServeHTTP(srw, r)
dur := time.Since(start)
g.logger.ComponentInfo(logging.ComponentGeneral, "request",
zap.String("method", r.Method),
zap.String("path", r.URL.Path),
zap.Int("status", srw.status),
zap.Int("bytes", srw.bytes),
zap.String("duration", dur.String()),
)
// Persist request log asynchronously (best-effort)
go g.persistRequestLog(r, srw, dur)
})
}
// authMiddleware enforces auth when enabled via config.
// Accepts:
// - Authorization: Bearer <JWT> (RS256 issued by this gateway)
// - Authorization: Bearer <API key> or ApiKey <API key>
// - X-API-Key: <API key>
func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Allow preflight without auth
if r.Method == http.MethodOptions {
next.ServeHTTP(w, r)
return
}
// Allow public endpoints without auth
if isPublicPath(r.URL.Path) {
next.ServeHTTP(w, r)
return
}
// 1) Try JWT Bearer first if Authorization looks like one
if auth := r.Header.Get("Authorization"); auth != "" {
lower := strings.ToLower(auth)
if strings.HasPrefix(lower, "bearer ") {
tok := strings.TrimSpace(auth[len("Bearer "):])
if strings.Count(tok, ".") == 2 {
if claims, err := g.parseAndVerifyJWT(tok); err == nil {
// Attach JWT claims and namespace to context
ctx := context.WithValue(r.Context(), ctxKeyJWT, claims)
if ns := strings.TrimSpace(claims.Namespace); ns != "" {
ctx = storage.WithNamespace(ctx, ns)
}
next.ServeHTTP(w, r.WithContext(ctx))
return
}
// If it looked like a JWT but failed verification, fall through to API key check
}
}
}
// 2) Fallback to API key (validate against DB)
key := extractAPIKey(r)
if key == "" {
w.Header().Set("WWW-Authenticate", "Bearer realm=\"gateway\", charset=\"UTF-8\"")
writeError(w, http.StatusUnauthorized, "missing API key")
return
}
// Look up API key in DB and derive namespace
db := g.client.Database()
ctx := r.Context()
// Join to namespaces to resolve name in one query
q := "SELECT namespaces.name FROM api_keys JOIN namespaces ON api_keys.namespace_id = namespaces.id WHERE api_keys.key = ? LIMIT 1"
res, err := db.Query(ctx, q, key)
if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 {
w.Header().Set("WWW-Authenticate", "Bearer error=\"invalid_token\"")
writeError(w, http.StatusUnauthorized, "invalid API key")
return
}
// Extract namespace name
var ns string
if s, ok := res.Rows[0][0].(string); ok {
ns = strings.TrimSpace(s)
} else {
b, _ := json.Marshal(res.Rows[0][0])
_ = json.Unmarshal(b, &ns)
ns = strings.TrimSpace(ns)
}
if ns == "" {
w.Header().Set("WWW-Authenticate", "Bearer error=\"invalid_token\"")
writeError(w, http.StatusUnauthorized, "invalid API key")
return
}
// Attach auth metadata to context for downstream use
ctx = context.WithValue(ctx, ctxKeyAPIKey, key)
ctx = storage.WithNamespace(ctx, ns)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
// extractAPIKey extracts API key from Authorization or X-API-Key
func extractAPIKey(r *http.Request) string {
// Prefer Authorization header
auth := r.Header.Get("Authorization")
if auth != "" {
// Support "Bearer <token>" and "ApiKey <token>"
lower := strings.ToLower(auth)
if strings.HasPrefix(lower, "bearer ") {
return strings.TrimSpace(auth[len("Bearer "):])
}
if strings.HasPrefix(lower, "apikey ") {
return strings.TrimSpace(auth[len("ApiKey "):])
}
// If header has no scheme, treat the whole value as token (lenient for dev)
if !strings.Contains(auth, " ") {
return strings.TrimSpace(auth)
}
}
// Fallback header
if v := strings.TrimSpace(r.Header.Get("X-API-Key")); v != "" {
return v
}
return ""
}
// isPublicPath returns true for routes that should be accessible without API key auth
func isPublicPath(p string) bool {
switch p {
case "/health", "/v1/health", "/status", "/v1/status", "/v1/auth/jwks", "/.well-known/jwks.json", "/v1/version", "/v1/auth/login", "/v1/auth/challenge", "/v1/auth/verify", "/v1/auth/register", "/v1/auth/refresh", "/v1/auth/logout", "/v1/auth/api-key":
return true
default:
return false
}
}
// authorizationMiddleware enforces that the authenticated actor owns the namespace
// for certain protected paths (e.g., apps CRUD and storage APIs).
func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Skip for public/OPTIONS paths only
if r.Method == http.MethodOptions || isPublicPath(r.URL.Path) {
next.ServeHTTP(w, r)
return
}
// Exempt whoami from ownership enforcement so users can inspect their session
if r.URL.Path == "/v1/auth/whoami" {
next.ServeHTTP(w, r)
return
}
// Only enforce for specific resource paths
if !requiresNamespaceOwnership(r.URL.Path) {
next.ServeHTTP(w, r)
return
}
// Determine namespace from context
ctx := r.Context()
ns := ""
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok {
ns = strings.TrimSpace(s)
}
}
if ns == "" && g.cfg != nil {
ns = strings.TrimSpace(g.cfg.ClientNamespace)
}
if ns == "" {
writeError(w, http.StatusForbidden, "namespace not resolved")
return
}
// Identify actor from context
ownerType := ""
ownerID := ""
if v := ctx.Value(ctxKeyJWT); v != nil {
if claims, ok := v.(*jwtClaims); ok && claims != nil && strings.TrimSpace(claims.Sub) != "" {
ownerType = "wallet"
ownerID = strings.TrimSpace(claims.Sub)
}
}
if ownerType == "" && ownerID == "" {
if v := ctx.Value(ctxKeyAPIKey); v != nil {
if s, ok := v.(string); ok && strings.TrimSpace(s) != "" {
ownerType = "api_key"
ownerID = strings.TrimSpace(s)
}
}
}
if ownerType == "" || ownerID == "" {
writeError(w, http.StatusForbidden, "missing identity")
return
}
// Check ownership in DB
db := g.client.Database()
// Ensure namespace exists and get id
if _, err := db.Query(ctx, "INSERT OR IGNORE INTO namespaces(name) VALUES (?)", ns); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
nres, err := db.Query(ctx, "SELECT id FROM namespaces WHERE name = ? LIMIT 1", ns)
if err != nil || nres == nil || nres.Count == 0 || len(nres.Rows) == 0 || len(nres.Rows[0]) == 0 {
writeError(w, http.StatusForbidden, "namespace not found")
return
}
nsID := nres.Rows[0][0]
q := "SELECT 1 FROM namespace_ownership WHERE namespace_id = ? AND owner_type = ? AND owner_id = ? LIMIT 1"
res, err := db.Query(ctx, q, nsID, ownerType, ownerID)
if err != nil || res == nil || res.Count == 0 {
writeError(w, http.StatusForbidden, "forbidden: not an owner of namespace")
return
}
next.ServeHTTP(w, r)
})
}
// requiresNamespaceOwnership returns true if the path should be guarded by
// namespace ownership checks.
func requiresNamespaceOwnership(p string) bool {
if p == "/storage" || p == "/v1/storage" || strings.HasPrefix(p, "/v1/storage/") {
return true
}
if p == "/v1/apps" || strings.HasPrefix(p, "/v1/apps/") {
return true
}
if strings.HasPrefix(p, "/v1/pubsub") {
return true
}
return false
}
// corsMiddleware applies permissive CORS headers suitable for early development
func (g *Gateway) corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, DELETE, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-API-Key")
w.Header().Set("Access-Control-Max-Age", strconv.Itoa(600))
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
next.ServeHTTP(w, r)
})
}
// persistRequestLog writes request metadata to the database (best-effort)
func (g *Gateway) persistRequestLog(r *http.Request, srw *statusResponseWriter, dur time.Duration) {
if g.client == nil {
return
}
// Use a short timeout to avoid blocking shutdowns
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
db := g.client.Database()
// Resolve API key ID if available
var apiKeyID interface{} = nil
if v := r.Context().Value(ctxKeyAPIKey); v != nil {
if key, ok := v.(string); ok && key != "" {
if res, err := db.Query(ctx, "SELECT id FROM api_keys WHERE key = ? LIMIT 1", key); err == nil {
if res != nil && res.Count > 0 && len(res.Rows) > 0 && len(res.Rows[0]) > 0 {
switch idv := res.Rows[0][0].(type) {
case int64:
apiKeyID = idv
case float64:
apiKeyID = int64(idv)
case int:
apiKeyID = int64(idv)
case string:
// best effort parse
if n, err := strconv.Atoi(idv); err == nil {
apiKeyID = int64(n)
}
}
}
}
}
}
ip := getClientIP(r)
// Insert the log row
_, _ = db.Query(ctx,
"INSERT INTO request_logs (method, path, status_code, bytes_out, duration_ms, ip, api_key_id) VALUES (?, ?, ?, ?, ?, ?, ?)",
r.Method,
r.URL.Path,
srw.status,
srw.bytes,
dur.Milliseconds(),
ip,
apiKeyID,
)
// Update last_used_at for the API key if present
if apiKeyID != nil {
_, _ = db.Query(ctx, "UPDATE api_keys SET last_used_at = CURRENT_TIMESTAMP WHERE id = ?", apiKeyID)
}
}
// getClientIP extracts the client IP from headers or RemoteAddr
func getClientIP(r *http.Request) string {
// X-Forwarded-For may contain a list of IPs, take the first
if xff := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); xff != "" {
parts := strings.Split(xff, ",")
if len(parts) > 0 {
return strings.TrimSpace(parts[0])
}
}
if xr := strings.TrimSpace(r.Header.Get("X-Real-IP")); xr != "" {
return xr
}
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return r.RemoteAddr
}
return host
}

188
pkg/gateway/migrate.go Normal file
View File

@ -0,0 +1,188 @@
package gateway
import (
"context"
"errors"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"git.debros.io/DeBros/network/pkg/client"
"git.debros.io/DeBros/network/pkg/logging"
"go.uber.org/zap"
)
var errNoMigrationsFound = errors.New("no migrations found")
func (g *Gateway) applyAutoMigrations(ctx context.Context) error {
if g.client == nil {
return nil
}
db := g.client.Database()
// Use internal context to bypass authentication for system migrations
internalCtx := client.WithInternalAuth(ctx)
stmts := []string{
// namespaces
"CREATE TABLE IF NOT EXISTS namespaces (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t name TEXT NOT NULL UNIQUE,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP\n)",
// api_keys
"CREATE TABLE IF NOT EXISTS api_keys (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t key TEXT NOT NULL UNIQUE,\n\t name TEXT,\n\t namespace_id INTEGER NOT NULL,\n\t scopes TEXT,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t last_used_at TIMESTAMP,\n\t FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE\n)",
"CREATE INDEX IF NOT EXISTS idx_api_keys_namespace ON api_keys(namespace_id)",
// request_logs
"CREATE TABLE IF NOT EXISTS request_logs (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t method TEXT NOT NULL,\n\t path TEXT NOT NULL,\n\t status_code INTEGER NOT NULL,\n\t bytes_out INTEGER NOT NULL DEFAULT 0,\n\t duration_ms INTEGER NOT NULL DEFAULT 0,\n\t ip TEXT,\n\t api_key_id INTEGER,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t FOREIGN KEY(api_key_id) REFERENCES api_keys(id) ON DELETE SET NULL\n)",
"CREATE INDEX IF NOT EXISTS idx_request_logs_api_key ON request_logs(api_key_id)",
"CREATE INDEX IF NOT EXISTS idx_request_logs_created_at ON request_logs(created_at)",
// seed default namespace
"INSERT OR IGNORE INTO namespaces(name) VALUES ('default')",
}
for _, stmt := range stmts {
if _, err := db.Query(internalCtx, stmt); err != nil {
return err
}
}
return nil
}
func (g *Gateway) applyMigrations(ctx context.Context) error {
if g.client == nil {
return nil
}
db := g.client.Database()
// Use internal context to bypass authentication for system migrations
internalCtx := client.WithInternalAuth(ctx)
// Ensure schema_migrations exists first
if _, err := db.Query(internalCtx, "CREATE TABLE IF NOT EXISTS schema_migrations (\n\tversion INTEGER PRIMARY KEY,\n\tapplied_at TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))\n)"); err != nil {
return err
}
// Locate migrations directory relative to CWD
migDir := "migrations"
if fi, err := os.Stat(migDir); err != nil || !fi.IsDir() {
return errNoMigrationsFound
}
entries, err := os.ReadDir(migDir)
if err != nil {
return err
}
type mig struct {
ver int
path string
}
migrations := make([]mig, 0)
for _, e := range entries {
if e.IsDir() {
continue
}
name := e.Name()
if !strings.HasSuffix(strings.ToLower(name), ".sql") {
continue
}
if ver, ok := parseMigrationVersion(name); ok {
migrations = append(migrations, mig{ver: ver, path: filepath.Join(migDir, name)})
}
}
if len(migrations) == 0 {
return errNoMigrationsFound
}
sort.Slice(migrations, func(i, j int) bool { return migrations[i].ver < migrations[j].ver })
// Helper to check if version applied
isApplied := func(ctx context.Context, v int) (bool, error) {
res, err := db.Query(ctx, "SELECT 1 FROM schema_migrations WHERE version = ? LIMIT 1", v)
if err != nil {
return false, err
}
return res != nil && res.Count > 0, nil
}
for _, m := range migrations {
applied, err := isApplied(internalCtx, m.ver)
if err != nil {
return err
}
if applied {
continue
}
// Read and split SQL file into statements
content, err := os.ReadFile(m.path)
if err != nil {
return err
}
stmts := splitSQLStatements(string(content))
for _, s := range stmts {
if s == "" {
continue
}
if _, err := db.Query(internalCtx, s); err != nil {
return err
}
}
// Mark as applied
if _, err := db.Query(internalCtx, "INSERT INTO schema_migrations (version) VALUES (?)", m.ver); err != nil {
return err
}
g.logger.ComponentInfo(logging.ComponentDatabase, "applied migration", zap.Int("version", m.ver), zap.String("file", m.path))
}
return nil
}
func parseMigrationVersion(name string) (int, bool) {
i := 0
for i < len(name) && name[i] >= '0' && name[i] <= '9' {
i++
}
if i == 0 {
return 0, false
}
v, err := strconv.Atoi(name[:i])
if err != nil {
return 0, false
}
return v, true
}
func splitSQLStatements(sqlText string) []string {
lines := strings.Split(sqlText, "\n")
cleaned := make([]string, 0, len(lines))
for _, ln := range lines {
s := strings.TrimSpace(ln)
if s == "" {
continue
}
// Handle inline comments by removing everything after --
if commentIdx := strings.Index(s, "--"); commentIdx >= 0 {
s = strings.TrimSpace(s[:commentIdx])
if s == "" {
continue // line was only a comment
}
}
upper := strings.ToUpper(s)
if upper == "BEGIN;" || upper == "COMMIT;" || upper == "BEGIN" || upper == "COMMIT" {
continue
}
if strings.HasPrefix(upper, "INSERT") && strings.Contains(upper, "SCHEMA_MIGRATIONS") {
// ignore in-file migration markers
continue
}
cleaned = append(cleaned, s)
}
// Join and split by ';'
joined := strings.Join(cleaned, "\n")
parts := strings.Split(joined, ";")
out := make([]string, 0, len(parts))
for _, p := range parts {
sp := strings.TrimSpace(p)
if sp == "" {
continue
}
out = append(out, sp+";")
}
return out
}

View File

@ -0,0 +1,194 @@
package gateway
import (
"encoding/base64"
"encoding/json"
"net/http"
"time"
"git.debros.io/DeBros/network/pkg/storage"
"github.com/gorilla/websocket"
)
var wsUpgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// For early development we accept any origin; tighten later.
CheckOrigin: func(r *http.Request) bool { return true },
}
// pubsubWebsocketHandler upgrades to WS, subscribes to a namespaced topic, and
// forwards received PubSub messages to the client. Messages sent by the client
// are published to the same namespaced topic.
func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodGet {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
// Resolve namespace from auth context
ns := resolveNamespaceFromRequest(r)
if ns == "" {
writeError(w, http.StatusForbidden, "namespace not resolved")
return
}
topic := r.URL.Query().Get("topic")
if topic == "" {
writeError(w, http.StatusBadRequest, "missing 'topic'")
return
}
fullTopic := namespacedTopic(ns, topic)
conn, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
// Channel to deliver PubSub messages to WS writer
msgs := make(chan []byte, 128)
ctx := r.Context()
// Subscribe to the topic; push data into msgs
h := func(_ string, data []byte) error {
select {
case msgs <- data:
return nil
default:
// Drop if client is slow to avoid blocking network
return nil
}
}
if err := g.client.PubSub().Subscribe(ctx, fullTopic, h); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
defer func() { _ = g.client.PubSub().Unsubscribe(ctx, fullTopic) }()
// Writer loop
done := make(chan struct{})
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case b, ok := <-msgs:
if !ok {
_ = conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(5*time.Second))
close(done)
return
}
conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
if err := conn.WriteMessage(websocket.BinaryMessage, b); err != nil {
close(done)
return
}
case <-ticker.C:
// Ping keepalive
_ = conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(5*time.Second))
case <-ctx.Done():
close(done)
return
}
}
}()
// Reader loop: treat any client message as publish to the same topic
for {
mt, data, err := conn.ReadMessage()
if err != nil {
break
}
if mt != websocket.TextMessage && mt != websocket.BinaryMessage {
continue
}
if err := g.client.PubSub().Publish(ctx, fullTopic, data); err != nil {
// Best-effort notify client
_ = conn.WriteMessage(websocket.TextMessage, []byte("publish_error"))
}
}
<-done
}
// pubsubPublishHandler handles POST /v1/pubsub/publish {topic, data_base64}
func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
ns := resolveNamespaceFromRequest(r)
if ns == "" {
writeError(w, http.StatusForbidden, "namespace not resolved")
return
}
var body struct {
Topic string `json:"topic"`
DataB64 string `json:"data_base64"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Topic == "" || body.DataB64 == "" {
writeError(w, http.StatusBadRequest, "invalid body: expected {topic,data_base64}")
return
}
data, err := base64.StdEncoding.DecodeString(body.DataB64)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid base64 data")
return
}
if err := g.client.PubSub().Publish(r.Context(), namespacedTopic(ns, body.Topic), data); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}
// pubsubTopicsHandler lists topics within the caller's namespace
func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ns := resolveNamespaceFromRequest(r)
if ns == "" {
writeError(w, http.StatusForbidden, "namespace not resolved")
return
}
all, err := g.client.PubSub().ListTopics(r.Context())
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
prefix := namespacePrefix(ns)
var filtered []string
for _, t := range all {
if len(t) >= len(prefix) && t[:len(prefix)] == prefix {
filtered = append(filtered, t[len(prefix):])
}
}
writeJSON(w, http.StatusOK, map[string]any{"topics": filtered})
}
// resolveNamespaceFromRequest gets namespace from context set by auth middleware
func resolveNamespaceFromRequest(r *http.Request) string {
if v := r.Context().Value(storage.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok {
return s
}
}
return ""
}
func namespacePrefix(ns string) string {
return "ns::" + ns + "::"
}
func namespacedTopic(ns, topic string) string {
return namespacePrefix(ns) + topic
}

54
pkg/gateway/routes.go Normal file
View File

@ -0,0 +1,54 @@
package gateway
import "net/http"
// Routes returns the http.Handler with all routes and middleware configured
func (g *Gateway) Routes() http.Handler {
mux := http.NewServeMux()
// root and v1 health/status
mux.HandleFunc("/health", g.healthHandler)
mux.HandleFunc("/status", g.statusHandler)
mux.HandleFunc("/v1/health", g.healthHandler)
mux.HandleFunc("/v1/version", g.versionHandler)
mux.HandleFunc("/v1/status", g.statusHandler)
// auth endpoints
mux.HandleFunc("/v1/auth/jwks", g.jwksHandler)
mux.HandleFunc("/.well-known/jwks.json", g.jwksHandler)
mux.HandleFunc("/v1/auth/login", g.loginPageHandler)
mux.HandleFunc("/v1/auth/challenge", g.challengeHandler)
mux.HandleFunc("/v1/auth/verify", g.verifyHandler)
// New: issue JWT from API key; new: create or return API key for a wallet after verification
mux.HandleFunc("/v1/auth/token", g.apiKeyToJWTHandler)
mux.HandleFunc("/v1/auth/api-key", g.issueAPIKeyHandler)
mux.HandleFunc("/v1/auth/register", g.registerHandler)
mux.HandleFunc("/v1/auth/refresh", g.refreshHandler)
mux.HandleFunc("/v1/auth/logout", g.logoutHandler)
mux.HandleFunc("/v1/auth/whoami", g.whoamiHandler)
// apps CRUD
mux.HandleFunc("/v1/apps", g.appsHandler)
mux.HandleFunc("/v1/apps/", g.appsHandler)
// storage
mux.HandleFunc("/v1/storage", g.storageHandler) // legacy/basic
mux.HandleFunc("/v1/storage/get", g.storageGetHandler)
mux.HandleFunc("/v1/storage/put", g.storagePutHandler)
mux.HandleFunc("/v1/storage/delete", g.storageDeleteHandler)
mux.HandleFunc("/v1/storage/list", g.storageListHandler)
mux.HandleFunc("/v1/storage/exists", g.storageExistsHandler)
// network
mux.HandleFunc("/v1/network/status", g.networkStatusHandler)
mux.HandleFunc("/v1/network/peers", g.networkPeersHandler)
mux.HandleFunc("/v1/network/connect", g.networkConnectHandler)
mux.HandleFunc("/v1/network/disconnect", g.networkDisconnectHandler)
// pubsub
mux.HandleFunc("/v1/pubsub/ws", g.pubsubWebsocketHandler)
mux.HandleFunc("/v1/pubsub/publish", g.pubsubPublishHandler)
mux.HandleFunc("/v1/pubsub/topics", g.pubsubTopicsHandler)
return g.withMiddleware(mux)
}

View File

@ -0,0 +1,87 @@
package gateway
import (
"encoding/json"
"net/http"
"time"
"git.debros.io/DeBros/network/pkg/client"
"git.debros.io/DeBros/network/pkg/logging"
"go.uber.org/zap"
)
// Build info (set via -ldflags at build time; defaults for dev)
var (
BuildVersion = "dev"
BuildCommit = ""
BuildTime = ""
)
// healthResponse is the JSON structure used by healthHandler
type healthResponse struct {
Status string `json:"status"`
StartedAt time.Time `json:"started_at"`
Uptime string `json:"uptime"`
}
func (g *Gateway) healthHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
server := healthResponse{
Status: "ok",
StartedAt: g.startedAt,
Uptime: time.Since(g.startedAt).String(),
}
var clientHealth *client.HealthStatus
if g.client != nil {
if h, err := g.client.Health(); err == nil {
clientHealth = h
} else {
g.logger.ComponentWarn(logging.ComponentClient, "failed to fetch client health", zap.Error(err))
}
}
resp := struct {
Status string `json:"status"`
Server healthResponse `json:"server"`
Client *client.HealthStatus `json:"client"`
}{
Status: "ok",
Server: server,
Client: clientHealth,
}
_ = json.NewEncoder(w).Encode(resp)
}
// statusHandler aggregates server uptime and network status
func (g *Gateway) statusHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx := r.Context()
status, err := g.client.Network().GetStatus(ctx)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"server": map[string]any{
"started_at": g.startedAt,
"uptime": time.Since(g.startedAt).String(),
},
"network": status,
})
}
// versionHandler returns gateway build/runtime information
func (g *Gateway) versionHandler(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]any{
"version": BuildVersion,
"commit": BuildCommit,
"build_time": BuildTime,
"started_at": g.startedAt,
"uptime": time.Since(g.startedAt).String(),
})
}

View File

@ -0,0 +1,279 @@
package gateway
import (
"encoding/json"
"io"
"net/http"
"strconv"
"git.debros.io/DeBros/network/pkg/storage"
)
func (g *Gateway) storageHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
key := r.URL.Query().Get("key")
if key == "" {
writeError(w, http.StatusBadRequest, "missing 'key' query parameter")
return
}
ctx := r.Context()
switch r.Method {
case http.MethodGet:
val, err := g.client.Storage().Get(ctx, key)
if err != nil {
writeError(w, http.StatusNotFound, err.Error())
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(val)
return
case http.MethodPut:
defer r.Body.Close()
b, err := io.ReadAll(r.Body)
if err != nil {
writeError(w, http.StatusBadRequest, "failed to read body")
return
}
if err := g.client.Storage().Put(ctx, key, b); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusCreated, map[string]any{
"status": "ok",
"key": key,
"size": len(b),
})
return
case http.MethodOptions:
w.WriteHeader(http.StatusNoContent)
return
default:
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
}
func (g *Gateway) networkStatusHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx := r.Context()
status, err := g.client.Network().GetStatus(ctx)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, status)
}
func (g *Gateway) networkPeersHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx := r.Context()
peers, err := g.client.Network().GetPeers(ctx)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, peers)
}
func (g *Gateway) storageGetHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
key := r.URL.Query().Get("key")
if key == "" {
writeError(w, http.StatusBadRequest, "missing 'key'")
return
}
if !g.validateNamespaceParam(r) {
writeError(w, http.StatusForbidden, "namespace mismatch")
return
}
val, err := g.client.Storage().Get(r.Context(), key)
if err != nil {
writeError(w, http.StatusNotFound, err.Error())
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(val)
}
func (g *Gateway) storagePutHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
key := r.URL.Query().Get("key")
if key == "" {
writeError(w, http.StatusBadRequest, "missing 'key'")
return
}
if !g.validateNamespaceParam(r) {
writeError(w, http.StatusForbidden, "namespace mismatch")
return
}
defer r.Body.Close()
b, err := io.ReadAll(r.Body)
if err != nil {
writeError(w, http.StatusBadRequest, "failed to read body")
return
}
if err := g.client.Storage().Put(r.Context(), key, b); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusCreated, map[string]any{"status": "ok", "key": key, "size": len(b)})
}
func (g *Gateway) storageDeleteHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if !g.validateNamespaceParam(r) {
writeError(w, http.StatusForbidden, "namespace mismatch")
return
}
key := r.URL.Query().Get("key")
if key == "" {
var body struct {
Key string `json:"key"`
Namespace string `json:"namespace"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err == nil {
key = body.Key
}
}
if key == "" {
writeError(w, http.StatusBadRequest, "missing 'key'")
return
}
if err := g.client.Storage().Delete(r.Context(), key); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "key": key})
}
func (g *Gateway) storageListHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if !g.validateNamespaceParam(r) {
writeError(w, http.StatusForbidden, "namespace mismatch")
return
}
prefix := r.URL.Query().Get("prefix")
limitStr := r.URL.Query().Get("limit")
limit := 100
if limitStr != "" {
if n, err := strconv.Atoi(limitStr); err == nil && n > 0 {
limit = n
}
}
keys, err := g.client.Storage().List(r.Context(), prefix, limit)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"keys": keys})
}
func (g *Gateway) storageExistsHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if !g.validateNamespaceParam(r) {
writeError(w, http.StatusForbidden, "namespace mismatch")
return
}
key := r.URL.Query().Get("key")
if key == "" {
writeError(w, http.StatusBadRequest, "missing 'key'")
return
}
exists, err := g.client.Storage().Exists(r.Context(), key)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"exists": exists})
}
func (g *Gateway) networkConnectHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var body struct {
Multiaddr string `json:"multiaddr"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Multiaddr == "" {
writeError(w, http.StatusBadRequest, "invalid body: expected {multiaddr}")
return
}
if err := g.client.Network().ConnectToPeer(r.Context(), body.Multiaddr); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}
func (g *Gateway) networkDisconnectHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var body struct {
PeerID string `json:"peer_id"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.PeerID == "" {
writeError(w, http.StatusBadRequest, "invalid body: expected {peer_id}")
return
}
if err := g.client.Network().DisconnectFromPeer(r.Context(), body.PeerID); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}
func (g *Gateway) validateNamespaceParam(r *http.Request) bool {
qns := r.URL.Query().Get("namespace")
if qns == "" {
return true
}
if v := r.Context().Value(storage.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
return s == qns
}
}
// If no namespace in context, disallow explicit namespace param
return false
}

16
pkg/pubsub/context.go Normal file
View File

@ -0,0 +1,16 @@
package pubsub
import "context"
// Context utilities for namespace override
// Keep type unexported and expose the key as exported constant to avoid collisions
// while still allowing other packages to use the exact key value.
type ctxKey string
// CtxKeyNamespaceOverride is the context key used to override namespace per pubsub call
const CtxKeyNamespaceOverride ctxKey = "pubsub_ns_override"
// WithNamespace returns a new context that carries a pubsub namespace override
func WithNamespace(ctx context.Context, ns string) context.Context {
return context.WithValue(ctx, CtxKeyNamespaceOverride, ns)
}

View File

@ -11,7 +11,15 @@ func (m *Manager) Publish(ctx context.Context, topic string, data []byte) error
return fmt.Errorf("pubsub not initialized")
}
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic)
// Determine namespace (allow per-call override via context)
ns := m.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
namespacedTopic := fmt.Sprintf("%s.%s", ns, topic)
// Get or create topic
libp2pTopic, err := m.getOrCreateTopic(namespacedTopic)

View File

@ -13,7 +13,14 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa
return fmt.Errorf("pubsub not initialized")
}
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic)
// Determine namespace (allow per-call override via context)
ns := m.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
namespacedTopic := fmt.Sprintf("%s.%s", ns, topic)
// Check if already subscribed
m.mu.Lock()
@ -86,7 +93,14 @@ func (m *Manager) Unsubscribe(ctx context.Context, topic string) error {
m.mu.Lock()
defer m.mu.Unlock()
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic)
// Determine namespace (allow per-call override via context)
ns := m.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
namespacedTopic := fmt.Sprintf("%s.%s", ns, topic)
if subscription, exists := m.subscriptions[namespacedTopic]; exists {
// Cancel the subscription context to stop the message handler goroutine
@ -103,7 +117,14 @@ func (m *Manager) ListTopics(ctx context.Context) ([]string, error) {
defer m.mu.RUnlock()
var topics []string
prefix := m.namespace + "."
// Determine namespace (allow per-call override via context)
ns := m.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
prefix := ns + "."
for topic := range m.subscriptions {
if len(topic) > len(prefix) && topic[:len(prefix)] == prefix {

View File

@ -19,6 +19,17 @@ type Client struct {
namespace string
}
// Context utilities for namespace override
type ctxKey string
// CtxKeyNamespaceOverride is the context key used to override namespace per request
const CtxKeyNamespaceOverride ctxKey = "storage_ns_override"
// WithNamespace returns a new context that carries a storage namespace override
func WithNamespace(ctx context.Context, ns string) context.Context {
return context.WithValue(ctx, CtxKeyNamespaceOverride, ns)
}
// NewClient creates a new storage client
func NewClient(h host.Host, namespace string, logger *zap.Logger) *Client {
return &Client{
@ -30,11 +41,17 @@ func NewClient(h host.Host, namespace string, logger *zap.Logger) *Client {
// Put stores a key-value pair in the distributed storage
func (c *Client) Put(ctx context.Context, key string, value []byte) error {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{
Type: MessageTypePut,
Key: key,
Value: value,
Namespace: c.namespace,
Namespace: ns,
}
return c.sendRequest(ctx, request)
@ -42,10 +59,16 @@ func (c *Client) Put(ctx context.Context, key string, value []byte) error {
// Get retrieves a value by key from the distributed storage
func (c *Client) Get(ctx context.Context, key string) ([]byte, error) {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{
Type: MessageTypeGet,
Key: key,
Namespace: c.namespace,
Namespace: ns,
}
response, err := c.sendRequestWithResponse(ctx, request)
@ -62,10 +85,16 @@ func (c *Client) Get(ctx context.Context, key string) ([]byte, error) {
// Delete removes a key from the distributed storage
func (c *Client) Delete(ctx context.Context, key string) error {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{
Type: MessageTypeDelete,
Key: key,
Namespace: c.namespace,
Namespace: ns,
}
return c.sendRequest(ctx, request)
@ -73,11 +102,17 @@ func (c *Client) Delete(ctx context.Context, key string) error {
// List returns keys with a given prefix
func (c *Client) List(ctx context.Context, prefix string, limit int) ([]string, error) {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{
Type: MessageTypeList,
Prefix: prefix,
Limit: limit,
Namespace: c.namespace,
Namespace: ns,
}
response, err := c.sendRequestWithResponse(ctx, request)
@ -94,10 +129,16 @@ func (c *Client) List(ctx context.Context, prefix string, limit int) ([]string,
// Exists checks if a key exists in the distributed storage
func (c *Client) Exists(ctx context.Context, key string) (bool, error) {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{
Type: MessageTypeExists,
Key: key,
Namespace: c.namespace,
Namespace: ns,
}
response, err := c.sendRequestWithResponse(ctx, request)