diff --git a/CHANGELOG.md b/CHANGELOG.md index 664f1e0..ed22aae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,17 +10,21 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant - Created new rqlite folder - Created rqlite adapter, client, gateway, migrations and rqlite init +- Created namespace_helpers on gateway +- Created new rqlite implementation ### Changed - Updated node.go to support new rqlite architecture - Updated readme + ### Deprecated ### Removed - Removed old storage folder +- Removed old pkg/gatway storage and migrated to new rqlite ### Fixed diff --git a/README.md b/README.md index a0f038b..ddf6f03 100644 --- a/README.md +++ b/README.md @@ -414,33 +414,65 @@ bootstrap_peers: ### Database Operations (Gateway REST) ```http -POST /v1/db/create-table # Body: {"schema": "CREATE TABLE ..."} -POST /v1/db/drop-table # Body: {"table": "table_name"} -POST /v1/db/query # Body: {"sql": "SELECT ...", "args": [..]} -POST /v1/db/transaction # Body: {"statements": ["SQL 1", "SQL 2", ...]} -GET /v1/db/schema # Returns current tables and columns +POST /v1/db/exec # Body: {"sql": "INSERT/UPDATE/DELETE/DDL ...", "args": [...]} +POST /v1/db/find # Body: {"table":"...", "criteria":{"col":val,...}, "options":{...}} +POST /v1/db/find-one # Body: same as /find, returns a single row (404 if not found) +POST /v1/db/select # Body: {"table":"...", "select":[...], "where":[...], "joins":[...], "order_by":[...], "limit":N, "offset":N, "one":false} +POST /v1/db/transaction # Body: {"ops":[{"kind":"exec|query","sql":"...","args":[...]}], "return_results": true} +POST /v1/db/query # Body: {"sql": "SELECT ...", "args": [..]} (legacy-friendly SELECT) +GET /v1/db/schema # Returns tables/views + create SQL +POST /v1/db/create-table # Body: {"schema": "CREATE TABLE ..."} +POST /v1/db/drop-table # Body: {"table": "table_name"} ``` -Common migration workflow: +Common workflows: ```bash -# Add a new table -curl -X POST "$GW/v1/db/create-table" \ +# Exec (INSERT/UPDATE/DELETE/DDL) +curl -X POST "$GW/v1/db/exec" \ -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ - -d '{"schema":"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)"}' + -d '{"sql":"INSERT INTO users(name,email) VALUES(?,?)","args":["Alice","alice@example.com"]}' -# Apply multiple statements atomically +# Find (criteria + options) +curl -X POST "$GW/v1/db/find" \ + -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ + -d '{ + "table":"users", + "criteria":{"active":true}, + "options":{"select":["id","email"],"order_by":["created_at DESC"],"limit":25} + }' + +# Select (fluent builder via JSON) +curl -X POST "$GW/v1/db/select" \ + -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ + -d '{ + "table":"orders o", + "select":["o.id","o.total","u.email AS user_email"], + "joins":[{"kind":"INNER","table":"users u","on":"u.id = o.user_id"}], + "where":[{"conj":"AND","expr":"o.total > ?","args":[100]}], + "order_by":["o.created_at DESC"], + "limit":10 + }' + +# Transaction (atomic batch) curl -X POST "$GW/v1/db/transaction" \ -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ - -d '{"statements":[ - "ALTER TABLE users ADD COLUMN email TEXT", - "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)" - ]}' + -d '{ + "return_results": true, + "ops": [ + {"kind":"exec","sql":"INSERT INTO users(email) VALUES(?)","args":["bob@example.com"]}, + {"kind":"query","sql":"SELECT last_insert_rowid() AS id","args":[]} + ] + }' -# Verify -curl -X POST "$GW/v1/db/query" \ - -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ - -d '{"sql":"PRAGMA table_info(users)"}' +# Schema +curl "$GW/v1/db/schema" -H "Authorization: Bearer $API_KEY" + +# DDL helpers +curl -X POST "$GW/v1/db/create-table" -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ + -d '{"schema":"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)"}' +curl -X POST "$GW/v1/db/drop-table" -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ + -d '{"table":"users"}' ``` ### Authentication @@ -540,7 +572,23 @@ GET /v1/auth/whoami # Current auth status POST /v1/auth/api-key # Generate API key (authenticated) ``` +#### RQLite HTTP ORM Gateway (/v1/db) +The gateway now exposes a full HTTP interface over the Go ORM-like client (see `pkg/rqlite/gateway.go`) so you can build SDKs in any language. + +- Base path: `/v1/db` +- Endpoints: + - `POST /v1/db/exec` — Execute write/DDL SQL; returns `{ rows_affected, last_insert_id }` + - `POST /v1/db/find` — Map-based criteria; returns `{ items: [...], count: N }` + - `POST /v1/db/find-one` — Single row; 404 if not found + - `POST /v1/db/select` — Fluent SELECT via JSON (joins, where, order, group, limit, offset) + - `POST /v1/db/transaction` — Atomic batch of exec/query ops, optional per-op results + - `POST /v1/db/query` — Arbitrary SELECT (legacy-friendly), returns `items` + - `GET /v1/db/schema` — List user tables/views + create SQL + - `POST /v1/db/create-table` — Convenience for DDL + - `POST /v1/db/drop-table` — Safe drop (identifier validated) + +Payload examples are shown in the [Database Operations (Gateway REST)](#database-operations-gateway-rest) section. #### Network Operations ```http @@ -575,11 +623,15 @@ GET /v1/pubsub/topics # List active topics ### Key HTTP endpoints for SDKs - **Database** - - Create Table: `POST /v1/db/create-table` `{schema}` → `{status:"ok"}` - - Drop Table: `POST /v1/db/drop-table` `{table}` → `{status:"ok"}` - - Query: `POST /v1/db/query` `{sql, args?}` → `{columns, rows, count}` - - Transaction: `POST /v1/db/transaction` `{statements:[...]}` → `{status:"ok"}` - - Schema: `GET /v1/db/schema` → schema JSON + - Exec: `POST /v1/db/exec` `{sql, args?}` → `{rows_affected,last_insert_id}` + - Find: `POST /v1/db/find` `{table, criteria, options?}` → `{items,count}` + - FindOne: `POST /v1/db/find-one` `{table, criteria, options?}` → single object or 404 + - Select: `POST /v1/db/select` `{table, select?, joins?, where?, order_by?, group_by?, limit?, offset?, one?}` + - Transaction: `POST /v1/db/transaction` `{ops:[{kind,sql,args?}], return_results?}` + - Query: `POST /v1/db/query` `{sql, args?}` → `{items,count}` + - Schema: `GET /v1/db/schema` + - Create Table: `POST /v1/db/create-table` `{schema}` + - Drop Table: `POST /v1/db/drop-table` `{table}` - **PubSub** - WS Subscribe: `GET /v1/pubsub/ws?topic=` - Publish: `POST /v1/pubsub/publish` `{topic, data_base64}` → `{status:"ok"}` diff --git a/pkg/config/config.go b/pkg/config/config.go index de0bcbe..39bef1b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -109,12 +109,11 @@ func DefaultConfig() *Config { }, Discovery: DiscoveryConfig{ BootstrapPeers: []string{ - "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWSHHwEY6cga3ng7tD1rzStAU58ogQXVMX3LZJ6Gqf6dee", - // "/ip4/217.76.54.168/tcp/4001/p2p/12D3KooWDp7xeShVY9uHfqNVPSsJeCKUatAviFZV8Y1joox5nUvx", - // "/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1", - // "/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR", - // "/ip4/155.133.27.199/tcp/4001/p2p/12D3KooWC69SBzM5QUgrLrfLWUykE8au32X5LwT7zwv9bixrQPm1", - // "/ip4/217.76.56.2/tcp/4001/p2p/12D3KooWEiqJHvznxqJ5p2y8mUs6Ky6dfU1xTYFQbyKRCABfcZz4", + "/ip4/217.76.54.168/tcp/4001/p2p/12D3KooWDp7xeShVY9uHfqNVPSsJeCKUatAviFZV8Y1joox5nUvx", + "/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1", + "/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR", + "/ip4/155.133.27.199/tcp/4001/p2p/12D3KooWC69SBzM5QUgrLrfLWUykE8au32X5LwT7zwv9bixrQPm1", + "/ip4/217.76.56.2/tcp/4001/p2p/12D3KooWEiqJHvznxqJ5p2y8mUs6Ky6dfU1xTYFQbyKRCABfcZz4", }, BootstrapPort: 4001, // Default LibP2P port DiscoveryInterval: time.Second * 15, // Back to 15 seconds for testing diff --git a/pkg/gateway/apps_handlers.go b/pkg/gateway/apps_handlers.go deleted file mode 100644 index 2b6c533..0000000 --- a/pkg/gateway/apps_handlers.go +++ /dev/null @@ -1,170 +0,0 @@ -package gateway - -import ( - "crypto/rand" - "encoding/base64" - "encoding/json" - "net/http" - "strings" - - "github.com/DeBrosOfficial/network/pkg/storage" -) - -// appsHandler implements minimal CRUD for apps within a namespace. -// Routes handled: -// - GET /v1/apps -> list -// - POST /v1/apps -> create -// - GET /v1/apps/{app_id} -> fetch -// - PUT /v1/apps/{app_id} -> update (name/public_key) -// - DELETE /v1/apps/{app_id} -> delete -func (g *Gateway) appsHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - ctx := r.Context() - ns := g.cfg.ClientNamespace - if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil { - if s, ok := v.(string); ok && s != "" { - ns = s - } - } - if strings.TrimSpace(ns) == "" { - ns = "default" - } - db := g.client.Database() - nsID, err := g.resolveNamespaceID(ctx, ns) - if err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - - path := r.URL.Path - // Determine if operating on collection or single resource - if path == "/v1/apps" || path == "/v1/apps/" { - switch r.Method { - case http.MethodGet: - // List apps - res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? ORDER BY created_at DESC", nsID) - if err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - items := make([]map[string]any, 0, res.Count) - for _, row := range res.Rows { - item := map[string]any{ - "app_id": row[0], - "name": row[1], - "public_key": row[2], - "namespace": ns, - "created_at": row[3], - } - items = append(items, item) - } - writeJSON(w, http.StatusOK, map[string]any{"items": items, "count": len(items)}) - return - case http.MethodPost: - // Create app with provided name/public_key - var req struct { - Name string `json:"name"` - PublicKey string `json:"public_key"` - } - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeError(w, http.StatusBadRequest, "invalid json body") - return - } - // Generate app_id - buf := make([]byte, 12) - if _, err := rand.Read(buf); err != nil { - writeError(w, http.StatusInternalServerError, "failed to generate app id") - return - } - appID := "app_" + base64.RawURLEncoding.EncodeToString(buf) - if _, err := db.Query(ctx, "INSERT INTO apps(namespace_id, app_id, name, public_key) VALUES (?, ?, ?, ?)", nsID, appID, req.Name, req.PublicKey); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusCreated, map[string]any{ - "app_id": appID, - "name": req.Name, - "public_key": req.PublicKey, - "namespace": ns, - }) - return - default: - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - } - - // Single resource: /v1/apps/{app_id} - if strings.HasPrefix(path, "/v1/apps/") { - appID := strings.TrimPrefix(path, "/v1/apps/") - appID = strings.TrimSpace(appID) - if appID == "" { - writeError(w, http.StatusBadRequest, "missing app_id") - return - } - switch r.Method { - case http.MethodGet: - res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? AND app_id = ? LIMIT 1", nsID, appID) - if err != nil || res == nil || res.Count == 0 { - writeError(w, http.StatusNotFound, "app not found") - return - } - row := res.Rows[0] - writeJSON(w, http.StatusOK, map[string]any{ - "app_id": row[0], - "name": row[1], - "public_key": row[2], - "namespace": ns, - "created_at": row[3], - }) - return - case http.MethodPut: - var req struct { - Name *string `json:"name"` - PublicKey *string `json:"public_key"` - } - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeError(w, http.StatusBadRequest, "invalid json body") - return - } - // Build update dynamically - sets := make([]string, 0, 2) - args := make([]any, 0, 4) - if req.Name != nil { - sets = append(sets, "name = ?") - args = append(args, *req.Name) - } - if req.PublicKey != nil { - sets = append(sets, "public_key = ?") - args = append(args, *req.PublicKey) - } - if len(sets) == 0 { - writeError(w, http.StatusBadRequest, "no fields to update") - return - } - q := "UPDATE apps SET " + strings.Join(sets, ", ") + " WHERE namespace_id = ? AND app_id = ?" - args = append(args, nsID, appID) - if _, err := db.Query(ctx, q, args...); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) - return - case http.MethodDelete: - if _, err := db.Query(ctx, "DELETE FROM apps WHERE namespace_id = ? AND app_id = ?", nsID, appID); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) - return - default: - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - } - - writeError(w, http.StatusNotFound, "not found") -} diff --git a/pkg/gateway/auth_handlers.go b/pkg/gateway/auth_handlers.go index 0504fee..ceec8e7 100644 --- a/pkg/gateway/auth_handlers.go +++ b/pkg/gateway/auth_handlers.go @@ -12,7 +12,6 @@ import ( "time" "github.com/DeBrosOfficial/network/pkg/client" - "github.com/DeBrosOfficial/network/pkg/storage" ethcrypto "github.com/ethereum/go-ethereum/crypto" ) @@ -20,7 +19,7 @@ func (g *Gateway) whoamiHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() // Determine namespace (may be overridden by auth layer) ns := g.cfg.ClientNamespace - if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil { + if v := ctx.Value(ctxKeyNamespaceOverride); v != nil { if s, ok := v.(string); ok && s != "" { ns = s } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index fbb9a10..4e140ed 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -4,13 +4,16 @@ import ( "context" "crypto/rand" "crypto/rsa" - "net/http" + "database/sql" "strconv" "time" "github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/logging" + "github.com/DeBrosOfficial/network/pkg/rqlite" "go.uber.org/zap" + + _ "github.com/rqlite/gorqlite/stdlib" ) // Config holds configuration for the gateway server @@ -18,6 +21,10 @@ type Config struct { ListenAddr string ClientNamespace string BootstrapPeers []string + + // Optional DSN for rqlite database/sql driver, e.g. "http://localhost:4001" + // If empty, defaults to "http://localhost:4001". + RQLiteDSN string } type Gateway struct { @@ -27,6 +34,11 @@ type Gateway struct { startedAt time.Time signingKey *rsa.PrivateKey keyID string + + // rqlite SQL connection and HTTP ORM gateway + sqlDB *sql.DB + ormClient rqlite.Client + ormHTTP *rqlite.HTTPGateway } // New creates and initializes a new Gateway instance @@ -75,28 +87,24 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { logger.ComponentWarn(logging.ComponentGeneral, "failed to generate RSA key; jwks will be empty", zap.Error(err)) } - logger.ComponentInfo(logging.ComponentGeneral, "Starting database migrations goroutine...") - // Non-blocking DB migrations: probe RQLite; if reachable, apply migrations asynchronously - go func() { - if gw.probeRQLiteReachable(3 * time.Second) { - internalCtx := gw.withInternalAuth(context.Background()) - if err := gw.applyMigrations(internalCtx); err != nil { - if err == errNoMigrationsFound { - if err2 := gw.applyAutoMigrations(internalCtx); err2 != nil { - logger.ComponentWarn(logging.ComponentDatabase, "auto migrations failed", zap.Error(err2)) - } else { - logger.ComponentInfo(logging.ComponentDatabase, "auto migrations applied") - } - } else { - logger.ComponentWarn(logging.ComponentDatabase, "migrations failed", zap.Error(err)) - } - } else { - logger.ComponentInfo(logging.ComponentDatabase, "migrations applied") - } - } else { - logger.ComponentWarn(logging.ComponentDatabase, "RQLite not reachable; skipping migrations for now") - } - }() + logger.ComponentInfo(logging.ComponentGeneral, "Initializing RQLite ORM HTTP gateway...") + dsn := cfg.RQLiteDSN + if dsn == "" { + dsn = "http://localhost:4001" + } + db, dbErr := sql.Open("rqlite", dsn) + if dbErr != nil { + logger.ComponentWarn(logging.ComponentGeneral, "failed to open rqlite sql db; http orm gateway disabled", zap.Error(dbErr)) + } else { + gw.sqlDB = db + orm := rqlite.NewClient(db) + gw.ormClient = orm + gw.ormHTTP = rqlite.NewHTTPGateway(orm, "/v1/db") + logger.ComponentInfo(logging.ComponentGeneral, "RQLite ORM HTTP gateway ready", + zap.String("dsn", dsn), + zap.String("base_path", "/v1/db"), + ) + } logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed, returning...") return gw, nil @@ -107,31 +115,6 @@ func (g *Gateway) withInternalAuth(ctx context.Context) context.Context { return client.WithInternalAuth(ctx) } -// probeRQLiteReachable performs a quick GET /status against candidate endpoints with a short timeout. -func (g *Gateway) probeRQLiteReachable(timeout time.Duration) bool { - endpoints := client.DefaultDatabaseEndpoints() - httpClient := &http.Client{Timeout: timeout} - for _, ep := range endpoints { - url := ep - if url == "" { - continue - } - if url[len(url)-1] == '/' { - url = url[:len(url)-1] - } - reqURL := url + "/status" - resp, err := httpClient.Get(reqURL) - if err != nil { - continue - } - resp.Body.Close() - if resp.StatusCode == http.StatusOK { - return true - } - } - return false -} - // Close disconnects the gateway client func (g *Gateway) Close() { if g.client != nil { @@ -139,4 +122,7 @@ func (g *Gateway) Close() { g.logger.ComponentWarn(logging.ComponentClient, "error during client disconnect", zap.Error(err)) } } + if g.sqlDB != nil { + _ = g.sqlDB.Close() + } } diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index d2fb209..8f786a3 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -11,7 +11,6 @@ import ( "github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/logging" - "github.com/DeBrosOfficial/network/pkg/storage" "go.uber.org/zap" ) @@ -19,8 +18,9 @@ import ( type contextKey string const ( - ctxKeyAPIKey contextKey = "api_key" - ctxKeyJWT contextKey = "jwt_claims" + ctxKeyAPIKey contextKey = "api_key" + ctxKeyJWT contextKey = "jwt_claims" + ctxKeyNamespaceOverride contextKey = "namespace_override" ) // withMiddleware adds CORS and logging middleware @@ -78,7 +78,7 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler { // Attach JWT claims and namespace to context ctx := context.WithValue(r.Context(), ctxKeyJWT, claims) if ns := strings.TrimSpace(claims.Namespace); ns != "" { - ctx = storage.WithNamespace(ctx, ns) + ctx = context.WithValue(ctx, ctxKeyNamespaceOverride, ns) } next.ServeHTTP(w, r.WithContext(ctx)) return @@ -125,7 +125,7 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler { // Attach auth metadata to context for downstream use reqCtx := context.WithValue(r.Context(), ctxKeyAPIKey, key) - reqCtx = storage.WithNamespace(reqCtx, ns) + reqCtx = context.WithValue(reqCtx, ctxKeyNamespaceOverride, ns) next.ServeHTTP(w, r.WithContext(reqCtx)) }) } @@ -190,7 +190,7 @@ func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler { // Determine namespace from context ctx := r.Context() ns := "" - if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil { + if v := ctx.Value(ctxKeyNamespaceOverride); v != nil { if s, ok := v.(string); ok { ns = strings.TrimSpace(s) } @@ -265,16 +265,13 @@ func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler { // requiresNamespaceOwnership returns true if the path should be guarded by // namespace ownership checks. func requiresNamespaceOwnership(p string) bool { - if p == "/storage" || p == "/v1/storage" || strings.HasPrefix(p, "/v1/storage/") { - return true - } - if p == "/v1/apps" || strings.HasPrefix(p, "/v1/apps/") { + if p == "/rqlite" || p == "/v1/rqlite" || strings.HasPrefix(p, "/v1/rqlite/") { return true } if strings.HasPrefix(p, "/v1/pubsub") { return true } - if strings.HasPrefix(p, "/v1/db/") { + if strings.HasPrefix(p, "/v1/rqlite/") { return true } return false diff --git a/pkg/gateway/migrate.go b/pkg/gateway/migrate.go deleted file mode 100644 index a419210..0000000 --- a/pkg/gateway/migrate.go +++ /dev/null @@ -1,188 +0,0 @@ -package gateway - -import ( - "context" - "errors" - "os" - "path/filepath" - "sort" - "strconv" - "strings" - - "github.com/DeBrosOfficial/network/pkg/client" - "github.com/DeBrosOfficial/network/pkg/logging" - "go.uber.org/zap" -) - -var errNoMigrationsFound = errors.New("no migrations found") - -func (g *Gateway) applyAutoMigrations(ctx context.Context) error { - if g.client == nil { - return nil - } - db := g.client.Database() - - // Use internal context to bypass authentication for system migrations - internalCtx := client.WithInternalAuth(ctx) - - stmts := []string{ - // namespaces - "CREATE TABLE IF NOT EXISTS namespaces (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t name TEXT NOT NULL UNIQUE,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP\n)", - // api_keys - "CREATE TABLE IF NOT EXISTS api_keys (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t key TEXT NOT NULL UNIQUE,\n\t name TEXT,\n\t namespace_id INTEGER NOT NULL,\n\t scopes TEXT,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t last_used_at TIMESTAMP,\n\t FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE\n)", - "CREATE INDEX IF NOT EXISTS idx_api_keys_namespace ON api_keys(namespace_id)", - // request_logs - "CREATE TABLE IF NOT EXISTS request_logs (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t method TEXT NOT NULL,\n\t path TEXT NOT NULL,\n\t status_code INTEGER NOT NULL,\n\t bytes_out INTEGER NOT NULL DEFAULT 0,\n\t duration_ms INTEGER NOT NULL DEFAULT 0,\n\t ip TEXT,\n\t api_key_id INTEGER,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t FOREIGN KEY(api_key_id) REFERENCES api_keys(id) ON DELETE SET NULL\n)", - "CREATE INDEX IF NOT EXISTS idx_request_logs_api_key ON request_logs(api_key_id)", - "CREATE INDEX IF NOT EXISTS idx_request_logs_created_at ON request_logs(created_at)", - // seed default namespace - "INSERT OR IGNORE INTO namespaces(name) VALUES ('default')", - } - - for _, stmt := range stmts { - if _, err := db.Query(internalCtx, stmt); err != nil { - return err - } - } - return nil -} - -func (g *Gateway) applyMigrations(ctx context.Context) error { - if g.client == nil { - return nil - } - db := g.client.Database() - - // Use internal context to bypass authentication for system migrations - internalCtx := client.WithInternalAuth(ctx) - - // Ensure schema_migrations exists first - if _, err := db.Query(internalCtx, "CREATE TABLE IF NOT EXISTS schema_migrations (\n\tversion INTEGER PRIMARY KEY,\n\tapplied_at TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))\n)"); err != nil { - return err - } - - // Locate migrations directory relative to CWD - migDir := "migrations" - if fi, err := os.Stat(migDir); err != nil || !fi.IsDir() { - return errNoMigrationsFound - } - - entries, err := os.ReadDir(migDir) - if err != nil { - return err - } - type mig struct { - ver int - path string - } - migrations := make([]mig, 0) - for _, e := range entries { - if e.IsDir() { - continue - } - name := e.Name() - if !strings.HasSuffix(strings.ToLower(name), ".sql") { - continue - } - if ver, ok := parseMigrationVersion(name); ok { - migrations = append(migrations, mig{ver: ver, path: filepath.Join(migDir, name)}) - } - } - if len(migrations) == 0 { - return errNoMigrationsFound - } - sort.Slice(migrations, func(i, j int) bool { return migrations[i].ver < migrations[j].ver }) - - // Helper to check if version applied - isApplied := func(ctx context.Context, v int) (bool, error) { - res, err := db.Query(ctx, "SELECT 1 FROM schema_migrations WHERE version = ? LIMIT 1", v) - if err != nil { - return false, err - } - return res != nil && res.Count > 0, nil - } - - for _, m := range migrations { - applied, err := isApplied(internalCtx, m.ver) - if err != nil { - return err - } - if applied { - continue - } - // Read and split SQL file into statements - content, err := os.ReadFile(m.path) - if err != nil { - return err - } - stmts := splitSQLStatements(string(content)) - for _, s := range stmts { - if s == "" { - continue - } - if _, err := db.Query(internalCtx, s); err != nil { - return err - } - } - // Mark as applied - if _, err := db.Query(internalCtx, "INSERT INTO schema_migrations (version) VALUES (?)", m.ver); err != nil { - return err - } - g.logger.ComponentInfo(logging.ComponentDatabase, "applied migration", zap.Int("version", m.ver), zap.String("file", m.path)) - } - return nil -} - -func parseMigrationVersion(name string) (int, bool) { - i := 0 - for i < len(name) && name[i] >= '0' && name[i] <= '9' { - i++ - } - if i == 0 { - return 0, false - } - v, err := strconv.Atoi(name[:i]) - if err != nil { - return 0, false - } - return v, true -} - -func splitSQLStatements(sqlText string) []string { - lines := strings.Split(sqlText, "\n") - cleaned := make([]string, 0, len(lines)) - for _, ln := range lines { - s := strings.TrimSpace(ln) - if s == "" { - continue - } - // Handle inline comments by removing everything after -- - if commentIdx := strings.Index(s, "--"); commentIdx >= 0 { - s = strings.TrimSpace(s[:commentIdx]) - if s == "" { - continue // line was only a comment - } - } - upper := strings.ToUpper(s) - if upper == "BEGIN;" || upper == "COMMIT;" || upper == "BEGIN" || upper == "COMMIT" { - continue - } - if strings.HasPrefix(upper, "INSERT") && strings.Contains(upper, "SCHEMA_MIGRATIONS") { - // ignore in-file migration markers - continue - } - cleaned = append(cleaned, s) - } - // Join and split by ';' - joined := strings.Join(cleaned, "\n") - parts := strings.Split(joined, ";") - out := make([]string, 0, len(parts)) - for _, p := range parts { - sp := strings.TrimSpace(p) - if sp == "" { - continue - } - out = append(out, sp+";") - } - return out -} diff --git a/pkg/gateway/migrate_test.go b/pkg/gateway/migrate_test.go deleted file mode 100644 index d78fa7e..0000000 --- a/pkg/gateway/migrate_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package gateway - -import "testing" - -func TestParseMigrationVersion(t *testing.T) { - cases := map[string]struct{ - name string - ok bool - }{ - "001_init.sql": {"001_init.sql", true}, - "10foobar.SQL": {"10foobar.SQL", true}, - "abc.sql": {"abc.sql", false}, - "": {"", false}, - "123_no_ext": {"123_no_ext", true}, - } - for _, c := range cases { - _, ok := parseMigrationVersion(c.name) - if ok != c.ok { - t.Fatalf("for %q expected %v got %v", c.name, c.ok, ok) - } - } -} - -func TestSplitSQLStatements(t *testing.T) { - in := `-- comment -BEGIN; -CREATE TABLE t (id INTEGER); --- another -INSERT INTO t VALUES (1); -- inline comment -COMMIT; -` - out := splitSQLStatements(in) - if len(out) != 2 { - t.Fatalf("expected 2 statements, got %d: %#v", len(out), out) - } - if out[0] != "CREATE TABLE t (id INTEGER);" { - t.Fatalf("unexpected first: %q", out[0]) - } - if out[1] != "INSERT INTO t VALUES (1);" { - t.Fatalf("unexpected second: %q", out[1]) - } -} diff --git a/pkg/gateway/db_helpers.go b/pkg/gateway/namespace_helpers.go similarity index 50% rename from pkg/gateway/db_helpers.go rename to pkg/gateway/namespace_helpers.go index 99334a0..2b3e9e5 100644 --- a/pkg/gateway/db_helpers.go +++ b/pkg/gateway/namespace_helpers.go @@ -2,22 +2,35 @@ package gateway import ( "context" + "errors" + "strings" "github.com/DeBrosOfficial/network/pkg/client" ) +// resolveNamespaceID ensures the given namespace exists and returns its primary key ID. +// Falls back to "default" when ns is empty. Uses internal auth context for system operations. func (g *Gateway) resolveNamespaceID(ctx context.Context, ns string) (interface{}, error) { - // Use internal context to bypass authentication for system operations + if g == nil || g.client == nil { + return nil, errors.New("client not initialized") + } + ns = strings.TrimSpace(ns) + if ns == "" { + ns = "default" + } + internalCtx := client.WithInternalAuth(ctx) db := g.client.Database() + if _, err := db.Query(internalCtx, "INSERT OR IGNORE INTO namespaces(name) VALUES (?)", ns); err != nil { return nil, err } res, err := db.Query(internalCtx, "SELECT id FROM namespaces WHERE name = ? LIMIT 1", ns) - if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { + if err != nil { return nil, err } + if res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { + return nil, errors.New("failed to resolve namespace") + } return res.Rows[0][0], nil } - -// Deprecated: seeding API keys from config is removed. diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go index 84ecaf3..971a09c 100644 --- a/pkg/gateway/pubsub_handlers.go +++ b/pkg/gateway/pubsub_handlers.go @@ -9,7 +9,7 @@ import ( "time" "github.com/DeBrosOfficial/network/pkg/client" - "github.com/DeBrosOfficial/network/pkg/storage" + "github.com/gorilla/websocket" ) @@ -190,7 +190,7 @@ func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) { // resolveNamespaceFromRequest gets namespace from context set by auth middleware func resolveNamespaceFromRequest(r *http.Request) string { - if v := r.Context().Value(storage.CtxKeyNamespaceOverride); v != nil { + if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil { if s, ok := v.(string); ok { return s } diff --git a/pkg/gateway/routes.go b/pkg/gateway/routes.go index cd24712..46154f1 100644 --- a/pkg/gateway/routes.go +++ b/pkg/gateway/routes.go @@ -27,16 +27,11 @@ func (g *Gateway) Routes() http.Handler { mux.HandleFunc("/v1/auth/logout", g.logoutHandler) mux.HandleFunc("/v1/auth/whoami", g.whoamiHandler) - // apps CRUD - mux.HandleFunc("/v1/apps", g.appsHandler) - mux.HandleFunc("/v1/apps/", g.appsHandler) - - // database - mux.HandleFunc("/v1/db/query", g.dbQueryHandler) - mux.HandleFunc("/v1/db/transaction", g.dbTransactionHandler) - mux.HandleFunc("/v1/db/schema", g.dbSchemaHandler) - mux.HandleFunc("/v1/db/create-table", g.dbCreateTableHandler) - mux.HandleFunc("/v1/db/drop-table", g.dbDropTableHandler) + // rqlite ORM HTTP gateway (mounts /v1/db/* endpoints) + if g.ormHTTP != nil { + g.ormHTTP.BasePath = "/v1/rqlite" + g.ormHTTP.RegisterRoutes(mux) + } // network mux.HandleFunc("/v1/network/status", g.networkStatusHandler) diff --git a/pkg/gateway/storage_handlers.go b/pkg/gateway/storage_handlers.go index f3784fb..01d0ef0 100644 --- a/pkg/gateway/storage_handlers.go +++ b/pkg/gateway/storage_handlers.go @@ -3,127 +3,9 @@ package gateway import ( "encoding/json" "net/http" - - "github.com/DeBrosOfficial/network/pkg/client" - "github.com/DeBrosOfficial/network/pkg/pubsub" ) // Database HTTP handlers -func (g *Gateway) dbQueryHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - var body struct { - SQL string `json:"sql"` - Args []any `json:"args"` - } - if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.SQL == "" { - writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}") - return - } - ctx := client.WithInternalAuth(r.Context()) - res, err := g.client.Database().Query(ctx, body.SQL, body.Args...) - if err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, res) -} - -func (g *Gateway) dbTransactionHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - var body struct { - Statements []string `json:"statements"` - } - if err := json.NewDecoder(r.Body).Decode(&body); err != nil || len(body.Statements) == 0 { - writeError(w, http.StatusBadRequest, "invalid body: {statements:[...]}") - return - } - ctx := client.WithInternalAuth(r.Context()) - if err := g.client.Database().Transaction(ctx, body.Statements); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) -} - -func (g *Gateway) dbSchemaHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - ctx := client.WithInternalAuth(r.Context()) - schema, err := g.client.Database().GetSchema(ctx) - if err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, schema) -} - -func (g *Gateway) dbCreateTableHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - var body struct { - Schema string `json:"schema"` - } - if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Schema == "" { - writeError(w, http.StatusBadRequest, "invalid body: {schema}") - return - } - ctx := client.WithInternalAuth(r.Context()) - if err := g.client.Database().CreateTable(ctx, body.Schema); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusCreated, map[string]any{"status": "ok"}) -} - -func (g *Gateway) dbDropTableHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - var body struct { - Table string `json:"table"` - } - if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Table == "" { - writeError(w, http.StatusBadRequest, "invalid body: {table}") - return - } - ctx := client.WithInternalAuth(r.Context()) - if err := g.client.Database().DropTable(ctx, body.Table); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) -} func (g *Gateway) networkStatusHandler(w http.ResponseWriter, r *http.Request) { if g.client == nil { @@ -204,7 +86,7 @@ func (g *Gateway) validateNamespaceParam(r *http.Request) bool { if qns == "" { return true } - if v := r.Context().Value(pubsub.CtxKeyNamespaceOverride); v != nil { + if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil { if s, ok := v.(string); ok && s != "" { return s == qns } diff --git a/pkg/rqlite/gateway.go b/pkg/rqlite/gateway.go index e69de29..369b2e1 100644 --- a/pkg/rqlite/gateway.go +++ b/pkg/rqlite/gateway.go @@ -0,0 +1,615 @@ +package rqlite + +// HTTP gateway for the rqlite ORM client. +// +// This file exposes a minimal, SDK-friendly HTTP interface over the ORM-like +// client defined in client.go. It maps high-level operations (Query, Exec, +// FindBy, FindOneBy, QueryBuilder-based SELECTs, Transactions) and a few schema +// helpers into JSON-over-HTTP endpoints that can be called from any language. +// +// Endpoints (under BasePath, default: /v1/db): +// - POST {base}/query -> arbitrary SELECT; returns rows as []map[string]any +// - POST {base}/exec -> write statement (INSERT/UPDATE/DELETE/DDL); returns {rows_affected,last_insert_id} +// - POST {base}/find -> FindBy(table, criteria, opts...) -> returns []map +// - POST {base}/find-one -> FindOneBy(table, criteria, opts...) -> returns map +// - POST {base}/select -> Fluent SELECT builder via JSON (joins, where, order, group, limit, offset); returns []map or one map if one=true +// - POST {base}/transaction -> Execute a sequence of exec/query ops atomically; optionally return results +// +// Schema helpers (convenience; powered via Exec/Query): +// - GET {base}/schema -> list of user tables/views and create SQL +// - POST {base}/create-table -> {schema: "CREATE TABLE ..."} -> status ok +// - POST {base}/drop-table -> {table: "name"} -> status ok (safe-validated identifier) +// +// Notes: +// - All numbers in JSON are decoded as float64 by default; we best-effort coerce +// integral values to int64 for SQL placeholders. +// - The Save/Remove reflection helpers in the ORM require concrete Go structs; +// exposing them generically over HTTP is not portable. Prefer using the Exec +// and Find APIs, or the Select builder for CRUD-like flows. + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "net/http" + "regexp" + "strings" + "time" +) + +// HTTPGateway exposes the ORM Client as a set of HTTP handlers. +type HTTPGateway struct { + // Client is the ORM-like rqlite client to execute operations against. + Client Client + // BasePath is the prefix for all routes, e.g. "/v1/db". + // If empty, defaults to "/v1/db". A trailing slash is trimmed. + BasePath string + + // Optional: Request timeout. If > 0, handlers will use a context with this timeout. + Timeout time.Duration +} + +// NewHTTPGateway constructs a new HTTPGateway with sensible defaults. +func NewHTTPGateway(c Client, base string) *HTTPGateway { + return &HTTPGateway{ + Client: c, + BasePath: base, + } +} + +// RegisterRoutes registers all handlers onto the provided mux under BasePath. +func (g *HTTPGateway) RegisterRoutes(mux *http.ServeMux) { + base := g.base() + mux.HandleFunc(base+"/query", g.handleQuery) + mux.HandleFunc(base+"/exec", g.handleExec) + mux.HandleFunc(base+"/find", g.handleFind) + mux.HandleFunc(base+"/find-one", g.handleFindOne) + mux.HandleFunc(base+"/select", g.handleSelect) + // Keep "transaction" for compatibility with existing routes. + mux.HandleFunc(base+"/transaction", g.handleTransaction) + + // Schema helpers + mux.HandleFunc(base+"/schema", g.handleSchema) + mux.HandleFunc(base+"/create-table", g.handleCreateTable) + mux.HandleFunc(base+"/drop-table", g.handleDropTable) +} + +func (g *HTTPGateway) base() string { + b := strings.TrimSpace(g.BasePath) + if b == "" { + b = "/v1/db" + } + if b != "/" { + b = strings.TrimRight(b, "/") + } + return b +} + +func (g *HTTPGateway) withTimeout(ctx context.Context) (context.Context, context.CancelFunc) { + if g.Timeout > 0 { + return context.WithTimeout(ctx, g.Timeout) + } + return context.WithCancel(ctx) +} + +// -------------------- +// Common HTTP helpers +// -------------------- + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(v) +} + +func writeError(w http.ResponseWriter, code int, msg string) { + writeJSON(w, code, map[string]any{"error": msg}) +} + +func onlyMethod(w http.ResponseWriter, r *http.Request, method string) bool { + if r.Method != method { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return false + } + return true +} + +// Normalize JSON-decoded args for SQL placeholders. +// - Convert float64 with integral value to int64 to better match SQLite expectations. +// - Leave strings, bools and nulls as-is. +// - Recursively normalizes nested arrays if present. +func normalizeArgs(args []any) []any { + out := make([]any, len(args)) + for i, a := range args { + switch v := a.(type) { + case float64: + // If v is integral (within epsilon), convert to int64 + if v == float64(int64(v)) { + out[i] = int64(v) + } else { + out[i] = v + } + case []any: + out[i] = normalizeArgs(v) + default: + out[i] = a + } + } + return out +} + +// -------------------- +// Request DTOs +// -------------------- + +type queryRequest struct { + SQL string `json:"sql"` + Args []any `json:"args"` +} + +type execRequest struct { + SQL string `json:"sql"` + Args []any `json:"args"` +} + +type findOptions struct { + Select []string `json:"select"` + OrderBy []string `json:"order_by"` + GroupBy []string `json:"group_by"` + Limit *int `json:"limit"` + Offset *int `json:"offset"` + Joins []joinBody `json:"joins"` +} + +type findRequest struct { + Table string `json:"table"` + Criteria map[string]any `json:"criteria"` + Options findOptions `json:"options"` + // Back-compat: allow options at top-level too + Select []string `json:"select"` + OrderBy []string `json:"order_by"` + GroupBy []string `json:"group_by"` + Limit *int `json:"limit"` + Offset *int `json:"offset"` + Joins []joinBody `json:"joins"` +} + +type findOneRequest = findRequest + +type joinBody struct { + Kind string `json:"kind"` // "INNER" | "LEFT" | "JOIN" + Table string `json:"table"` // table name + On string `json:"on"` // join condition +} + +type whereBody struct { + Conj string `json:"conj"` // "AND" | "OR" (default AND) + Expr string `json:"expr"` // e.g., "a = ? AND b > ?" + Args []any `json:"args"` +} + +type selectRequest struct { + Table string `json:"table"` + Alias string `json:"alias"` + Select []string `json:"select"` + Joins []joinBody `json:"joins"` + Where []whereBody `json:"where"` + GroupBy []string `json:"group_by"` + OrderBy []string `json:"order_by"` + Limit *int `json:"limit"` + Offset *int `json:"offset"` + One bool `json:"one"` // if true, returns a single row (object) +} + +type txOp struct { + Kind string `json:"kind"` // "exec" | "query" + SQL string `json:"sql"` + Args []any `json:"args"` +} + +type transactionRequest struct { + Ops []txOp `json:"ops"` + ReturnResults bool `json:"return_results"` // if true, returns per-op results + StopOnError bool `json:"stop_on_error"` // default true in tx + PartialResults bool `json:"partial_results"` // ignored for actual TX (atomic); kept for API symmetry +} + +// -------------------- +// Handlers +// -------------------- + +func (g *HTTPGateway) handleQuery(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body queryRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.SQL) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}") + return + } + args := normalizeArgs(body.Args) + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + out := make([]map[string]any, 0, 16) + if err := g.Client.Query(ctx, &out, body.SQL, args...); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "items": out, + "count": len(out), + }) +} + +func (g *HTTPGateway) handleExec(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body execRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.SQL) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}") + return + } + args := normalizeArgs(body.Args) + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + res, err := g.Client.Exec(ctx, body.SQL, args...) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + liid, _ := res.LastInsertId() + ra, _ := res.RowsAffected() + writeJSON(w, http.StatusOK, map[string]any{ + "rows_affected": ra, + "last_insert_id": liid, + "execution_state": "ok", + }) +} + +func (g *HTTPGateway) handleFind(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body findRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {table, criteria, options?}") + return + } + opts := makeFindOptions(mergeFindOptions(body)) + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + out := make([]map[string]any, 0, 32) + if err := g.Client.FindBy(ctx, &out, body.Table, body.Criteria, opts...); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "items": out, + "count": len(out), + }) +} + +func (g *HTTPGateway) handleFindOne(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body findOneRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {table, criteria, options?}") + return + } + opts := makeFindOptions(mergeFindOptions(body)) + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + row := make(map[string]any) + if err := g.Client.FindOneBy(ctx, &row, body.Table, body.Criteria, opts...); err != nil { + if errors.Is(err, sql.ErrNoRows) { + writeError(w, http.StatusNotFound, "not found") + return + } + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, row) +} + +func (g *HTTPGateway) handleSelect(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body selectRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {table, select?, where?, joins?, order_by?, group_by?, limit?, offset?, one?}") + return + } + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + qb := g.Client.CreateQueryBuilder(body.Table) + if alias := strings.TrimSpace(body.Alias); alias != "" { + qb = qb.Alias(alias) + } + if len(body.Select) > 0 { + qb = qb.Select(body.Select...) + } + // joins + for _, j := range body.Joins { + switch strings.ToUpper(strings.TrimSpace(j.Kind)) { + case "INNER": + qb = qb.InnerJoin(j.Table, j.On) + case "LEFT": + qb = qb.LeftJoin(j.Table, j.On) + default: + qb = qb.Join(j.Table, j.On) + } + } + // where + for _, wcl := range body.Where { + switch strings.ToUpper(strings.TrimSpace(wcl.Conj)) { + case "OR": + qb = qb.OrWhere(wcl.Expr, normalizeArgs(wcl.Args)...) + default: + qb = qb.AndWhere(wcl.Expr, normalizeArgs(wcl.Args)...) + } + } + // group/order/limit/offset + if len(body.GroupBy) > 0 { + qb = qb.GroupBy(body.GroupBy...) + } + if len(body.OrderBy) > 0 { + qb = qb.OrderBy(body.OrderBy...) + } + if body.Limit != nil { + qb = qb.Limit(*body.Limit) + } + if body.Offset != nil { + qb = qb.Offset(*body.Offset) + } + + if body.One { + row := make(map[string]any) + if err := qb.GetOne(ctx, &row); err != nil { + if errors.Is(err, sql.ErrNoRows) { + writeError(w, http.StatusNotFound, "not found") + return + } + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, row) + return + } + + rows := make([]map[string]any, 0, 32) + if err := qb.GetMany(ctx, &rows); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "items": rows, + "count": len(rows), + }) +} + +func (g *HTTPGateway) handleTransaction(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body transactionRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || len(body.Ops) == 0 { + writeError(w, http.StatusBadRequest, "invalid body: {ops:[{kind,sql,args?}], return_results?}") + return + } + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + results := make([]any, 0, len(body.Ops)) + err := g.Client.Tx(ctx, func(tx Tx) error { + for _, op := range body.Ops { + switch strings.ToLower(strings.TrimSpace(op.Kind)) { + case "exec": + res, err := tx.Exec(ctx, op.SQL, normalizeArgs(op.Args)...) + if err != nil { + return err + } + if body.ReturnResults { + li, _ := res.LastInsertId() + ra, _ := res.RowsAffected() + results = append(results, map[string]any{ + "rows_affected": ra, + "last_insert_id": li, + }) + } + case "query": + var rows []map[string]any + if err := tx.Query(ctx, &rows, op.SQL, normalizeArgs(op.Args)...); err != nil { + return err + } + if body.ReturnResults { + results = append(results, rows) + } + default: + return fmt.Errorf("invalid op kind: %s", op.Kind) + } + } + return nil + }) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + if body.ReturnResults { + writeJSON(w, http.StatusOK, map[string]any{ + "status": "ok", + "results": results, + }) + return + } + writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) +} + +// -------------------- +// Schema helpers +// -------------------- + +func (g *HTTPGateway) handleSchema(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodGet) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + sqlText := `SELECT name, type, sql FROM sqlite_master WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' ORDER BY name` + var rows []map[string]any + if err := g.Client.Query(ctx, &rows, sqlText); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "objects": rows, + "count": len(rows), + }) +} + +func (g *HTTPGateway) handleCreateTable(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body struct { + Schema string `json:"schema"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Schema) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {schema}") + return + } + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + if _, err := g.Client.Exec(ctx, body.Schema); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusCreated, map[string]any{"status": "ok"}) +} + +var identRe = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`) + +func (g *HTTPGateway) handleDropTable(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body struct { + Table string `json:"table"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {table}") + return + } + tbl := strings.TrimSpace(body.Table) + if !identRe.MatchString(tbl) { + writeError(w, http.StatusBadRequest, "invalid table identifier") + return + } + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + stmt := "DROP TABLE IF EXISTS " + tbl + if _, err := g.Client.Exec(ctx, stmt); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) +} + +// -------------------- +// Helpers +// -------------------- + +func mergeFindOptions(fr findRequest) findOptions { + // Prefer nested Options; fallback to top-level legacy fields + if (len(fr.Options.Select)+len(fr.Options.OrderBy)+len(fr.Options.GroupBy)) > 0 || + fr.Options.Limit != nil || fr.Options.Offset != nil || len(fr.Options.Joins) > 0 { + return fr.Options + } + return findOptions{ + Select: fr.Select, + OrderBy: fr.OrderBy, + GroupBy: fr.GroupBy, + Limit: fr.Limit, + Offset: fr.Offset, + Joins: fr.Joins, + } +} + +func makeFindOptions(o findOptions) []FindOption { + opts := make([]FindOption, 0, 6) + if len(o.OrderBy) > 0 { + opts = append(opts, WithOrderBy(o.OrderBy...)) + } + if len(o.GroupBy) > 0 { + opts = append(opts, WithGroupBy(o.GroupBy...)) + } + if o.Limit != nil { + opts = append(opts, WithLimit(*o.Limit)) + } + if o.Offset != nil { + opts = append(opts, WithOffset(*o.Offset)) + } + if len(o.Select) > 0 { + opts = append(opts, WithSelect(o.Select...)) + } + for _, j := range o.Joins { + opts = append(opts, WithJoin(justOrDefault(strings.ToUpper(j.Kind), "JOIN"), j.Table, j.On)) + } + return opts +} + +func justOrDefault(s, def string) string { + if strings.TrimSpace(s) == "" { + return def + } + return s +} diff --git a/pkg/rqlite/migrations.go b/pkg/rqlite/migrations.go index 1c43ed9..60efc9b 100644 --- a/pkg/rqlite/migrations.go +++ b/pkg/rqlite/migrations.go @@ -254,47 +254,26 @@ func isNoSuchTable(err error) bool { return strings.Contains(msg, "no such table") || strings.Contains(msg, "does not exist") } -// applySQL tries to run the entire script in one Exec. -// If the driver rejects multi-statement Exec, it falls back to splitting statements and executing sequentially. +// applySQL splits the script into individual statements, strips explicit +// transaction control (BEGIN/COMMIT/ROLLBACK/END), and executes statements +// sequentially to avoid nested transaction issues with rqlite. func applySQL(ctx context.Context, db *sql.DB, script string) error { s := strings.TrimSpace(script) if s == "" { return nil } - if _, err := db.ExecContext(ctx, s); err == nil { - return nil - } else { - // Fall back to splitting into statements and executing sequentially (respecting BEGIN/COMMIT if present). - stmts := splitSQLStatements(s) - // If the script already contains explicit BEGIN/COMMIT, we just run as-is. - // Otherwise, we attempt to wrap in a transaction; if BeginTx fails, execute one-by-one. - hasExplicitTxn := containsToken(stmts, "BEGIN") || containsToken(stmts, "BEGIN;") - if !hasExplicitTxn { - if tx, txErr := db.BeginTx(ctx, nil); txErr == nil { - for _, stmt := range stmts { - if stmt == "" { - continue - } - if _, execErr := tx.ExecContext(ctx, stmt); execErr != nil { - _ = tx.Rollback() - return fmt.Errorf("exec stmt failed: %w (stmt: %s)", execErr, snippet(stmt)) - } - } - return tx.Commit() - } - // Fall through to plain sequential exec if BeginTx not supported. - } + stmts := splitSQLStatements(s) + stmts = filterOutTxnControls(stmts) - for _, stmt := range stmts { - if stmt == "" { - continue - } - if _, execErr := db.ExecContext(ctx, stmt); execErr != nil { - return fmt.Errorf("exec stmt failed: %w (stmt: %s)", execErr, snippet(stmt)) - } + for _, stmt := range stmts { + if strings.TrimSpace(stmt) == "" { + continue + } + if _, err := db.ExecContext(ctx, stmt); err != nil { + return fmt.Errorf("exec stmt failed: %w (stmt: %s)", err, snippet(stmt)) } - return nil } + return nil } func containsToken(stmts []string, token string) bool { @@ -306,6 +285,33 @@ func containsToken(stmts []string, token string) bool { return false } +// removed duplicate helper + +// removed duplicate helper + +// isTxnControl returns true if the statement is a transaction control command. +func isTxnControl(s string) bool { + t := strings.ToUpper(strings.TrimSpace(s)) + switch t { + case "BEGIN", "BEGIN TRANSACTION", "COMMIT", "END", "ROLLBACK": + return true + default: + return false + } +} + +// filterOutTxnControls removes BEGIN/COMMIT/ROLLBACK/END statements. +func filterOutTxnControls(stmts []string) []string { + out := make([]string, 0, len(stmts)) + for _, s := range stmts { + if isTxnControl(s) { + continue + } + out = append(out, s) + } + return out +} + func snippet(s string) string { s = strings.TrimSpace(s) if len(s) > 120 { diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index 740e887..eca678d 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -175,7 +175,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } // After waitForLeadership / waitForSQLAvailable succeeds, before returning: - migrationsDir := "network/migrations" + migrationsDir := "migrations" if err := r.ApplyMigrations(ctx, migrationsDir); err != nil { r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir)) @@ -325,9 +325,6 @@ func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress strin } } - if lastErr == nil { - lastErr = fmt.Errorf("join target not reachable within %s", timeout) - } return lastErr }