From a3e0243e540c1207b9710b00d55bd8577ff84863 Mon Sep 17 00:00:00 2001 From: anonpenguin Date: Sat, 23 Aug 2025 11:46:49 +0300 Subject: [PATCH] Add REST database endpoints to gateway with tests and docs Implemented CRUD operations for database tables via REST: create-table, drop-table, query, transaction, and schema retrieval. Included authentication and namespace ownership enforcement. Added comprehensive end-to-end tests for new database routes. Updated documentation with usage examples and migration workflow. --- AI_CONTEXT.md | 13 ++++++ README.md | 32 +++++++++++++ e2e/client_e2e_test.go | 83 +++++++++++++++++++++++++++++++++ e2e/gateway_e2e_test.go | 44 +++++++++++++++++ pkg/gateway/middleware.go | 3 ++ pkg/gateway/routes.go | 7 +++ pkg/gateway/storage_handlers.go | 51 ++++++++++++++++++++ 7 files changed, 233 insertions(+) create mode 100644 e2e/client_e2e_test.go diff --git a/AI_CONTEXT.md b/AI_CONTEXT.md index 57b0bf2..4a277be 100644 --- a/AI_CONTEXT.md +++ b/AI_CONTEXT.md @@ -246,6 +246,19 @@ make run-gateway - `POST /v1/pubsub/publish` → body `{topic, data_base64}` → `{status:"ok"}` - `GET /v1/pubsub/topics` → `{topics:["", ...]}` (names trimmed to caller namespace) +### Database (Gateway) + +- `POST /v1/db/create-table` → `{schema}`: Create tables with SQL DDL +- `POST /v1/db/drop-table` → `{table}`: Drop a table +- `POST /v1/db/query` → `{sql, args?}`: Execute single SQL +- `POST /v1/db/transaction` → `{statements:[...]}`: Apply multiple statements atomically +- `GET /v1/db/schema` → Return tables and columns + +Notes: +- Auth and namespace ownership are enforced for all `/v1/db/*` routes. +- The gateway uses internal DB context for validation and execution to avoid circular auth checks. +- Perform migrations by POSTing DDL statements to `/v1/db/transaction`. + ### Authentication Improvements The gateway authentication system has been significantly enhanced with the following features: diff --git a/README.md b/README.md index 1a33754..e4682e7 100644 --- a/README.md +++ b/README.md @@ -315,6 +315,38 @@ logging: --disable-anonrc # Disable anonymous routing (Tor/SOCKS5) ``` +### Database Operations (Gateway REST) + +```http +POST /v1/db/create-table # Body: {"schema": "CREATE TABLE ..."} +POST /v1/db/drop-table # Body: {"table": "table_name"} +POST /v1/db/query # Body: {"sql": "SELECT ...", "args": [..]} +POST /v1/db/transaction # Body: {"statements": ["SQL 1", "SQL 2", ...]} +GET /v1/db/schema # Returns current tables and columns +``` + +Common migration workflow: + +```bash +# Add a new table +curl -X POST "$GW/v1/db/create-table" \ + -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ + -d '{"schema":"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)"}' + +# Apply multiple statements atomically +curl -X POST "$GW/v1/db/transaction" \ + -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ + -d '{"statements":[ + "ALTER TABLE users ADD COLUMN email TEXT", + "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)" + ]}' + +# Verify +curl -X POST "$GW/v1/db/query" \ + -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ + -d '{"sql":"PRAGMA table_info(users)"}' +``` + ### Authentication The CLI features an enhanced authentication system with automatic wallet detection and multi-wallet support: diff --git a/e2e/client_e2e_test.go b/e2e/client_e2e_test.go new file mode 100644 index 0000000..16bfbc2 --- /dev/null +++ b/e2e/client_e2e_test.go @@ -0,0 +1,83 @@ +//go:build e2e + +package e2e + +import ( + "context" + "fmt" + "os" + "strings" + "testing" + "time" + + "git.debros.io/DeBros/network/pkg/client" +) + +func getenv(k, def string) string { + if v := strings.TrimSpace(os.Getenv(k)); v != "" { + return v + } + return def +} + +func requireEnv(t *testing.T, key string) string { + t.Helper() + v := strings.TrimSpace(os.Getenv(key)) + if v == "" { + t.Skipf("%s not set; skipping", key) + } + return v +} + +func TestClient_Database_CreateQueryMigrate(t *testing.T) { + apiKey := requireEnv(t, "GATEWAY_API_KEY") + namespace := getenv("E2E_CLIENT_NAMESPACE", "default") + + cfg := client.DefaultClientConfig(namespace) + cfg.APIKey = apiKey + cfg.QuietMode = true + + if v := strings.TrimSpace(os.Getenv("E2E_BOOTSTRAP_PEERS")); v != "" { + parts := strings.Split(v, ",") + var peers []string + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { peers = append(peers, p) } + } + cfg.BootstrapPeers = peers + } + if v := strings.TrimSpace(os.Getenv("E2E_RQLITE_NODES")); v != "" { + nodes := strings.Fields(strings.ReplaceAll(v, ",", " ")) + cfg.DatabaseEndpoints = nodes + } + + c, err := client.NewClient(cfg) + if err != nil { t.Fatalf("new client: %v", err) } + if err := c.Connect(); err != nil { t.Fatalf("connect: %v", err) } + t.Cleanup(func(){ _ = c.Disconnect() }) + + // Unique table per run + table := fmt.Sprintf("e2e_items_client_%d", time.Now().UnixNano()) + schema := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP)", table) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := c.Database().CreateTable(ctx, schema); err != nil { + t.Fatalf("create table: %v", err) + } + // Insert via transaction + stmts := []string{ + fmt.Sprintf("INSERT INTO %s(name) VALUES ('alpha')", table), + fmt.Sprintf("INSERT INTO %s(name) VALUES ('beta')", table), + } + ctx2, cancel2 := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel2() + if err := c.Database().Transaction(ctx2, stmts); err != nil { + t.Fatalf("transaction: %v", err) + } + // Query rows + ctx3, cancel3 := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel3() + res, err := c.Database().Query(ctx3, fmt.Sprintf("SELECT name FROM %s ORDER BY id", table)) + if err != nil { t.Fatalf("query: %v", err) } + if res.Count < 2 { t.Fatalf("expected at least 2 rows, got %d", res.Count) } +} diff --git a/e2e/gateway_e2e_test.go b/e2e/gateway_e2e_test.go index 550086a..7b34aa3 100644 --- a/e2e/gateway_e2e_test.go +++ b/e2e/gateway_e2e_test.go @@ -215,6 +215,50 @@ func TestGateway_PubSub_RestPublishToWS(t *testing.T) { if !found { t.Fatalf("topic %s not found in topics list", topic) } } +func TestGateway_Database_CreateQueryMigrate(t *testing.T) { + key := requireAPIKey(t) + base := gatewayBaseURL() + + // Create table + schema := `CREATE TABLE IF NOT EXISTS e2e_items (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP)` + body := fmt.Sprintf(`{"schema":%q}`, schema) + req, _ := http.NewRequest(http.MethodPost, base+"/v1/db/create-table", strings.NewReader(body)) + req.Header = authHeader(key) + resp, err := httpClient().Do(req) + if err != nil { t.Fatalf("create-table do: %v", err) } + resp.Body.Close() + if resp.StatusCode != http.StatusCreated { t.Fatalf("create-table status: %d", resp.StatusCode) } + + // Insert via transaction (simulate migration/data seed) + txBody := `{"statements":["INSERT INTO e2e_items(name) VALUES ('one')","INSERT INTO e2e_items(name) VALUES ('two')"]}` + req, _ = http.NewRequest(http.MethodPost, base+"/v1/db/transaction", strings.NewReader(txBody)) + req.Header = authHeader(key) + resp, err = httpClient().Do(req) + if err != nil { t.Fatalf("tx do: %v", err) } + resp.Body.Close() + if resp.StatusCode != http.StatusOK { t.Fatalf("tx status: %d", resp.StatusCode) } + + // Query rows + qBody := `{"sql":"SELECT name FROM e2e_items ORDER BY id ASC"}` + req, _ = http.NewRequest(http.MethodPost, base+"/v1/db/query", strings.NewReader(qBody)) + req.Header = authHeader(key) + resp, err = httpClient().Do(req) + if err != nil { t.Fatalf("query do: %v", err) } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { t.Fatalf("query status: %d", resp.StatusCode) } + var qr struct { Columns []string `json:"columns"`; Rows [][]any `json:"rows"`; Count int `json:"count"` } + if err := json.NewDecoder(resp.Body).Decode(&qr); err != nil { t.Fatalf("query decode: %v", err) } + if qr.Count < 2 { t.Fatalf("expected at least 2 rows, got %d", qr.Count) } + + // Schema endpoint returns tables + req, _ = http.NewRequest(http.MethodGet, base+"/v1/db/schema", nil) + req.Header = authHeader(key) + resp2, err := httpClient().Do(req) + if err != nil { t.Fatalf("schema do: %v", err) } + defer resp2.Body.Close() + if resp2.StatusCode != http.StatusOK { t.Fatalf("schema status: %d", resp2.StatusCode) } +} + func toWSURL(httpURL string) string { u, err := url.Parse(httpURL) if err != nil { return httpURL } diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index 4400395..6194d49 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -264,6 +264,9 @@ func requiresNamespaceOwnership(p string) bool { if strings.HasPrefix(p, "/v1/pubsub") { return true } + if strings.HasPrefix(p, "/v1/db/") { + return true + } return false } diff --git a/pkg/gateway/routes.go b/pkg/gateway/routes.go index 5fd8f63..e90d1f8 100644 --- a/pkg/gateway/routes.go +++ b/pkg/gateway/routes.go @@ -39,6 +39,13 @@ func (g *Gateway) Routes() http.Handler { mux.HandleFunc("/v1/storage/list", g.storageListHandler) mux.HandleFunc("/v1/storage/exists", g.storageExistsHandler) + // database + mux.HandleFunc("/v1/db/query", g.dbQueryHandler) + mux.HandleFunc("/v1/db/transaction", g.dbTransactionHandler) + mux.HandleFunc("/v1/db/schema", g.dbSchemaHandler) + mux.HandleFunc("/v1/db/create-table", g.dbCreateTableHandler) + mux.HandleFunc("/v1/db/drop-table", g.dbDropTableHandler) + // network mux.HandleFunc("/v1/network/status", g.networkStatusHandler) mux.HandleFunc("/v1/network/peers", g.networkPeersHandler) diff --git a/pkg/gateway/storage_handlers.go b/pkg/gateway/storage_handlers.go index 983bbfa..0dec56c 100644 --- a/pkg/gateway/storage_handlers.go +++ b/pkg/gateway/storage_handlers.go @@ -10,6 +10,57 @@ import ( "git.debros.io/DeBros/network/pkg/storage" ) +// Database HTTP handlers +func (g *Gateway) dbQueryHandler(w http.ResponseWriter, r *http.Request) { + if g.client == nil { writeError(w, http.StatusServiceUnavailable, "client not initialized"); return } + if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed"); return } + var body struct{ SQL string `json:"sql"`; Args []any `json:"args"` } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.SQL == "" { writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}"); return } + ctx := client.WithInternalAuth(r.Context()) + res, err := g.client.Database().Query(ctx, body.SQL, body.Args...) + if err != nil { writeError(w, http.StatusInternalServerError, err.Error()); return } + writeJSON(w, http.StatusOK, res) +} + +func (g *Gateway) dbTransactionHandler(w http.ResponseWriter, r *http.Request) { + if g.client == nil { writeError(w, http.StatusServiceUnavailable, "client not initialized"); return } + if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed"); return } + var body struct{ Statements []string `json:"statements"` } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || len(body.Statements) == 0 { writeError(w, http.StatusBadRequest, "invalid body: {statements:[...]}"); return } + ctx := client.WithInternalAuth(r.Context()) + if err := g.client.Database().Transaction(ctx, body.Statements); err != nil { writeError(w, http.StatusInternalServerError, err.Error()); return } + writeJSON(w, http.StatusOK, map[string]any{"status":"ok"}) +} + +func (g *Gateway) dbSchemaHandler(w http.ResponseWriter, r *http.Request) { + if g.client == nil { writeError(w, http.StatusServiceUnavailable, "client not initialized"); return } + if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed"); return } + ctx := client.WithInternalAuth(r.Context()) + schema, err := g.client.Database().GetSchema(ctx) + if err != nil { writeError(w, http.StatusInternalServerError, err.Error()); return } + writeJSON(w, http.StatusOK, schema) +} + +func (g *Gateway) dbCreateTableHandler(w http.ResponseWriter, r *http.Request) { + if g.client == nil { writeError(w, http.StatusServiceUnavailable, "client not initialized"); return } + if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed"); return } + var body struct{ Schema string `json:"schema"` } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Schema == "" { writeError(w, http.StatusBadRequest, "invalid body: {schema}"); return } + ctx := client.WithInternalAuth(r.Context()) + if err := g.client.Database().CreateTable(ctx, body.Schema); err != nil { writeError(w, http.StatusInternalServerError, err.Error()); return } + writeJSON(w, http.StatusCreated, map[string]any{"status":"ok"}) +} + +func (g *Gateway) dbDropTableHandler(w http.ResponseWriter, r *http.Request) { + if g.client == nil { writeError(w, http.StatusServiceUnavailable, "client not initialized"); return } + if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed"); return } + var body struct{ Table string `json:"table"` } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Table == "" { writeError(w, http.StatusBadRequest, "invalid body: {table}"); return } + ctx := client.WithInternalAuth(r.Context()) + if err := g.client.Database().DropTable(ctx, body.Table); err != nil { writeError(w, http.StatusInternalServerError, err.Error()); return } + writeJSON(w, http.StatusOK, map[string]any{"status":"ok"}) +} + func (g *Gateway) storageHandler(w http.ResponseWriter, r *http.Request) { if g.client == nil { writeError(w, http.StatusServiceUnavailable, "client not initialized")