mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-10-06 06:39:07 +00:00
- Created namespace_helpers on gateway
- Removed old pkg/gatway storage and migrated to new rqlite - Updated readme - Created new rqlite implementation - Updated changelog - Fixed migration error on migrations.go applySQL
This commit is contained in:
parent
16845b758d
commit
c0d8fcb895
@ -10,17 +10,21 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
|
||||
|
||||
- Created new rqlite folder
|
||||
- Created rqlite adapter, client, gateway, migrations and rqlite init
|
||||
- Created namespace_helpers on gateway
|
||||
- Created new rqlite implementation
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated node.go to support new rqlite architecture
|
||||
- Updated readme
|
||||
|
||||
|
||||
### Deprecated
|
||||
|
||||
### Removed
|
||||
|
||||
- Removed old storage folder
|
||||
- Removed old pkg/gatway storage and migrated to new rqlite
|
||||
|
||||
### Fixed
|
||||
|
||||
|
94
README.md
94
README.md
@ -414,33 +414,65 @@ bootstrap_peers:
|
||||
### Database Operations (Gateway REST)
|
||||
|
||||
```http
|
||||
POST /v1/db/exec # Body: {"sql": "INSERT/UPDATE/DELETE/DDL ...", "args": [...]}
|
||||
POST /v1/db/find # Body: {"table":"...", "criteria":{"col":val,...}, "options":{...}}
|
||||
POST /v1/db/find-one # Body: same as /find, returns a single row (404 if not found)
|
||||
POST /v1/db/select # Body: {"table":"...", "select":[...], "where":[...], "joins":[...], "order_by":[...], "limit":N, "offset":N, "one":false}
|
||||
POST /v1/db/transaction # Body: {"ops":[{"kind":"exec|query","sql":"...","args":[...]}], "return_results": true}
|
||||
POST /v1/db/query # Body: {"sql": "SELECT ...", "args": [..]} (legacy-friendly SELECT)
|
||||
GET /v1/db/schema # Returns tables/views + create SQL
|
||||
POST /v1/db/create-table # Body: {"schema": "CREATE TABLE ..."}
|
||||
POST /v1/db/drop-table # Body: {"table": "table_name"}
|
||||
POST /v1/db/query # Body: {"sql": "SELECT ...", "args": [..]}
|
||||
POST /v1/db/transaction # Body: {"statements": ["SQL 1", "SQL 2", ...]}
|
||||
GET /v1/db/schema # Returns current tables and columns
|
||||
```
|
||||
|
||||
Common migration workflow:
|
||||
Common workflows:
|
||||
|
||||
```bash
|
||||
# Add a new table
|
||||
curl -X POST "$GW/v1/db/create-table" \
|
||||
# Exec (INSERT/UPDATE/DELETE/DDL)
|
||||
curl -X POST "$GW/v1/db/exec" \
|
||||
-H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
|
||||
-d '{"schema":"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)"}'
|
||||
-d '{"sql":"INSERT INTO users(name,email) VALUES(?,?)","args":["Alice","alice@example.com"]}'
|
||||
|
||||
# Apply multiple statements atomically
|
||||
# Find (criteria + options)
|
||||
curl -X POST "$GW/v1/db/find" \
|
||||
-H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
|
||||
-d '{
|
||||
"table":"users",
|
||||
"criteria":{"active":true},
|
||||
"options":{"select":["id","email"],"order_by":["created_at DESC"],"limit":25}
|
||||
}'
|
||||
|
||||
# Select (fluent builder via JSON)
|
||||
curl -X POST "$GW/v1/db/select" \
|
||||
-H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
|
||||
-d '{
|
||||
"table":"orders o",
|
||||
"select":["o.id","o.total","u.email AS user_email"],
|
||||
"joins":[{"kind":"INNER","table":"users u","on":"u.id = o.user_id"}],
|
||||
"where":[{"conj":"AND","expr":"o.total > ?","args":[100]}],
|
||||
"order_by":["o.created_at DESC"],
|
||||
"limit":10
|
||||
}'
|
||||
|
||||
# Transaction (atomic batch)
|
||||
curl -X POST "$GW/v1/db/transaction" \
|
||||
-H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
|
||||
-d '{"statements":[
|
||||
"ALTER TABLE users ADD COLUMN email TEXT",
|
||||
"CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)"
|
||||
]}'
|
||||
-d '{
|
||||
"return_results": true,
|
||||
"ops": [
|
||||
{"kind":"exec","sql":"INSERT INTO users(email) VALUES(?)","args":["bob@example.com"]},
|
||||
{"kind":"query","sql":"SELECT last_insert_rowid() AS id","args":[]}
|
||||
]
|
||||
}'
|
||||
|
||||
# Verify
|
||||
curl -X POST "$GW/v1/db/query" \
|
||||
-H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
|
||||
-d '{"sql":"PRAGMA table_info(users)"}'
|
||||
# Schema
|
||||
curl "$GW/v1/db/schema" -H "Authorization: Bearer $API_KEY"
|
||||
|
||||
# DDL helpers
|
||||
curl -X POST "$GW/v1/db/create-table" -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
|
||||
-d '{"schema":"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)"}'
|
||||
curl -X POST "$GW/v1/db/drop-table" -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
|
||||
-d '{"table":"users"}'
|
||||
```
|
||||
|
||||
### Authentication
|
||||
@ -540,7 +572,23 @@ GET /v1/auth/whoami # Current auth status
|
||||
POST /v1/auth/api-key # Generate API key (authenticated)
|
||||
```
|
||||
|
||||
#### RQLite HTTP ORM Gateway (/v1/db)
|
||||
|
||||
The gateway now exposes a full HTTP interface over the Go ORM-like client (see `pkg/rqlite/gateway.go`) so you can build SDKs in any language.
|
||||
|
||||
- Base path: `/v1/db`
|
||||
- Endpoints:
|
||||
- `POST /v1/db/exec` — Execute write/DDL SQL; returns `{ rows_affected, last_insert_id }`
|
||||
- `POST /v1/db/find` — Map-based criteria; returns `{ items: [...], count: N }`
|
||||
- `POST /v1/db/find-one` — Single row; 404 if not found
|
||||
- `POST /v1/db/select` — Fluent SELECT via JSON (joins, where, order, group, limit, offset)
|
||||
- `POST /v1/db/transaction` — Atomic batch of exec/query ops, optional per-op results
|
||||
- `POST /v1/db/query` — Arbitrary SELECT (legacy-friendly), returns `items`
|
||||
- `GET /v1/db/schema` — List user tables/views + create SQL
|
||||
- `POST /v1/db/create-table` — Convenience for DDL
|
||||
- `POST /v1/db/drop-table` — Safe drop (identifier validated)
|
||||
|
||||
Payload examples are shown in the [Database Operations (Gateway REST)](#database-operations-gateway-rest) section.
|
||||
|
||||
#### Network Operations
|
||||
```http
|
||||
@ -575,11 +623,15 @@ GET /v1/pubsub/topics # List active topics
|
||||
|
||||
### Key HTTP endpoints for SDKs
|
||||
- **Database**
|
||||
- Create Table: `POST /v1/db/create-table` `{schema}` → `{status:"ok"}`
|
||||
- Drop Table: `POST /v1/db/drop-table` `{table}` → `{status:"ok"}`
|
||||
- Query: `POST /v1/db/query` `{sql, args?}` → `{columns, rows, count}`
|
||||
- Transaction: `POST /v1/db/transaction` `{statements:[...]}` → `{status:"ok"}`
|
||||
- Schema: `GET /v1/db/schema` → schema JSON
|
||||
- Exec: `POST /v1/db/exec` `{sql, args?}` → `{rows_affected,last_insert_id}`
|
||||
- Find: `POST /v1/db/find` `{table, criteria, options?}` → `{items,count}`
|
||||
- FindOne: `POST /v1/db/find-one` `{table, criteria, options?}` → single object or 404
|
||||
- Select: `POST /v1/db/select` `{table, select?, joins?, where?, order_by?, group_by?, limit?, offset?, one?}`
|
||||
- Transaction: `POST /v1/db/transaction` `{ops:[{kind,sql,args?}], return_results?}`
|
||||
- Query: `POST /v1/db/query` `{sql, args?}` → `{items,count}`
|
||||
- Schema: `GET /v1/db/schema`
|
||||
- Create Table: `POST /v1/db/create-table` `{schema}`
|
||||
- Drop Table: `POST /v1/db/drop-table` `{table}`
|
||||
- **PubSub**
|
||||
- WS Subscribe: `GET /v1/pubsub/ws?topic=<topic>`
|
||||
- Publish: `POST /v1/pubsub/publish` `{topic, data_base64}` → `{status:"ok"}`
|
||||
|
@ -109,12 +109,11 @@ func DefaultConfig() *Config {
|
||||
},
|
||||
Discovery: DiscoveryConfig{
|
||||
BootstrapPeers: []string{
|
||||
"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWSHHwEY6cga3ng7tD1rzStAU58ogQXVMX3LZJ6Gqf6dee",
|
||||
// "/ip4/217.76.54.168/tcp/4001/p2p/12D3KooWDp7xeShVY9uHfqNVPSsJeCKUatAviFZV8Y1joox5nUvx",
|
||||
// "/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1",
|
||||
// "/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR",
|
||||
// "/ip4/155.133.27.199/tcp/4001/p2p/12D3KooWC69SBzM5QUgrLrfLWUykE8au32X5LwT7zwv9bixrQPm1",
|
||||
// "/ip4/217.76.56.2/tcp/4001/p2p/12D3KooWEiqJHvznxqJ5p2y8mUs6Ky6dfU1xTYFQbyKRCABfcZz4",
|
||||
"/ip4/217.76.54.168/tcp/4001/p2p/12D3KooWDp7xeShVY9uHfqNVPSsJeCKUatAviFZV8Y1joox5nUvx",
|
||||
"/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1",
|
||||
"/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR",
|
||||
"/ip4/155.133.27.199/tcp/4001/p2p/12D3KooWC69SBzM5QUgrLrfLWUykE8au32X5LwT7zwv9bixrQPm1",
|
||||
"/ip4/217.76.56.2/tcp/4001/p2p/12D3KooWEiqJHvznxqJ5p2y8mUs6Ky6dfU1xTYFQbyKRCABfcZz4",
|
||||
},
|
||||
BootstrapPort: 4001, // Default LibP2P port
|
||||
DiscoveryInterval: time.Second * 15, // Back to 15 seconds for testing
|
||||
|
@ -1,170 +0,0 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/storage"
|
||||
)
|
||||
|
||||
// appsHandler implements minimal CRUD for apps within a namespace.
|
||||
// Routes handled:
|
||||
// - GET /v1/apps -> list
|
||||
// - POST /v1/apps -> create
|
||||
// - GET /v1/apps/{app_id} -> fetch
|
||||
// - PUT /v1/apps/{app_id} -> update (name/public_key)
|
||||
// - DELETE /v1/apps/{app_id} -> delete
|
||||
func (g *Gateway) appsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
ctx := r.Context()
|
||||
ns := g.cfg.ClientNamespace
|
||||
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil {
|
||||
if s, ok := v.(string); ok && s != "" {
|
||||
ns = s
|
||||
}
|
||||
}
|
||||
if strings.TrimSpace(ns) == "" {
|
||||
ns = "default"
|
||||
}
|
||||
db := g.client.Database()
|
||||
nsID, err := g.resolveNamespaceID(ctx, ns)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
path := r.URL.Path
|
||||
// Determine if operating on collection or single resource
|
||||
if path == "/v1/apps" || path == "/v1/apps/" {
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
// List apps
|
||||
res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? ORDER BY created_at DESC", nsID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
items := make([]map[string]any, 0, res.Count)
|
||||
for _, row := range res.Rows {
|
||||
item := map[string]any{
|
||||
"app_id": row[0],
|
||||
"name": row[1],
|
||||
"public_key": row[2],
|
||||
"namespace": ns,
|
||||
"created_at": row[3],
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"items": items, "count": len(items)})
|
||||
return
|
||||
case http.MethodPost:
|
||||
// Create app with provided name/public_key
|
||||
var req struct {
|
||||
Name string `json:"name"`
|
||||
PublicKey string `json:"public_key"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid json body")
|
||||
return
|
||||
}
|
||||
// Generate app_id
|
||||
buf := make([]byte, 12)
|
||||
if _, err := rand.Read(buf); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to generate app id")
|
||||
return
|
||||
}
|
||||
appID := "app_" + base64.RawURLEncoding.EncodeToString(buf)
|
||||
if _, err := db.Query(ctx, "INSERT INTO apps(namespace_id, app_id, name, public_key) VALUES (?, ?, ?, ?)", nsID, appID, req.Name, req.PublicKey); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, map[string]any{
|
||||
"app_id": appID,
|
||||
"name": req.Name,
|
||||
"public_key": req.PublicKey,
|
||||
"namespace": ns,
|
||||
})
|
||||
return
|
||||
default:
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Single resource: /v1/apps/{app_id}
|
||||
if strings.HasPrefix(path, "/v1/apps/") {
|
||||
appID := strings.TrimPrefix(path, "/v1/apps/")
|
||||
appID = strings.TrimSpace(appID)
|
||||
if appID == "" {
|
||||
writeError(w, http.StatusBadRequest, "missing app_id")
|
||||
return
|
||||
}
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? AND app_id = ? LIMIT 1", nsID, appID)
|
||||
if err != nil || res == nil || res.Count == 0 {
|
||||
writeError(w, http.StatusNotFound, "app not found")
|
||||
return
|
||||
}
|
||||
row := res.Rows[0]
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"app_id": row[0],
|
||||
"name": row[1],
|
||||
"public_key": row[2],
|
||||
"namespace": ns,
|
||||
"created_at": row[3],
|
||||
})
|
||||
return
|
||||
case http.MethodPut:
|
||||
var req struct {
|
||||
Name *string `json:"name"`
|
||||
PublicKey *string `json:"public_key"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid json body")
|
||||
return
|
||||
}
|
||||
// Build update dynamically
|
||||
sets := make([]string, 0, 2)
|
||||
args := make([]any, 0, 4)
|
||||
if req.Name != nil {
|
||||
sets = append(sets, "name = ?")
|
||||
args = append(args, *req.Name)
|
||||
}
|
||||
if req.PublicKey != nil {
|
||||
sets = append(sets, "public_key = ?")
|
||||
args = append(args, *req.PublicKey)
|
||||
}
|
||||
if len(sets) == 0 {
|
||||
writeError(w, http.StatusBadRequest, "no fields to update")
|
||||
return
|
||||
}
|
||||
q := "UPDATE apps SET " + strings.Join(sets, ", ") + " WHERE namespace_id = ? AND app_id = ?"
|
||||
args = append(args, nsID, appID)
|
||||
if _, err := db.Query(ctx, q, args...); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
|
||||
return
|
||||
case http.MethodDelete:
|
||||
if _, err := db.Query(ctx, "DELETE FROM apps WHERE namespace_id = ? AND app_id = ?", nsID, appID); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
|
||||
return
|
||||
default:
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
writeError(w, http.StatusNotFound, "not found")
|
||||
}
|
@ -12,7 +12,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
"github.com/DeBrosOfficial/network/pkg/storage"
|
||||
ethcrypto "github.com/ethereum/go-ethereum/crypto"
|
||||
)
|
||||
|
||||
@ -20,7 +19,7 @@ func (g *Gateway) whoamiHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
// Determine namespace (may be overridden by auth layer)
|
||||
ns := g.cfg.ClientNamespace
|
||||
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil {
|
||||
if v := ctx.Value(ctxKeyNamespaceOverride); v != nil {
|
||||
if s, ok := v.(string); ok && s != "" {
|
||||
ns = s
|
||||
}
|
||||
|
@ -4,13 +4,16 @@ import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/rsa"
|
||||
"net/http"
|
||||
"database/sql"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
"github.com/DeBrosOfficial/network/pkg/logging"
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
"go.uber.org/zap"
|
||||
|
||||
_ "github.com/rqlite/gorqlite/stdlib"
|
||||
)
|
||||
|
||||
// Config holds configuration for the gateway server
|
||||
@ -18,6 +21,10 @@ type Config struct {
|
||||
ListenAddr string
|
||||
ClientNamespace string
|
||||
BootstrapPeers []string
|
||||
|
||||
// Optional DSN for rqlite database/sql driver, e.g. "http://localhost:4001"
|
||||
// If empty, defaults to "http://localhost:4001".
|
||||
RQLiteDSN string
|
||||
}
|
||||
|
||||
type Gateway struct {
|
||||
@ -27,6 +34,11 @@ type Gateway struct {
|
||||
startedAt time.Time
|
||||
signingKey *rsa.PrivateKey
|
||||
keyID string
|
||||
|
||||
// rqlite SQL connection and HTTP ORM gateway
|
||||
sqlDB *sql.DB
|
||||
ormClient rqlite.Client
|
||||
ormHTTP *rqlite.HTTPGateway
|
||||
}
|
||||
|
||||
// New creates and initializes a new Gateway instance
|
||||
@ -75,28 +87,24 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
|
||||
logger.ComponentWarn(logging.ComponentGeneral, "failed to generate RSA key; jwks will be empty", zap.Error(err))
|
||||
}
|
||||
|
||||
logger.ComponentInfo(logging.ComponentGeneral, "Starting database migrations goroutine...")
|
||||
// Non-blocking DB migrations: probe RQLite; if reachable, apply migrations asynchronously
|
||||
go func() {
|
||||
if gw.probeRQLiteReachable(3 * time.Second) {
|
||||
internalCtx := gw.withInternalAuth(context.Background())
|
||||
if err := gw.applyMigrations(internalCtx); err != nil {
|
||||
if err == errNoMigrationsFound {
|
||||
if err2 := gw.applyAutoMigrations(internalCtx); err2 != nil {
|
||||
logger.ComponentWarn(logging.ComponentDatabase, "auto migrations failed", zap.Error(err2))
|
||||
} else {
|
||||
logger.ComponentInfo(logging.ComponentDatabase, "auto migrations applied")
|
||||
logger.ComponentInfo(logging.ComponentGeneral, "Initializing RQLite ORM HTTP gateway...")
|
||||
dsn := cfg.RQLiteDSN
|
||||
if dsn == "" {
|
||||
dsn = "http://localhost:4001"
|
||||
}
|
||||
db, dbErr := sql.Open("rqlite", dsn)
|
||||
if dbErr != nil {
|
||||
logger.ComponentWarn(logging.ComponentGeneral, "failed to open rqlite sql db; http orm gateway disabled", zap.Error(dbErr))
|
||||
} else {
|
||||
logger.ComponentWarn(logging.ComponentDatabase, "migrations failed", zap.Error(err))
|
||||
gw.sqlDB = db
|
||||
orm := rqlite.NewClient(db)
|
||||
gw.ormClient = orm
|
||||
gw.ormHTTP = rqlite.NewHTTPGateway(orm, "/v1/db")
|
||||
logger.ComponentInfo(logging.ComponentGeneral, "RQLite ORM HTTP gateway ready",
|
||||
zap.String("dsn", dsn),
|
||||
zap.String("base_path", "/v1/db"),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
logger.ComponentInfo(logging.ComponentDatabase, "migrations applied")
|
||||
}
|
||||
} else {
|
||||
logger.ComponentWarn(logging.ComponentDatabase, "RQLite not reachable; skipping migrations for now")
|
||||
}
|
||||
}()
|
||||
|
||||
logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed, returning...")
|
||||
return gw, nil
|
||||
@ -107,31 +115,6 @@ func (g *Gateway) withInternalAuth(ctx context.Context) context.Context {
|
||||
return client.WithInternalAuth(ctx)
|
||||
}
|
||||
|
||||
// probeRQLiteReachable performs a quick GET /status against candidate endpoints with a short timeout.
|
||||
func (g *Gateway) probeRQLiteReachable(timeout time.Duration) bool {
|
||||
endpoints := client.DefaultDatabaseEndpoints()
|
||||
httpClient := &http.Client{Timeout: timeout}
|
||||
for _, ep := range endpoints {
|
||||
url := ep
|
||||
if url == "" {
|
||||
continue
|
||||
}
|
||||
if url[len(url)-1] == '/' {
|
||||
url = url[:len(url)-1]
|
||||
}
|
||||
reqURL := url + "/status"
|
||||
resp, err := httpClient.Get(reqURL)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Close disconnects the gateway client
|
||||
func (g *Gateway) Close() {
|
||||
if g.client != nil {
|
||||
@ -139,4 +122,7 @@ func (g *Gateway) Close() {
|
||||
g.logger.ComponentWarn(logging.ComponentClient, "error during client disconnect", zap.Error(err))
|
||||
}
|
||||
}
|
||||
if g.sqlDB != nil {
|
||||
_ = g.sqlDB.Close()
|
||||
}
|
||||
}
|
||||
|
@ -11,7 +11,6 @@ import (
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
"github.com/DeBrosOfficial/network/pkg/logging"
|
||||
"github.com/DeBrosOfficial/network/pkg/storage"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -21,6 +20,7 @@ type contextKey string
|
||||
const (
|
||||
ctxKeyAPIKey contextKey = "api_key"
|
||||
ctxKeyJWT contextKey = "jwt_claims"
|
||||
ctxKeyNamespaceOverride contextKey = "namespace_override"
|
||||
)
|
||||
|
||||
// withMiddleware adds CORS and logging middleware
|
||||
@ -78,7 +78,7 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
|
||||
// Attach JWT claims and namespace to context
|
||||
ctx := context.WithValue(r.Context(), ctxKeyJWT, claims)
|
||||
if ns := strings.TrimSpace(claims.Namespace); ns != "" {
|
||||
ctx = storage.WithNamespace(ctx, ns)
|
||||
ctx = context.WithValue(ctx, ctxKeyNamespaceOverride, ns)
|
||||
}
|
||||
next.ServeHTTP(w, r.WithContext(ctx))
|
||||
return
|
||||
@ -125,7 +125,7 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
|
||||
|
||||
// Attach auth metadata to context for downstream use
|
||||
reqCtx := context.WithValue(r.Context(), ctxKeyAPIKey, key)
|
||||
reqCtx = storage.WithNamespace(reqCtx, ns)
|
||||
reqCtx = context.WithValue(reqCtx, ctxKeyNamespaceOverride, ns)
|
||||
next.ServeHTTP(w, r.WithContext(reqCtx))
|
||||
})
|
||||
}
|
||||
@ -190,7 +190,7 @@ func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler {
|
||||
// Determine namespace from context
|
||||
ctx := r.Context()
|
||||
ns := ""
|
||||
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil {
|
||||
if v := ctx.Value(ctxKeyNamespaceOverride); v != nil {
|
||||
if s, ok := v.(string); ok {
|
||||
ns = strings.TrimSpace(s)
|
||||
}
|
||||
@ -265,16 +265,13 @@ func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler {
|
||||
// requiresNamespaceOwnership returns true if the path should be guarded by
|
||||
// namespace ownership checks.
|
||||
func requiresNamespaceOwnership(p string) bool {
|
||||
if p == "/storage" || p == "/v1/storage" || strings.HasPrefix(p, "/v1/storage/") {
|
||||
return true
|
||||
}
|
||||
if p == "/v1/apps" || strings.HasPrefix(p, "/v1/apps/") {
|
||||
if p == "/rqlite" || p == "/v1/rqlite" || strings.HasPrefix(p, "/v1/rqlite/") {
|
||||
return true
|
||||
}
|
||||
if strings.HasPrefix(p, "/v1/pubsub") {
|
||||
return true
|
||||
}
|
||||
if strings.HasPrefix(p, "/v1/db/") {
|
||||
if strings.HasPrefix(p, "/v1/rqlite/") {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
@ -1,188 +0,0 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
"github.com/DeBrosOfficial/network/pkg/logging"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var errNoMigrationsFound = errors.New("no migrations found")
|
||||
|
||||
func (g *Gateway) applyAutoMigrations(ctx context.Context) error {
|
||||
if g.client == nil {
|
||||
return nil
|
||||
}
|
||||
db := g.client.Database()
|
||||
|
||||
// Use internal context to bypass authentication for system migrations
|
||||
internalCtx := client.WithInternalAuth(ctx)
|
||||
|
||||
stmts := []string{
|
||||
// namespaces
|
||||
"CREATE TABLE IF NOT EXISTS namespaces (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t name TEXT NOT NULL UNIQUE,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP\n)",
|
||||
// api_keys
|
||||
"CREATE TABLE IF NOT EXISTS api_keys (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t key TEXT NOT NULL UNIQUE,\n\t name TEXT,\n\t namespace_id INTEGER NOT NULL,\n\t scopes TEXT,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t last_used_at TIMESTAMP,\n\t FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE\n)",
|
||||
"CREATE INDEX IF NOT EXISTS idx_api_keys_namespace ON api_keys(namespace_id)",
|
||||
// request_logs
|
||||
"CREATE TABLE IF NOT EXISTS request_logs (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t method TEXT NOT NULL,\n\t path TEXT NOT NULL,\n\t status_code INTEGER NOT NULL,\n\t bytes_out INTEGER NOT NULL DEFAULT 0,\n\t duration_ms INTEGER NOT NULL DEFAULT 0,\n\t ip TEXT,\n\t api_key_id INTEGER,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t FOREIGN KEY(api_key_id) REFERENCES api_keys(id) ON DELETE SET NULL\n)",
|
||||
"CREATE INDEX IF NOT EXISTS idx_request_logs_api_key ON request_logs(api_key_id)",
|
||||
"CREATE INDEX IF NOT EXISTS idx_request_logs_created_at ON request_logs(created_at)",
|
||||
// seed default namespace
|
||||
"INSERT OR IGNORE INTO namespaces(name) VALUES ('default')",
|
||||
}
|
||||
|
||||
for _, stmt := range stmts {
|
||||
if _, err := db.Query(internalCtx, stmt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Gateway) applyMigrations(ctx context.Context) error {
|
||||
if g.client == nil {
|
||||
return nil
|
||||
}
|
||||
db := g.client.Database()
|
||||
|
||||
// Use internal context to bypass authentication for system migrations
|
||||
internalCtx := client.WithInternalAuth(ctx)
|
||||
|
||||
// Ensure schema_migrations exists first
|
||||
if _, err := db.Query(internalCtx, "CREATE TABLE IF NOT EXISTS schema_migrations (\n\tversion INTEGER PRIMARY KEY,\n\tapplied_at TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))\n)"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Locate migrations directory relative to CWD
|
||||
migDir := "migrations"
|
||||
if fi, err := os.Stat(migDir); err != nil || !fi.IsDir() {
|
||||
return errNoMigrationsFound
|
||||
}
|
||||
|
||||
entries, err := os.ReadDir(migDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
type mig struct {
|
||||
ver int
|
||||
path string
|
||||
}
|
||||
migrations := make([]mig, 0)
|
||||
for _, e := range entries {
|
||||
if e.IsDir() {
|
||||
continue
|
||||
}
|
||||
name := e.Name()
|
||||
if !strings.HasSuffix(strings.ToLower(name), ".sql") {
|
||||
continue
|
||||
}
|
||||
if ver, ok := parseMigrationVersion(name); ok {
|
||||
migrations = append(migrations, mig{ver: ver, path: filepath.Join(migDir, name)})
|
||||
}
|
||||
}
|
||||
if len(migrations) == 0 {
|
||||
return errNoMigrationsFound
|
||||
}
|
||||
sort.Slice(migrations, func(i, j int) bool { return migrations[i].ver < migrations[j].ver })
|
||||
|
||||
// Helper to check if version applied
|
||||
isApplied := func(ctx context.Context, v int) (bool, error) {
|
||||
res, err := db.Query(ctx, "SELECT 1 FROM schema_migrations WHERE version = ? LIMIT 1", v)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return res != nil && res.Count > 0, nil
|
||||
}
|
||||
|
||||
for _, m := range migrations {
|
||||
applied, err := isApplied(internalCtx, m.ver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if applied {
|
||||
continue
|
||||
}
|
||||
// Read and split SQL file into statements
|
||||
content, err := os.ReadFile(m.path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stmts := splitSQLStatements(string(content))
|
||||
for _, s := range stmts {
|
||||
if s == "" {
|
||||
continue
|
||||
}
|
||||
if _, err := db.Query(internalCtx, s); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Mark as applied
|
||||
if _, err := db.Query(internalCtx, "INSERT INTO schema_migrations (version) VALUES (?)", m.ver); err != nil {
|
||||
return err
|
||||
}
|
||||
g.logger.ComponentInfo(logging.ComponentDatabase, "applied migration", zap.Int("version", m.ver), zap.String("file", m.path))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseMigrationVersion(name string) (int, bool) {
|
||||
i := 0
|
||||
for i < len(name) && name[i] >= '0' && name[i] <= '9' {
|
||||
i++
|
||||
}
|
||||
if i == 0 {
|
||||
return 0, false
|
||||
}
|
||||
v, err := strconv.Atoi(name[:i])
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
return v, true
|
||||
}
|
||||
|
||||
func splitSQLStatements(sqlText string) []string {
|
||||
lines := strings.Split(sqlText, "\n")
|
||||
cleaned := make([]string, 0, len(lines))
|
||||
for _, ln := range lines {
|
||||
s := strings.TrimSpace(ln)
|
||||
if s == "" {
|
||||
continue
|
||||
}
|
||||
// Handle inline comments by removing everything after --
|
||||
if commentIdx := strings.Index(s, "--"); commentIdx >= 0 {
|
||||
s = strings.TrimSpace(s[:commentIdx])
|
||||
if s == "" {
|
||||
continue // line was only a comment
|
||||
}
|
||||
}
|
||||
upper := strings.ToUpper(s)
|
||||
if upper == "BEGIN;" || upper == "COMMIT;" || upper == "BEGIN" || upper == "COMMIT" {
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(upper, "INSERT") && strings.Contains(upper, "SCHEMA_MIGRATIONS") {
|
||||
// ignore in-file migration markers
|
||||
continue
|
||||
}
|
||||
cleaned = append(cleaned, s)
|
||||
}
|
||||
// Join and split by ';'
|
||||
joined := strings.Join(cleaned, "\n")
|
||||
parts := strings.Split(joined, ";")
|
||||
out := make([]string, 0, len(parts))
|
||||
for _, p := range parts {
|
||||
sp := strings.TrimSpace(p)
|
||||
if sp == "" {
|
||||
continue
|
||||
}
|
||||
out = append(out, sp+";")
|
||||
}
|
||||
return out
|
||||
}
|
@ -1,42 +0,0 @@
|
||||
package gateway
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestParseMigrationVersion(t *testing.T) {
|
||||
cases := map[string]struct{
|
||||
name string
|
||||
ok bool
|
||||
}{
|
||||
"001_init.sql": {"001_init.sql", true},
|
||||
"10foobar.SQL": {"10foobar.SQL", true},
|
||||
"abc.sql": {"abc.sql", false},
|
||||
"": {"", false},
|
||||
"123_no_ext": {"123_no_ext", true},
|
||||
}
|
||||
for _, c := range cases {
|
||||
_, ok := parseMigrationVersion(c.name)
|
||||
if ok != c.ok {
|
||||
t.Fatalf("for %q expected %v got %v", c.name, c.ok, ok)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitSQLStatements(t *testing.T) {
|
||||
in := `-- comment
|
||||
BEGIN;
|
||||
CREATE TABLE t (id INTEGER);
|
||||
-- another
|
||||
INSERT INTO t VALUES (1); -- inline comment
|
||||
COMMIT;
|
||||
`
|
||||
out := splitSQLStatements(in)
|
||||
if len(out) != 2 {
|
||||
t.Fatalf("expected 2 statements, got %d: %#v", len(out), out)
|
||||
}
|
||||
if out[0] != "CREATE TABLE t (id INTEGER);" {
|
||||
t.Fatalf("unexpected first: %q", out[0])
|
||||
}
|
||||
if out[1] != "INSERT INTO t VALUES (1);" {
|
||||
t.Fatalf("unexpected second: %q", out[1])
|
||||
}
|
||||
}
|
@ -2,22 +2,35 @@ package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
)
|
||||
|
||||
// resolveNamespaceID ensures the given namespace exists and returns its primary key ID.
|
||||
// Falls back to "default" when ns is empty. Uses internal auth context for system operations.
|
||||
func (g *Gateway) resolveNamespaceID(ctx context.Context, ns string) (interface{}, error) {
|
||||
// Use internal context to bypass authentication for system operations
|
||||
if g == nil || g.client == nil {
|
||||
return nil, errors.New("client not initialized")
|
||||
}
|
||||
ns = strings.TrimSpace(ns)
|
||||
if ns == "" {
|
||||
ns = "default"
|
||||
}
|
||||
|
||||
internalCtx := client.WithInternalAuth(ctx)
|
||||
db := g.client.Database()
|
||||
|
||||
if _, err := db.Query(internalCtx, "INSERT OR IGNORE INTO namespaces(name) VALUES (?)", ns); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err := db.Query(internalCtx, "SELECT id FROM namespaces WHERE name = ? LIMIT 1", ns)
|
||||
if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 {
|
||||
return nil, errors.New("failed to resolve namespace")
|
||||
}
|
||||
return res.Rows[0][0], nil
|
||||
}
|
||||
|
||||
// Deprecated: seeding API keys from config is removed.
|
@ -9,7 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
"github.com/DeBrosOfficial/network/pkg/storage"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
@ -190,7 +190,7 @@ func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// resolveNamespaceFromRequest gets namespace from context set by auth middleware
|
||||
func resolveNamespaceFromRequest(r *http.Request) string {
|
||||
if v := r.Context().Value(storage.CtxKeyNamespaceOverride); v != nil {
|
||||
if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil {
|
||||
if s, ok := v.(string); ok {
|
||||
return s
|
||||
}
|
||||
|
@ -27,16 +27,11 @@ func (g *Gateway) Routes() http.Handler {
|
||||
mux.HandleFunc("/v1/auth/logout", g.logoutHandler)
|
||||
mux.HandleFunc("/v1/auth/whoami", g.whoamiHandler)
|
||||
|
||||
// apps CRUD
|
||||
mux.HandleFunc("/v1/apps", g.appsHandler)
|
||||
mux.HandleFunc("/v1/apps/", g.appsHandler)
|
||||
|
||||
// database
|
||||
mux.HandleFunc("/v1/db/query", g.dbQueryHandler)
|
||||
mux.HandleFunc("/v1/db/transaction", g.dbTransactionHandler)
|
||||
mux.HandleFunc("/v1/db/schema", g.dbSchemaHandler)
|
||||
mux.HandleFunc("/v1/db/create-table", g.dbCreateTableHandler)
|
||||
mux.HandleFunc("/v1/db/drop-table", g.dbDropTableHandler)
|
||||
// rqlite ORM HTTP gateway (mounts /v1/db/* endpoints)
|
||||
if g.ormHTTP != nil {
|
||||
g.ormHTTP.BasePath = "/v1/rqlite"
|
||||
g.ormHTTP.RegisterRoutes(mux)
|
||||
}
|
||||
|
||||
// network
|
||||
mux.HandleFunc("/v1/network/status", g.networkStatusHandler)
|
||||
|
@ -3,127 +3,9 @@ package gateway
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
"github.com/DeBrosOfficial/network/pkg/pubsub"
|
||||
)
|
||||
|
||||
// Database HTTP handlers
|
||||
func (g *Gateway) dbQueryHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
var body struct {
|
||||
SQL string `json:"sql"`
|
||||
Args []any `json:"args"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.SQL == "" {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}")
|
||||
return
|
||||
}
|
||||
ctx := client.WithInternalAuth(r.Context())
|
||||
res, err := g.client.Database().Query(ctx, body.SQL, body.Args...)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, res)
|
||||
}
|
||||
|
||||
func (g *Gateway) dbTransactionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
var body struct {
|
||||
Statements []string `json:"statements"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || len(body.Statements) == 0 {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {statements:[...]}")
|
||||
return
|
||||
}
|
||||
ctx := client.WithInternalAuth(r.Context())
|
||||
if err := g.client.Database().Transaction(ctx, body.Statements); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
|
||||
}
|
||||
|
||||
func (g *Gateway) dbSchemaHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
if r.Method != http.MethodGet {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
ctx := client.WithInternalAuth(r.Context())
|
||||
schema, err := g.client.Database().GetSchema(ctx)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, schema)
|
||||
}
|
||||
|
||||
func (g *Gateway) dbCreateTableHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
var body struct {
|
||||
Schema string `json:"schema"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Schema == "" {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {schema}")
|
||||
return
|
||||
}
|
||||
ctx := client.WithInternalAuth(r.Context())
|
||||
if err := g.client.Database().CreateTable(ctx, body.Schema); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, map[string]any{"status": "ok"})
|
||||
}
|
||||
|
||||
func (g *Gateway) dbDropTableHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
var body struct {
|
||||
Table string `json:"table"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Table == "" {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {table}")
|
||||
return
|
||||
}
|
||||
ctx := client.WithInternalAuth(r.Context())
|
||||
if err := g.client.Database().DropTable(ctx, body.Table); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
|
||||
}
|
||||
|
||||
func (g *Gateway) networkStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.client == nil {
|
||||
@ -204,7 +86,7 @@ func (g *Gateway) validateNamespaceParam(r *http.Request) bool {
|
||||
if qns == "" {
|
||||
return true
|
||||
}
|
||||
if v := r.Context().Value(pubsub.CtxKeyNamespaceOverride); v != nil {
|
||||
if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil {
|
||||
if s, ok := v.(string); ok && s != "" {
|
||||
return s == qns
|
||||
}
|
||||
|
@ -0,0 +1,615 @@
|
||||
package rqlite
|
||||
|
||||
// HTTP gateway for the rqlite ORM client.
|
||||
//
|
||||
// This file exposes a minimal, SDK-friendly HTTP interface over the ORM-like
|
||||
// client defined in client.go. It maps high-level operations (Query, Exec,
|
||||
// FindBy, FindOneBy, QueryBuilder-based SELECTs, Transactions) and a few schema
|
||||
// helpers into JSON-over-HTTP endpoints that can be called from any language.
|
||||
//
|
||||
// Endpoints (under BasePath, default: /v1/db):
|
||||
// - POST {base}/query -> arbitrary SELECT; returns rows as []map[string]any
|
||||
// - POST {base}/exec -> write statement (INSERT/UPDATE/DELETE/DDL); returns {rows_affected,last_insert_id}
|
||||
// - POST {base}/find -> FindBy(table, criteria, opts...) -> returns []map
|
||||
// - POST {base}/find-one -> FindOneBy(table, criteria, opts...) -> returns map
|
||||
// - POST {base}/select -> Fluent SELECT builder via JSON (joins, where, order, group, limit, offset); returns []map or one map if one=true
|
||||
// - POST {base}/transaction -> Execute a sequence of exec/query ops atomically; optionally return results
|
||||
//
|
||||
// Schema helpers (convenience; powered via Exec/Query):
|
||||
// - GET {base}/schema -> list of user tables/views and create SQL
|
||||
// - POST {base}/create-table -> {schema: "CREATE TABLE ..."} -> status ok
|
||||
// - POST {base}/drop-table -> {table: "name"} -> status ok (safe-validated identifier)
|
||||
//
|
||||
// Notes:
|
||||
// - All numbers in JSON are decoded as float64 by default; we best-effort coerce
|
||||
// integral values to int64 for SQL placeholders.
|
||||
// - The Save/Remove reflection helpers in the ORM require concrete Go structs;
|
||||
// exposing them generically over HTTP is not portable. Prefer using the Exec
|
||||
// and Find APIs, or the Select builder for CRUD-like flows.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// HTTPGateway exposes the ORM Client as a set of HTTP handlers.
|
||||
type HTTPGateway struct {
|
||||
// Client is the ORM-like rqlite client to execute operations against.
|
||||
Client Client
|
||||
// BasePath is the prefix for all routes, e.g. "/v1/db".
|
||||
// If empty, defaults to "/v1/db". A trailing slash is trimmed.
|
||||
BasePath string
|
||||
|
||||
// Optional: Request timeout. If > 0, handlers will use a context with this timeout.
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
// NewHTTPGateway constructs a new HTTPGateway with sensible defaults.
|
||||
func NewHTTPGateway(c Client, base string) *HTTPGateway {
|
||||
return &HTTPGateway{
|
||||
Client: c,
|
||||
BasePath: base,
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterRoutes registers all handlers onto the provided mux under BasePath.
|
||||
func (g *HTTPGateway) RegisterRoutes(mux *http.ServeMux) {
|
||||
base := g.base()
|
||||
mux.HandleFunc(base+"/query", g.handleQuery)
|
||||
mux.HandleFunc(base+"/exec", g.handleExec)
|
||||
mux.HandleFunc(base+"/find", g.handleFind)
|
||||
mux.HandleFunc(base+"/find-one", g.handleFindOne)
|
||||
mux.HandleFunc(base+"/select", g.handleSelect)
|
||||
// Keep "transaction" for compatibility with existing routes.
|
||||
mux.HandleFunc(base+"/transaction", g.handleTransaction)
|
||||
|
||||
// Schema helpers
|
||||
mux.HandleFunc(base+"/schema", g.handleSchema)
|
||||
mux.HandleFunc(base+"/create-table", g.handleCreateTable)
|
||||
mux.HandleFunc(base+"/drop-table", g.handleDropTable)
|
||||
}
|
||||
|
||||
func (g *HTTPGateway) base() string {
|
||||
b := strings.TrimSpace(g.BasePath)
|
||||
if b == "" {
|
||||
b = "/v1/db"
|
||||
}
|
||||
if b != "/" {
|
||||
b = strings.TrimRight(b, "/")
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (g *HTTPGateway) withTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
|
||||
if g.Timeout > 0 {
|
||||
return context.WithTimeout(ctx, g.Timeout)
|
||||
}
|
||||
return context.WithCancel(ctx)
|
||||
}
|
||||
|
||||
// --------------------
|
||||
// Common HTTP helpers
|
||||
// --------------------
|
||||
|
||||
func writeJSON(w http.ResponseWriter, code int, v any) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(code)
|
||||
_ = json.NewEncoder(w).Encode(v)
|
||||
}
|
||||
|
||||
func writeError(w http.ResponseWriter, code int, msg string) {
|
||||
writeJSON(w, code, map[string]any{"error": msg})
|
||||
}
|
||||
|
||||
func onlyMethod(w http.ResponseWriter, r *http.Request, method string) bool {
|
||||
if r.Method != method {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Normalize JSON-decoded args for SQL placeholders.
|
||||
// - Convert float64 with integral value to int64 to better match SQLite expectations.
|
||||
// - Leave strings, bools and nulls as-is.
|
||||
// - Recursively normalizes nested arrays if present.
|
||||
func normalizeArgs(args []any) []any {
|
||||
out := make([]any, len(args))
|
||||
for i, a := range args {
|
||||
switch v := a.(type) {
|
||||
case float64:
|
||||
// If v is integral (within epsilon), convert to int64
|
||||
if v == float64(int64(v)) {
|
||||
out[i] = int64(v)
|
||||
} else {
|
||||
out[i] = v
|
||||
}
|
||||
case []any:
|
||||
out[i] = normalizeArgs(v)
|
||||
default:
|
||||
out[i] = a
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// --------------------
|
||||
// Request DTOs
|
||||
// --------------------
|
||||
|
||||
type queryRequest struct {
|
||||
SQL string `json:"sql"`
|
||||
Args []any `json:"args"`
|
||||
}
|
||||
|
||||
type execRequest struct {
|
||||
SQL string `json:"sql"`
|
||||
Args []any `json:"args"`
|
||||
}
|
||||
|
||||
type findOptions struct {
|
||||
Select []string `json:"select"`
|
||||
OrderBy []string `json:"order_by"`
|
||||
GroupBy []string `json:"group_by"`
|
||||
Limit *int `json:"limit"`
|
||||
Offset *int `json:"offset"`
|
||||
Joins []joinBody `json:"joins"`
|
||||
}
|
||||
|
||||
type findRequest struct {
|
||||
Table string `json:"table"`
|
||||
Criteria map[string]any `json:"criteria"`
|
||||
Options findOptions `json:"options"`
|
||||
// Back-compat: allow options at top-level too
|
||||
Select []string `json:"select"`
|
||||
OrderBy []string `json:"order_by"`
|
||||
GroupBy []string `json:"group_by"`
|
||||
Limit *int `json:"limit"`
|
||||
Offset *int `json:"offset"`
|
||||
Joins []joinBody `json:"joins"`
|
||||
}
|
||||
|
||||
type findOneRequest = findRequest
|
||||
|
||||
type joinBody struct {
|
||||
Kind string `json:"kind"` // "INNER" | "LEFT" | "JOIN"
|
||||
Table string `json:"table"` // table name
|
||||
On string `json:"on"` // join condition
|
||||
}
|
||||
|
||||
type whereBody struct {
|
||||
Conj string `json:"conj"` // "AND" | "OR" (default AND)
|
||||
Expr string `json:"expr"` // e.g., "a = ? AND b > ?"
|
||||
Args []any `json:"args"`
|
||||
}
|
||||
|
||||
type selectRequest struct {
|
||||
Table string `json:"table"`
|
||||
Alias string `json:"alias"`
|
||||
Select []string `json:"select"`
|
||||
Joins []joinBody `json:"joins"`
|
||||
Where []whereBody `json:"where"`
|
||||
GroupBy []string `json:"group_by"`
|
||||
OrderBy []string `json:"order_by"`
|
||||
Limit *int `json:"limit"`
|
||||
Offset *int `json:"offset"`
|
||||
One bool `json:"one"` // if true, returns a single row (object)
|
||||
}
|
||||
|
||||
type txOp struct {
|
||||
Kind string `json:"kind"` // "exec" | "query"
|
||||
SQL string `json:"sql"`
|
||||
Args []any `json:"args"`
|
||||
}
|
||||
|
||||
type transactionRequest struct {
|
||||
Ops []txOp `json:"ops"`
|
||||
ReturnResults bool `json:"return_results"` // if true, returns per-op results
|
||||
StopOnError bool `json:"stop_on_error"` // default true in tx
|
||||
PartialResults bool `json:"partial_results"` // ignored for actual TX (atomic); kept for API symmetry
|
||||
}
|
||||
|
||||
// --------------------
|
||||
// Handlers
|
||||
// --------------------
|
||||
|
||||
func (g *HTTPGateway) handleQuery(w http.ResponseWriter, r *http.Request) {
|
||||
if !onlyMethod(w, r, http.MethodPost) {
|
||||
return
|
||||
}
|
||||
if g.Client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
var body queryRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.SQL) == "" {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}")
|
||||
return
|
||||
}
|
||||
args := normalizeArgs(body.Args)
|
||||
ctx, cancel := g.withTimeout(r.Context())
|
||||
defer cancel()
|
||||
|
||||
out := make([]map[string]any, 0, 16)
|
||||
if err := g.Client.Query(ctx, &out, body.SQL, args...); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"items": out,
|
||||
"count": len(out),
|
||||
})
|
||||
}
|
||||
|
||||
func (g *HTTPGateway) handleExec(w http.ResponseWriter, r *http.Request) {
|
||||
if !onlyMethod(w, r, http.MethodPost) {
|
||||
return
|
||||
}
|
||||
if g.Client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
var body execRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.SQL) == "" {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}")
|
||||
return
|
||||
}
|
||||
args := normalizeArgs(body.Args)
|
||||
ctx, cancel := g.withTimeout(r.Context())
|
||||
defer cancel()
|
||||
|
||||
res, err := g.Client.Exec(ctx, body.SQL, args...)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
liid, _ := res.LastInsertId()
|
||||
ra, _ := res.RowsAffected()
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"rows_affected": ra,
|
||||
"last_insert_id": liid,
|
||||
"execution_state": "ok",
|
||||
})
|
||||
}
|
||||
|
||||
func (g *HTTPGateway) handleFind(w http.ResponseWriter, r *http.Request) {
|
||||
if !onlyMethod(w, r, http.MethodPost) {
|
||||
return
|
||||
}
|
||||
if g.Client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
var body findRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {table, criteria, options?}")
|
||||
return
|
||||
}
|
||||
opts := makeFindOptions(mergeFindOptions(body))
|
||||
ctx, cancel := g.withTimeout(r.Context())
|
||||
defer cancel()
|
||||
|
||||
out := make([]map[string]any, 0, 32)
|
||||
if err := g.Client.FindBy(ctx, &out, body.Table, body.Criteria, opts...); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"items": out,
|
||||
"count": len(out),
|
||||
})
|
||||
}
|
||||
|
||||
func (g *HTTPGateway) handleFindOne(w http.ResponseWriter, r *http.Request) {
|
||||
if !onlyMethod(w, r, http.MethodPost) {
|
||||
return
|
||||
}
|
||||
if g.Client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
var body findOneRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {table, criteria, options?}")
|
||||
return
|
||||
}
|
||||
opts := makeFindOptions(mergeFindOptions(body))
|
||||
ctx, cancel := g.withTimeout(r.Context())
|
||||
defer cancel()
|
||||
|
||||
row := make(map[string]any)
|
||||
if err := g.Client.FindOneBy(ctx, &row, body.Table, body.Criteria, opts...); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
writeError(w, http.StatusNotFound, "not found")
|
||||
return
|
||||
}
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, row)
|
||||
}
|
||||
|
||||
func (g *HTTPGateway) handleSelect(w http.ResponseWriter, r *http.Request) {
|
||||
if !onlyMethod(w, r, http.MethodPost) {
|
||||
return
|
||||
}
|
||||
if g.Client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
var body selectRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {table, select?, where?, joins?, order_by?, group_by?, limit?, offset?, one?}")
|
||||
return
|
||||
}
|
||||
ctx, cancel := g.withTimeout(r.Context())
|
||||
defer cancel()
|
||||
|
||||
qb := g.Client.CreateQueryBuilder(body.Table)
|
||||
if alias := strings.TrimSpace(body.Alias); alias != "" {
|
||||
qb = qb.Alias(alias)
|
||||
}
|
||||
if len(body.Select) > 0 {
|
||||
qb = qb.Select(body.Select...)
|
||||
}
|
||||
// joins
|
||||
for _, j := range body.Joins {
|
||||
switch strings.ToUpper(strings.TrimSpace(j.Kind)) {
|
||||
case "INNER":
|
||||
qb = qb.InnerJoin(j.Table, j.On)
|
||||
case "LEFT":
|
||||
qb = qb.LeftJoin(j.Table, j.On)
|
||||
default:
|
||||
qb = qb.Join(j.Table, j.On)
|
||||
}
|
||||
}
|
||||
// where
|
||||
for _, wcl := range body.Where {
|
||||
switch strings.ToUpper(strings.TrimSpace(wcl.Conj)) {
|
||||
case "OR":
|
||||
qb = qb.OrWhere(wcl.Expr, normalizeArgs(wcl.Args)...)
|
||||
default:
|
||||
qb = qb.AndWhere(wcl.Expr, normalizeArgs(wcl.Args)...)
|
||||
}
|
||||
}
|
||||
// group/order/limit/offset
|
||||
if len(body.GroupBy) > 0 {
|
||||
qb = qb.GroupBy(body.GroupBy...)
|
||||
}
|
||||
if len(body.OrderBy) > 0 {
|
||||
qb = qb.OrderBy(body.OrderBy...)
|
||||
}
|
||||
if body.Limit != nil {
|
||||
qb = qb.Limit(*body.Limit)
|
||||
}
|
||||
if body.Offset != nil {
|
||||
qb = qb.Offset(*body.Offset)
|
||||
}
|
||||
|
||||
if body.One {
|
||||
row := make(map[string]any)
|
||||
if err := qb.GetOne(ctx, &row); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
writeError(w, http.StatusNotFound, "not found")
|
||||
return
|
||||
}
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, row)
|
||||
return
|
||||
}
|
||||
|
||||
rows := make([]map[string]any, 0, 32)
|
||||
if err := qb.GetMany(ctx, &rows); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"items": rows,
|
||||
"count": len(rows),
|
||||
})
|
||||
}
|
||||
|
||||
func (g *HTTPGateway) handleTransaction(w http.ResponseWriter, r *http.Request) {
|
||||
if !onlyMethod(w, r, http.MethodPost) {
|
||||
return
|
||||
}
|
||||
if g.Client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
var body transactionRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || len(body.Ops) == 0 {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {ops:[{kind,sql,args?}], return_results?}")
|
||||
return
|
||||
}
|
||||
ctx, cancel := g.withTimeout(r.Context())
|
||||
defer cancel()
|
||||
|
||||
results := make([]any, 0, len(body.Ops))
|
||||
err := g.Client.Tx(ctx, func(tx Tx) error {
|
||||
for _, op := range body.Ops {
|
||||
switch strings.ToLower(strings.TrimSpace(op.Kind)) {
|
||||
case "exec":
|
||||
res, err := tx.Exec(ctx, op.SQL, normalizeArgs(op.Args)...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if body.ReturnResults {
|
||||
li, _ := res.LastInsertId()
|
||||
ra, _ := res.RowsAffected()
|
||||
results = append(results, map[string]any{
|
||||
"rows_affected": ra,
|
||||
"last_insert_id": li,
|
||||
})
|
||||
}
|
||||
case "query":
|
||||
var rows []map[string]any
|
||||
if err := tx.Query(ctx, &rows, op.SQL, normalizeArgs(op.Args)...); err != nil {
|
||||
return err
|
||||
}
|
||||
if body.ReturnResults {
|
||||
results = append(results, rows)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("invalid op kind: %s", op.Kind)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
if body.ReturnResults {
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"status": "ok",
|
||||
"results": results,
|
||||
})
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
|
||||
}
|
||||
|
||||
// --------------------
|
||||
// Schema helpers
|
||||
// --------------------
|
||||
|
||||
func (g *HTTPGateway) handleSchema(w http.ResponseWriter, r *http.Request) {
|
||||
if !onlyMethod(w, r, http.MethodGet) {
|
||||
return
|
||||
}
|
||||
if g.Client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
ctx, cancel := g.withTimeout(r.Context())
|
||||
defer cancel()
|
||||
|
||||
sqlText := `SELECT name, type, sql FROM sqlite_master WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' ORDER BY name`
|
||||
var rows []map[string]any
|
||||
if err := g.Client.Query(ctx, &rows, sqlText); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"objects": rows,
|
||||
"count": len(rows),
|
||||
})
|
||||
}
|
||||
|
||||
func (g *HTTPGateway) handleCreateTable(w http.ResponseWriter, r *http.Request) {
|
||||
if !onlyMethod(w, r, http.MethodPost) {
|
||||
return
|
||||
}
|
||||
if g.Client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
var body struct {
|
||||
Schema string `json:"schema"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Schema) == "" {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {schema}")
|
||||
return
|
||||
}
|
||||
ctx, cancel := g.withTimeout(r.Context())
|
||||
defer cancel()
|
||||
|
||||
if _, err := g.Client.Exec(ctx, body.Schema); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, map[string]any{"status": "ok"})
|
||||
}
|
||||
|
||||
var identRe = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)
|
||||
|
||||
func (g *HTTPGateway) handleDropTable(w http.ResponseWriter, r *http.Request) {
|
||||
if !onlyMethod(w, r, http.MethodPost) {
|
||||
return
|
||||
}
|
||||
if g.Client == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "client not initialized")
|
||||
return
|
||||
}
|
||||
var body struct {
|
||||
Table string `json:"table"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" {
|
||||
writeError(w, http.StatusBadRequest, "invalid body: {table}")
|
||||
return
|
||||
}
|
||||
tbl := strings.TrimSpace(body.Table)
|
||||
if !identRe.MatchString(tbl) {
|
||||
writeError(w, http.StatusBadRequest, "invalid table identifier")
|
||||
return
|
||||
}
|
||||
ctx, cancel := g.withTimeout(r.Context())
|
||||
defer cancel()
|
||||
|
||||
stmt := "DROP TABLE IF EXISTS " + tbl
|
||||
if _, err := g.Client.Exec(ctx, stmt); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
|
||||
}
|
||||
|
||||
// --------------------
|
||||
// Helpers
|
||||
// --------------------
|
||||
|
||||
func mergeFindOptions(fr findRequest) findOptions {
|
||||
// Prefer nested Options; fallback to top-level legacy fields
|
||||
if (len(fr.Options.Select)+len(fr.Options.OrderBy)+len(fr.Options.GroupBy)) > 0 ||
|
||||
fr.Options.Limit != nil || fr.Options.Offset != nil || len(fr.Options.Joins) > 0 {
|
||||
return fr.Options
|
||||
}
|
||||
return findOptions{
|
||||
Select: fr.Select,
|
||||
OrderBy: fr.OrderBy,
|
||||
GroupBy: fr.GroupBy,
|
||||
Limit: fr.Limit,
|
||||
Offset: fr.Offset,
|
||||
Joins: fr.Joins,
|
||||
}
|
||||
}
|
||||
|
||||
func makeFindOptions(o findOptions) []FindOption {
|
||||
opts := make([]FindOption, 0, 6)
|
||||
if len(o.OrderBy) > 0 {
|
||||
opts = append(opts, WithOrderBy(o.OrderBy...))
|
||||
}
|
||||
if len(o.GroupBy) > 0 {
|
||||
opts = append(opts, WithGroupBy(o.GroupBy...))
|
||||
}
|
||||
if o.Limit != nil {
|
||||
opts = append(opts, WithLimit(*o.Limit))
|
||||
}
|
||||
if o.Offset != nil {
|
||||
opts = append(opts, WithOffset(*o.Offset))
|
||||
}
|
||||
if len(o.Select) > 0 {
|
||||
opts = append(opts, WithSelect(o.Select...))
|
||||
}
|
||||
for _, j := range o.Joins {
|
||||
opts = append(opts, WithJoin(justOrDefault(strings.ToUpper(j.Kind), "JOIN"), j.Table, j.On))
|
||||
}
|
||||
return opts
|
||||
}
|
||||
|
||||
func justOrDefault(s, def string) string {
|
||||
if strings.TrimSpace(s) == "" {
|
||||
return def
|
||||
}
|
||||
return s
|
||||
}
|
@ -254,48 +254,27 @@ func isNoSuchTable(err error) bool {
|
||||
return strings.Contains(msg, "no such table") || strings.Contains(msg, "does not exist")
|
||||
}
|
||||
|
||||
// applySQL tries to run the entire script in one Exec.
|
||||
// If the driver rejects multi-statement Exec, it falls back to splitting statements and executing sequentially.
|
||||
// applySQL splits the script into individual statements, strips explicit
|
||||
// transaction control (BEGIN/COMMIT/ROLLBACK/END), and executes statements
|
||||
// sequentially to avoid nested transaction issues with rqlite.
|
||||
func applySQL(ctx context.Context, db *sql.DB, script string) error {
|
||||
s := strings.TrimSpace(script)
|
||||
if s == "" {
|
||||
return nil
|
||||
}
|
||||
if _, err := db.ExecContext(ctx, s); err == nil {
|
||||
return nil
|
||||
} else {
|
||||
// Fall back to splitting into statements and executing sequentially (respecting BEGIN/COMMIT if present).
|
||||
stmts := splitSQLStatements(s)
|
||||
// If the script already contains explicit BEGIN/COMMIT, we just run as-is.
|
||||
// Otherwise, we attempt to wrap in a transaction; if BeginTx fails, execute one-by-one.
|
||||
hasExplicitTxn := containsToken(stmts, "BEGIN") || containsToken(stmts, "BEGIN;")
|
||||
if !hasExplicitTxn {
|
||||
if tx, txErr := db.BeginTx(ctx, nil); txErr == nil {
|
||||
for _, stmt := range stmts {
|
||||
if stmt == "" {
|
||||
continue
|
||||
}
|
||||
if _, execErr := tx.ExecContext(ctx, stmt); execErr != nil {
|
||||
_ = tx.Rollback()
|
||||
return fmt.Errorf("exec stmt failed: %w (stmt: %s)", execErr, snippet(stmt))
|
||||
}
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
// Fall through to plain sequential exec if BeginTx not supported.
|
||||
}
|
||||
stmts = filterOutTxnControls(stmts)
|
||||
|
||||
for _, stmt := range stmts {
|
||||
if stmt == "" {
|
||||
if strings.TrimSpace(stmt) == "" {
|
||||
continue
|
||||
}
|
||||
if _, execErr := db.ExecContext(ctx, stmt); execErr != nil {
|
||||
return fmt.Errorf("exec stmt failed: %w (stmt: %s)", execErr, snippet(stmt))
|
||||
if _, err := db.ExecContext(ctx, stmt); err != nil {
|
||||
return fmt.Errorf("exec stmt failed: %w (stmt: %s)", err, snippet(stmt))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func containsToken(stmts []string, token string) bool {
|
||||
for _, s := range stmts {
|
||||
@ -306,6 +285,33 @@ func containsToken(stmts []string, token string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// removed duplicate helper
|
||||
|
||||
// removed duplicate helper
|
||||
|
||||
// isTxnControl returns true if the statement is a transaction control command.
|
||||
func isTxnControl(s string) bool {
|
||||
t := strings.ToUpper(strings.TrimSpace(s))
|
||||
switch t {
|
||||
case "BEGIN", "BEGIN TRANSACTION", "COMMIT", "END", "ROLLBACK":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// filterOutTxnControls removes BEGIN/COMMIT/ROLLBACK/END statements.
|
||||
func filterOutTxnControls(stmts []string) []string {
|
||||
out := make([]string, 0, len(stmts))
|
||||
for _, s := range stmts {
|
||||
if isTxnControl(s) {
|
||||
continue
|
||||
}
|
||||
out = append(out, s)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func snippet(s string) string {
|
||||
s = strings.TrimSpace(s)
|
||||
if len(s) > 120 {
|
||||
|
@ -175,7 +175,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// After waitForLeadership / waitForSQLAvailable succeeds, before returning:
|
||||
migrationsDir := "network/migrations"
|
||||
migrationsDir := "migrations"
|
||||
|
||||
if err := r.ApplyMigrations(ctx, migrationsDir); err != nil {
|
||||
r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir))
|
||||
@ -325,9 +325,6 @@ func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress strin
|
||||
}
|
||||
}
|
||||
|
||||
if lastErr == nil {
|
||||
lastErr = fmt.Errorf("join target not reachable within %s", timeout)
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user