Merge pull request #36 from DeBrosOfficial/storage-refactor

Storage refactor
This commit is contained in:
anonpenguin 2025-09-23 07:20:50 +03:00 committed by GitHub
commit 05f2e61822
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 2301 additions and 1261 deletions

View File

@ -8,12 +8,24 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
### Added ### Added
- Created new rqlite folder
- Created rqlite adapter, client, gateway, migrations and rqlite init
- Created namespace_helpers on gateway
- Created new rqlite implementation
### Changed ### Changed
- Updated node.go to support new rqlite architecture
- Updated readme
### Deprecated ### Deprecated
### Removed ### Removed
- Removed old storage folder
- Removed old pkg/gatway storage and migrated to new rqlite
### Fixed ### Fixed
### Security ### Security

View File

@ -21,7 +21,7 @@ test-e2e:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
VERSION := 0.44.0-beta VERSION := 0.50.0-beta
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

331
README.md
View File

@ -15,6 +15,7 @@ A robust, decentralized peer-to-peer network built in Go, providing distributed
- [CLI Usage](#cli-usage) - [CLI Usage](#cli-usage)
- [HTTP Gateway](#http-gateway) - [HTTP Gateway](#http-gateway)
- [Development](#development) - [Development](#development)
- [Database Client (Go ORM-like)](#database-client-go-orm-like)
- [Troubleshooting](#troubleshooting) - [Troubleshooting](#troubleshooting)
- [License](#license) - [License](#license)
@ -413,33 +414,65 @@ bootstrap_peers:
### Database Operations (Gateway REST) ### Database Operations (Gateway REST)
```http ```http
POST /v1/db/exec # Body: {"sql": "INSERT/UPDATE/DELETE/DDL ...", "args": [...]}
POST /v1/db/find # Body: {"table":"...", "criteria":{"col":val,...}, "options":{...}}
POST /v1/db/find-one # Body: same as /find, returns a single row (404 if not found)
POST /v1/db/select # Body: {"table":"...", "select":[...], "where":[...], "joins":[...], "order_by":[...], "limit":N, "offset":N, "one":false}
POST /v1/db/transaction # Body: {"ops":[{"kind":"exec|query","sql":"...","args":[...]}], "return_results": true}
POST /v1/db/query # Body: {"sql": "SELECT ...", "args": [..]} (legacy-friendly SELECT)
GET /v1/db/schema # Returns tables/views + create SQL
POST /v1/db/create-table # Body: {"schema": "CREATE TABLE ..."} POST /v1/db/create-table # Body: {"schema": "CREATE TABLE ..."}
POST /v1/db/drop-table # Body: {"table": "table_name"} POST /v1/db/drop-table # Body: {"table": "table_name"}
POST /v1/db/query # Body: {"sql": "SELECT ...", "args": [..]}
POST /v1/db/transaction # Body: {"statements": ["SQL 1", "SQL 2", ...]}
GET /v1/db/schema # Returns current tables and columns
``` ```
Common migration workflow: Common workflows:
```bash ```bash
# Add a new table # Exec (INSERT/UPDATE/DELETE/DDL)
curl -X POST "$GW/v1/db/create-table" \ curl -X POST "$GW/v1/db/exec" \
-H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
-d '{"schema":"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)"}' -d '{"sql":"INSERT INTO users(name,email) VALUES(?,?)","args":["Alice","alice@example.com"]}'
# Apply multiple statements atomically # Find (criteria + options)
curl -X POST "$GW/v1/db/find" \
-H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
-d '{
"table":"users",
"criteria":{"active":true},
"options":{"select":["id","email"],"order_by":["created_at DESC"],"limit":25}
}'
# Select (fluent builder via JSON)
curl -X POST "$GW/v1/db/select" \
-H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
-d '{
"table":"orders o",
"select":["o.id","o.total","u.email AS user_email"],
"joins":[{"kind":"INNER","table":"users u","on":"u.id = o.user_id"}],
"where":[{"conj":"AND","expr":"o.total > ?","args":[100]}],
"order_by":["o.created_at DESC"],
"limit":10
}'
# Transaction (atomic batch)
curl -X POST "$GW/v1/db/transaction" \ curl -X POST "$GW/v1/db/transaction" \
-H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
-d '{"statements":[ -d '{
"ALTER TABLE users ADD COLUMN email TEXT", "return_results": true,
"CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)" "ops": [
]}' {"kind":"exec","sql":"INSERT INTO users(email) VALUES(?)","args":["bob@example.com"]},
{"kind":"query","sql":"SELECT last_insert_rowid() AS id","args":[]}
]
}'
# Verify # Schema
curl -X POST "$GW/v1/db/query" \ curl "$GW/v1/db/schema" -H "Authorization: Bearer $API_KEY"
-H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
-d '{"sql":"PRAGMA table_info(users)"}' # DDL helpers
curl -X POST "$GW/v1/db/create-table" -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
-d '{"schema":"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)"}'
curl -X POST "$GW/v1/db/drop-table" -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \
-d '{"table":"users"}'
``` ```
### Authentication ### Authentication
@ -539,7 +572,23 @@ GET /v1/auth/whoami # Current auth status
POST /v1/auth/api-key # Generate API key (authenticated) POST /v1/auth/api-key # Generate API key (authenticated)
``` ```
#### RQLite HTTP ORM Gateway (/v1/db)
The gateway now exposes a full HTTP interface over the Go ORM-like client (see `pkg/rqlite/gateway.go`) so you can build SDKs in any language.
- Base path: `/v1/db`
- Endpoints:
- `POST /v1/db/exec` — Execute write/DDL SQL; returns `{ rows_affected, last_insert_id }`
- `POST /v1/db/find` — Map-based criteria; returns `{ items: [...], count: N }`
- `POST /v1/db/find-one` — Single row; 404 if not found
- `POST /v1/db/select` — Fluent SELECT via JSON (joins, where, order, group, limit, offset)
- `POST /v1/db/transaction` — Atomic batch of exec/query ops, optional per-op results
- `POST /v1/db/query` — Arbitrary SELECT (legacy-friendly), returns `items`
- `GET /v1/db/schema` — List user tables/views + create SQL
- `POST /v1/db/create-table` — Convenience for DDL
- `POST /v1/db/drop-table` — Safe drop (identifier validated)
Payload examples are shown in the [Database Operations (Gateway REST)](#database-operations-gateway-rest) section.
#### Network Operations #### Network Operations
```http ```http
@ -574,11 +623,15 @@ GET /v1/pubsub/topics # List active topics
### Key HTTP endpoints for SDKs ### Key HTTP endpoints for SDKs
- **Database** - **Database**
- Create Table: `POST /v1/db/create-table` `{schema}``{status:"ok"}` - Exec: `POST /v1/db/exec` `{sql, args?}``{rows_affected,last_insert_id}`
- Drop Table: `POST /v1/db/drop-table` `{table}``{status:"ok"}` - Find: `POST /v1/db/find` `{table, criteria, options?}``{items,count}`
- Query: `POST /v1/db/query` `{sql, args?}``{columns, rows, count}` - FindOne: `POST /v1/db/find-one` `{table, criteria, options?}` → single object or 404
- Transaction: `POST /v1/db/transaction` `{statements:[...]}``{status:"ok"}` - Select: `POST /v1/db/select` `{table, select?, joins?, where?, order_by?, group_by?, limit?, offset?, one?}`
- Schema: `GET /v1/db/schema` → schema JSON - Transaction: `POST /v1/db/transaction` `{ops:[{kind,sql,args?}], return_results?}`
- Query: `POST /v1/db/query` `{sql, args?}``{items,count}`
- Schema: `GET /v1/db/schema`
- Create Table: `POST /v1/db/create-table` `{schema}`
- Drop Table: `POST /v1/db/drop-table` `{table}`
- **PubSub** - **PubSub**
- WS Subscribe: `GET /v1/pubsub/ws?topic=<topic>` - WS Subscribe: `GET /v1/pubsub/ws?topic=<topic>`
- Publish: `POST /v1/pubsub/publish` `{topic, data_base64}``{status:"ok"}` - Publish: `POST /v1/pubsub/publish` `{topic, data_base64}``{status:"ok"}`
@ -700,6 +753,242 @@ make clean # Clean build artifacts
scripts/test-multinode.sh scripts/test-multinode.sh
``` ```
---
## Database Client (Go ORM-like)
A lightweight ORM-like client over rqlite using Gos `database/sql`. It provides:
- Query/Exec for raw SQL
- A fluent QueryBuilder (`Where`, `InnerJoin`, `LeftJoin`, `OrderBy`, `GroupBy`, `Limit`, `Offset`)
- Simple repositories with `Find`/`FindOne`
- `Save`/`Remove` for entities with primary keys
- Transaction support via `Tx`
### Installation
- Ensure rqlite is running (the node starts and manages rqlite automatically).
- Import the client:
- Package: `github.com/DeBrosOfficial/network/pkg/rqlite`
### Quick Start
```go
package main
import (
"context"
"database/sql"
"time"
"github.com/DeBrosOfficial/network/pkg/rqlite"
_ "github.com/rqlite/gorqlite/stdlib"
)
type User struct {
ID int64 `db:"id,pk,auto"`
Email string `db:"email"`
FirstName string `db:"first_name"`
LastName string `db:"last_name"`
CreatedAt time.Time `db:"created_at"`
}
func (User) TableName() string { return "users" }
func main() {
ctx := context.Background()
adapter, _ := rqlite.NewRQLiteAdapter(manager)
client := rqlite.NewClientFromAdapter(adapter)
// Save (INSERT)
u := &User{Email: "alice@example.com", FirstName: "Alice", LastName: "A"}
_ = client.Save(ctx, u) // auto-sets u.ID if autoincrement is available
// FindOneBy
var one User
_ = client.FindOneBy(ctx, &one, "users", map[string]any{"email": "alice@example.com"})
// QueryBuilder
var users []User
_ = client.CreateQueryBuilder("users").
Where("email LIKE ?", "%@example.com").
OrderBy("created_at DESC").
Limit(10).
GetMany(ctx, &users)
}
### Entities and Mapping
- Use struct tags: `db:"column_name"`; the first tag value is the column name.
- Mark primary key: `db:"id,pk"` (and `auto` if autoincrement): `db:"id,pk,auto"`.
- Fallbacks:
- If no `db` tag is provided, the field name is used as the column (case-insensitive).
- If a field is named `ID`, it is treated as the primary key by default.
```go
type Post struct {
ID int64 `db:"id,pk,auto"`
UserID int64 `db:"user_id"`
Title string `db:"title"`
Body string `db:"body"`
CreatedAt time.Time `db:"created_at"`
}
func (Post) TableName() string { return "posts" }
```
### Basic queries
Raw SQL with scanning into structs or maps:
```go
var users []User
err := client.Query(ctx, &users, "SELECT id, email, first_name, last_name, created_at FROM users WHERE email LIKE ?", "%@example.com")
if err != nil {
// handle
}
var rows []map[string]any
_ = client.Query(ctx, &rows, "SELECT id, email FROM users WHERE id IN (?,?)", 1, 2)
```
### Query Buider
Build complex SELECTs with joins, filters, grouping, ordering, and pagination.
```go
var results []User
qb := client.CreateQueryBuilder("users u").
InnerJoin("posts p", "p.user_id = u.id").
Where("u.email LIKE ?", "%@example.com").
AndWhere("p.created_at >= ?", "2024-01-01T00:00:00Z").
GroupBy("u.id").
OrderBy("u.created_at DESC").
Limit(20).
Offset(0)
if err := qb.GetMany(ctx, &results); err != nil {
// handle
}
// Single row (LIMIT 1)
var one User
if err := qb.Limit(1).GetOne(ctx, &one); err != nil {
// handle sql.ErrNoRows, etc.
}
```
### FindBy / FindOneBy
Simple map-based filters:
```go
var active []User
_ = client.FindBy(ctx, &active, "users", map[string]any{"last_name": "A"}, rqlite.WithOrderBy("created_at DESC"), rqlite.WithLimit(50))
var u User
if err := client.FindOneBy(ctx, &u, "users", map[string]any{"email": "alice@example.com"}); err != nil {
// sql.ErrNoRows if not found
}
```
### Save / Remove
`Save` inserts if PK is zero, otherwise updates by PK.
`Remove` deletes by PK.
```go
// Insert (ID is zero)
u := &User{Email: "bob@example.com", FirstName: "Bob"}
_ = client.Save(ctx, u) // INSERT; sets u.ID if autoincrement
// Update (ID is non-zero)
u.FirstName = "Bobby"
_ = client.Save(ctx, u) // UPDATE ... WHERE id = ?
// Remove
_ = client.Remove(ctx, u) // DELETE ... WHERE id = ?
```
### transactions
Run multiple operations atomically. If your function returns an error, the transaction is rolled back; otherwise it commits.
```go
err := client.Tx(ctx, func(tx rqlite.Tx) error {
// Read inside the same transaction
var me User
if err := tx.Query(ctx, &me, "SELECT * FROM users WHERE id = ?", 1); err != nil {
return err
}
// Write inside the same transaction
me.LastName = "Updated"
if err := tx.Save(ctx, &me); err != nil {
return err
}
// Complex query via builder
var recent []User
if err := tx.CreateQueryBuilder("users").
OrderBy("created_at DESC").
Limit(5).
GetMany(ctx, &recent); err != nil {
return err
}
return nil // commit
})
```
### Repositories (optional, generic)
Strongly-typed convenience layer bound to a table:
```go
repo := client.Repository[User]("users")
var many []User
_ = repo.Find(ctx, &many, map[string]any{"last_name": "A"}, rqlite.WithOrderBy("created_at DESC"), rqlite.WithLimit(10))
var one User
_ = repo.FindOne(ctx, &one, map[string]any{"email": "alice@example.com"})
_ = repo.Save(ctx, &one)
_ = repo.Remove(ctx, &one)
```
### Migrations
Option A: From the node (after rqlite is ready)
```go
ctx := context.Background()
dirs := []string{
"network/migrations", // default
"path/to/your/app/migrations", // extra
}
if err := rqliteManager.ApplyMigrationsDirs(ctx, dirs); err != nil {
logger.Fatal("apply migrations failed", zap.Error(err))
}
```
Option B: Using the adapter sql.DB
```go
ctx := context.Background()
db := adapter.GetSQLDB()
dirs := []string{"network/migrations", "app/migrations"}
if err := rqlite.ApplyMigrationsDirs(ctx, db, dirs, logger); err != nil {
logger.Fatal("apply migrations failed", zap.Error(err))
}
```
--- ---
## Troubleshooting ## Troubleshooting

View File

@ -109,12 +109,11 @@ func DefaultConfig() *Config {
}, },
Discovery: DiscoveryConfig{ Discovery: DiscoveryConfig{
BootstrapPeers: []string{ BootstrapPeers: []string{
"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWSHHwEY6cga3ng7tD1rzStAU58ogQXVMX3LZJ6Gqf6dee", "/ip4/217.76.54.168/tcp/4001/p2p/12D3KooWDp7xeShVY9uHfqNVPSsJeCKUatAviFZV8Y1joox5nUvx",
// "/ip4/217.76.54.168/tcp/4001/p2p/12D3KooWDp7xeShVY9uHfqNVPSsJeCKUatAviFZV8Y1joox5nUvx", "/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1",
// "/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1", "/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR",
// "/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR", "/ip4/155.133.27.199/tcp/4001/p2p/12D3KooWC69SBzM5QUgrLrfLWUykE8au32X5LwT7zwv9bixrQPm1",
// "/ip4/155.133.27.199/tcp/4001/p2p/12D3KooWC69SBzM5QUgrLrfLWUykE8au32X5LwT7zwv9bixrQPm1", "/ip4/217.76.56.2/tcp/4001/p2p/12D3KooWEiqJHvznxqJ5p2y8mUs6Ky6dfU1xTYFQbyKRCABfcZz4",
// "/ip4/217.76.56.2/tcp/4001/p2p/12D3KooWEiqJHvznxqJ5p2y8mUs6Ky6dfU1xTYFQbyKRCABfcZz4",
}, },
BootstrapPort: 4001, // Default LibP2P port BootstrapPort: 4001, // Default LibP2P port
DiscoveryInterval: time.Second * 15, // Back to 15 seconds for testing DiscoveryInterval: time.Second * 15, // Back to 15 seconds for testing

View File

@ -1,170 +0,0 @@
package gateway
import (
"crypto/rand"
"encoding/base64"
"encoding/json"
"net/http"
"strings"
"github.com/DeBrosOfficial/network/pkg/storage"
)
// appsHandler implements minimal CRUD for apps within a namespace.
// Routes handled:
// - GET /v1/apps -> list
// - POST /v1/apps -> create
// - GET /v1/apps/{app_id} -> fetch
// - PUT /v1/apps/{app_id} -> update (name/public_key)
// - DELETE /v1/apps/{app_id} -> delete
func (g *Gateway) appsHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx := r.Context()
ns := g.cfg.ClientNamespace
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
if strings.TrimSpace(ns) == "" {
ns = "default"
}
db := g.client.Database()
nsID, err := g.resolveNamespaceID(ctx, ns)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
path := r.URL.Path
// Determine if operating on collection or single resource
if path == "/v1/apps" || path == "/v1/apps/" {
switch r.Method {
case http.MethodGet:
// List apps
res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? ORDER BY created_at DESC", nsID)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
items := make([]map[string]any, 0, res.Count)
for _, row := range res.Rows {
item := map[string]any{
"app_id": row[0],
"name": row[1],
"public_key": row[2],
"namespace": ns,
"created_at": row[3],
}
items = append(items, item)
}
writeJSON(w, http.StatusOK, map[string]any{"items": items, "count": len(items)})
return
case http.MethodPost:
// Create app with provided name/public_key
var req struct {
Name string `json:"name"`
PublicKey string `json:"public_key"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
// Generate app_id
buf := make([]byte, 12)
if _, err := rand.Read(buf); err != nil {
writeError(w, http.StatusInternalServerError, "failed to generate app id")
return
}
appID := "app_" + base64.RawURLEncoding.EncodeToString(buf)
if _, err := db.Query(ctx, "INSERT INTO apps(namespace_id, app_id, name, public_key) VALUES (?, ?, ?, ?)", nsID, appID, req.Name, req.PublicKey); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusCreated, map[string]any{
"app_id": appID,
"name": req.Name,
"public_key": req.PublicKey,
"namespace": ns,
})
return
default:
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
}
// Single resource: /v1/apps/{app_id}
if strings.HasPrefix(path, "/v1/apps/") {
appID := strings.TrimPrefix(path, "/v1/apps/")
appID = strings.TrimSpace(appID)
if appID == "" {
writeError(w, http.StatusBadRequest, "missing app_id")
return
}
switch r.Method {
case http.MethodGet:
res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? AND app_id = ? LIMIT 1", nsID, appID)
if err != nil || res == nil || res.Count == 0 {
writeError(w, http.StatusNotFound, "app not found")
return
}
row := res.Rows[0]
writeJSON(w, http.StatusOK, map[string]any{
"app_id": row[0],
"name": row[1],
"public_key": row[2],
"namespace": ns,
"created_at": row[3],
})
return
case http.MethodPut:
var req struct {
Name *string `json:"name"`
PublicKey *string `json:"public_key"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
// Build update dynamically
sets := make([]string, 0, 2)
args := make([]any, 0, 4)
if req.Name != nil {
sets = append(sets, "name = ?")
args = append(args, *req.Name)
}
if req.PublicKey != nil {
sets = append(sets, "public_key = ?")
args = append(args, *req.PublicKey)
}
if len(sets) == 0 {
writeError(w, http.StatusBadRequest, "no fields to update")
return
}
q := "UPDATE apps SET " + strings.Join(sets, ", ") + " WHERE namespace_id = ? AND app_id = ?"
args = append(args, nsID, appID)
if _, err := db.Query(ctx, q, args...); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
return
case http.MethodDelete:
if _, err := db.Query(ctx, "DELETE FROM apps WHERE namespace_id = ? AND app_id = ?", nsID, appID); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
return
default:
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
}
writeError(w, http.StatusNotFound, "not found")
}

View File

@ -12,7 +12,6 @@ import (
"time" "time"
"github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/storage"
ethcrypto "github.com/ethereum/go-ethereum/crypto" ethcrypto "github.com/ethereum/go-ethereum/crypto"
) )
@ -20,7 +19,7 @@ func (g *Gateway) whoamiHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
// Determine namespace (may be overridden by auth layer) // Determine namespace (may be overridden by auth layer)
ns := g.cfg.ClientNamespace ns := g.cfg.ClientNamespace
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil { if v := ctx.Value(ctxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" { if s, ok := v.(string); ok && s != "" {
ns = s ns = s
} }

View File

@ -4,13 +4,16 @@ import (
"context" "context"
"crypto/rand" "crypto/rand"
"crypto/rsa" "crypto/rsa"
"net/http" "database/sql"
"strconv" "strconv"
"time" "time"
"github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/logging"
"github.com/DeBrosOfficial/network/pkg/rqlite"
"go.uber.org/zap" "go.uber.org/zap"
_ "github.com/rqlite/gorqlite/stdlib"
) )
// Config holds configuration for the gateway server // Config holds configuration for the gateway server
@ -18,6 +21,10 @@ type Config struct {
ListenAddr string ListenAddr string
ClientNamespace string ClientNamespace string
BootstrapPeers []string BootstrapPeers []string
// Optional DSN for rqlite database/sql driver, e.g. "http://localhost:4001"
// If empty, defaults to "http://localhost:4001".
RQLiteDSN string
} }
type Gateway struct { type Gateway struct {
@ -27,6 +34,11 @@ type Gateway struct {
startedAt time.Time startedAt time.Time
signingKey *rsa.PrivateKey signingKey *rsa.PrivateKey
keyID string keyID string
// rqlite SQL connection and HTTP ORM gateway
sqlDB *sql.DB
ormClient rqlite.Client
ormHTTP *rqlite.HTTPGateway
} }
// New creates and initializes a new Gateway instance // New creates and initializes a new Gateway instance
@ -75,28 +87,24 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
logger.ComponentWarn(logging.ComponentGeneral, "failed to generate RSA key; jwks will be empty", zap.Error(err)) logger.ComponentWarn(logging.ComponentGeneral, "failed to generate RSA key; jwks will be empty", zap.Error(err))
} }
logger.ComponentInfo(logging.ComponentGeneral, "Starting database migrations goroutine...") logger.ComponentInfo(logging.ComponentGeneral, "Initializing RQLite ORM HTTP gateway...")
// Non-blocking DB migrations: probe RQLite; if reachable, apply migrations asynchronously dsn := cfg.RQLiteDSN
go func() { if dsn == "" {
if gw.probeRQLiteReachable(3 * time.Second) { dsn = "http://localhost:4001"
internalCtx := gw.withInternalAuth(context.Background())
if err := gw.applyMigrations(internalCtx); err != nil {
if err == errNoMigrationsFound {
if err2 := gw.applyAutoMigrations(internalCtx); err2 != nil {
logger.ComponentWarn(logging.ComponentDatabase, "auto migrations failed", zap.Error(err2))
} else {
logger.ComponentInfo(logging.ComponentDatabase, "auto migrations applied")
} }
db, dbErr := sql.Open("rqlite", dsn)
if dbErr != nil {
logger.ComponentWarn(logging.ComponentGeneral, "failed to open rqlite sql db; http orm gateway disabled", zap.Error(dbErr))
} else { } else {
logger.ComponentWarn(logging.ComponentDatabase, "migrations failed", zap.Error(err)) gw.sqlDB = db
orm := rqlite.NewClient(db)
gw.ormClient = orm
gw.ormHTTP = rqlite.NewHTTPGateway(orm, "/v1/db")
logger.ComponentInfo(logging.ComponentGeneral, "RQLite ORM HTTP gateway ready",
zap.String("dsn", dsn),
zap.String("base_path", "/v1/db"),
)
} }
} else {
logger.ComponentInfo(logging.ComponentDatabase, "migrations applied")
}
} else {
logger.ComponentWarn(logging.ComponentDatabase, "RQLite not reachable; skipping migrations for now")
}
}()
logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed, returning...") logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed, returning...")
return gw, nil return gw, nil
@ -107,31 +115,6 @@ func (g *Gateway) withInternalAuth(ctx context.Context) context.Context {
return client.WithInternalAuth(ctx) return client.WithInternalAuth(ctx)
} }
// probeRQLiteReachable performs a quick GET /status against candidate endpoints with a short timeout.
func (g *Gateway) probeRQLiteReachable(timeout time.Duration) bool {
endpoints := client.DefaultDatabaseEndpoints()
httpClient := &http.Client{Timeout: timeout}
for _, ep := range endpoints {
url := ep
if url == "" {
continue
}
if url[len(url)-1] == '/' {
url = url[:len(url)-1]
}
reqURL := url + "/status"
resp, err := httpClient.Get(reqURL)
if err != nil {
continue
}
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return true
}
}
return false
}
// Close disconnects the gateway client // Close disconnects the gateway client
func (g *Gateway) Close() { func (g *Gateway) Close() {
if g.client != nil { if g.client != nil {
@ -139,4 +122,7 @@ func (g *Gateway) Close() {
g.logger.ComponentWarn(logging.ComponentClient, "error during client disconnect", zap.Error(err)) g.logger.ComponentWarn(logging.ComponentClient, "error during client disconnect", zap.Error(err))
} }
} }
if g.sqlDB != nil {
_ = g.sqlDB.Close()
}
} }

View File

@ -11,7 +11,6 @@ import (
"github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/logging"
"github.com/DeBrosOfficial/network/pkg/storage"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -21,6 +20,7 @@ type contextKey string
const ( const (
ctxKeyAPIKey contextKey = "api_key" ctxKeyAPIKey contextKey = "api_key"
ctxKeyJWT contextKey = "jwt_claims" ctxKeyJWT contextKey = "jwt_claims"
ctxKeyNamespaceOverride contextKey = "namespace_override"
) )
// withMiddleware adds CORS and logging middleware // withMiddleware adds CORS and logging middleware
@ -78,7 +78,7 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
// Attach JWT claims and namespace to context // Attach JWT claims and namespace to context
ctx := context.WithValue(r.Context(), ctxKeyJWT, claims) ctx := context.WithValue(r.Context(), ctxKeyJWT, claims)
if ns := strings.TrimSpace(claims.Namespace); ns != "" { if ns := strings.TrimSpace(claims.Namespace); ns != "" {
ctx = storage.WithNamespace(ctx, ns) ctx = context.WithValue(ctx, ctxKeyNamespaceOverride, ns)
} }
next.ServeHTTP(w, r.WithContext(ctx)) next.ServeHTTP(w, r.WithContext(ctx))
return return
@ -125,7 +125,7 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
// Attach auth metadata to context for downstream use // Attach auth metadata to context for downstream use
reqCtx := context.WithValue(r.Context(), ctxKeyAPIKey, key) reqCtx := context.WithValue(r.Context(), ctxKeyAPIKey, key)
reqCtx = storage.WithNamespace(reqCtx, ns) reqCtx = context.WithValue(reqCtx, ctxKeyNamespaceOverride, ns)
next.ServeHTTP(w, r.WithContext(reqCtx)) next.ServeHTTP(w, r.WithContext(reqCtx))
}) })
} }
@ -190,7 +190,7 @@ func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler {
// Determine namespace from context // Determine namespace from context
ctx := r.Context() ctx := r.Context()
ns := "" ns := ""
if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil { if v := ctx.Value(ctxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok { if s, ok := v.(string); ok {
ns = strings.TrimSpace(s) ns = strings.TrimSpace(s)
} }
@ -265,16 +265,13 @@ func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler {
// requiresNamespaceOwnership returns true if the path should be guarded by // requiresNamespaceOwnership returns true if the path should be guarded by
// namespace ownership checks. // namespace ownership checks.
func requiresNamespaceOwnership(p string) bool { func requiresNamespaceOwnership(p string) bool {
if p == "/storage" || p == "/v1/storage" || strings.HasPrefix(p, "/v1/storage/") { if p == "/rqlite" || p == "/v1/rqlite" || strings.HasPrefix(p, "/v1/rqlite/") {
return true
}
if p == "/v1/apps" || strings.HasPrefix(p, "/v1/apps/") {
return true return true
} }
if strings.HasPrefix(p, "/v1/pubsub") { if strings.HasPrefix(p, "/v1/pubsub") {
return true return true
} }
if strings.HasPrefix(p, "/v1/db/") { if strings.HasPrefix(p, "/v1/rqlite/") {
return true return true
} }
return false return false

View File

@ -1,188 +0,0 @@
package gateway
import (
"context"
"errors"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/logging"
"go.uber.org/zap"
)
var errNoMigrationsFound = errors.New("no migrations found")
func (g *Gateway) applyAutoMigrations(ctx context.Context) error {
if g.client == nil {
return nil
}
db := g.client.Database()
// Use internal context to bypass authentication for system migrations
internalCtx := client.WithInternalAuth(ctx)
stmts := []string{
// namespaces
"CREATE TABLE IF NOT EXISTS namespaces (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t name TEXT NOT NULL UNIQUE,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP\n)",
// api_keys
"CREATE TABLE IF NOT EXISTS api_keys (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t key TEXT NOT NULL UNIQUE,\n\t name TEXT,\n\t namespace_id INTEGER NOT NULL,\n\t scopes TEXT,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t last_used_at TIMESTAMP,\n\t FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE\n)",
"CREATE INDEX IF NOT EXISTS idx_api_keys_namespace ON api_keys(namespace_id)",
// request_logs
"CREATE TABLE IF NOT EXISTS request_logs (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t method TEXT NOT NULL,\n\t path TEXT NOT NULL,\n\t status_code INTEGER NOT NULL,\n\t bytes_out INTEGER NOT NULL DEFAULT 0,\n\t duration_ms INTEGER NOT NULL DEFAULT 0,\n\t ip TEXT,\n\t api_key_id INTEGER,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t FOREIGN KEY(api_key_id) REFERENCES api_keys(id) ON DELETE SET NULL\n)",
"CREATE INDEX IF NOT EXISTS idx_request_logs_api_key ON request_logs(api_key_id)",
"CREATE INDEX IF NOT EXISTS idx_request_logs_created_at ON request_logs(created_at)",
// seed default namespace
"INSERT OR IGNORE INTO namespaces(name) VALUES ('default')",
}
for _, stmt := range stmts {
if _, err := db.Query(internalCtx, stmt); err != nil {
return err
}
}
return nil
}
func (g *Gateway) applyMigrations(ctx context.Context) error {
if g.client == nil {
return nil
}
db := g.client.Database()
// Use internal context to bypass authentication for system migrations
internalCtx := client.WithInternalAuth(ctx)
// Ensure schema_migrations exists first
if _, err := db.Query(internalCtx, "CREATE TABLE IF NOT EXISTS schema_migrations (\n\tversion INTEGER PRIMARY KEY,\n\tapplied_at TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))\n)"); err != nil {
return err
}
// Locate migrations directory relative to CWD
migDir := "migrations"
if fi, err := os.Stat(migDir); err != nil || !fi.IsDir() {
return errNoMigrationsFound
}
entries, err := os.ReadDir(migDir)
if err != nil {
return err
}
type mig struct {
ver int
path string
}
migrations := make([]mig, 0)
for _, e := range entries {
if e.IsDir() {
continue
}
name := e.Name()
if !strings.HasSuffix(strings.ToLower(name), ".sql") {
continue
}
if ver, ok := parseMigrationVersion(name); ok {
migrations = append(migrations, mig{ver: ver, path: filepath.Join(migDir, name)})
}
}
if len(migrations) == 0 {
return errNoMigrationsFound
}
sort.Slice(migrations, func(i, j int) bool { return migrations[i].ver < migrations[j].ver })
// Helper to check if version applied
isApplied := func(ctx context.Context, v int) (bool, error) {
res, err := db.Query(ctx, "SELECT 1 FROM schema_migrations WHERE version = ? LIMIT 1", v)
if err != nil {
return false, err
}
return res != nil && res.Count > 0, nil
}
for _, m := range migrations {
applied, err := isApplied(internalCtx, m.ver)
if err != nil {
return err
}
if applied {
continue
}
// Read and split SQL file into statements
content, err := os.ReadFile(m.path)
if err != nil {
return err
}
stmts := splitSQLStatements(string(content))
for _, s := range stmts {
if s == "" {
continue
}
if _, err := db.Query(internalCtx, s); err != nil {
return err
}
}
// Mark as applied
if _, err := db.Query(internalCtx, "INSERT INTO schema_migrations (version) VALUES (?)", m.ver); err != nil {
return err
}
g.logger.ComponentInfo(logging.ComponentDatabase, "applied migration", zap.Int("version", m.ver), zap.String("file", m.path))
}
return nil
}
func parseMigrationVersion(name string) (int, bool) {
i := 0
for i < len(name) && name[i] >= '0' && name[i] <= '9' {
i++
}
if i == 0 {
return 0, false
}
v, err := strconv.Atoi(name[:i])
if err != nil {
return 0, false
}
return v, true
}
func splitSQLStatements(sqlText string) []string {
lines := strings.Split(sqlText, "\n")
cleaned := make([]string, 0, len(lines))
for _, ln := range lines {
s := strings.TrimSpace(ln)
if s == "" {
continue
}
// Handle inline comments by removing everything after --
if commentIdx := strings.Index(s, "--"); commentIdx >= 0 {
s = strings.TrimSpace(s[:commentIdx])
if s == "" {
continue // line was only a comment
}
}
upper := strings.ToUpper(s)
if upper == "BEGIN;" || upper == "COMMIT;" || upper == "BEGIN" || upper == "COMMIT" {
continue
}
if strings.HasPrefix(upper, "INSERT") && strings.Contains(upper, "SCHEMA_MIGRATIONS") {
// ignore in-file migration markers
continue
}
cleaned = append(cleaned, s)
}
// Join and split by ';'
joined := strings.Join(cleaned, "\n")
parts := strings.Split(joined, ";")
out := make([]string, 0, len(parts))
for _, p := range parts {
sp := strings.TrimSpace(p)
if sp == "" {
continue
}
out = append(out, sp+";")
}
return out
}

View File

@ -1,42 +0,0 @@
package gateway
import "testing"
func TestParseMigrationVersion(t *testing.T) {
cases := map[string]struct{
name string
ok bool
}{
"001_init.sql": {"001_init.sql", true},
"10foobar.SQL": {"10foobar.SQL", true},
"abc.sql": {"abc.sql", false},
"": {"", false},
"123_no_ext": {"123_no_ext", true},
}
for _, c := range cases {
_, ok := parseMigrationVersion(c.name)
if ok != c.ok {
t.Fatalf("for %q expected %v got %v", c.name, c.ok, ok)
}
}
}
func TestSplitSQLStatements(t *testing.T) {
in := `-- comment
BEGIN;
CREATE TABLE t (id INTEGER);
-- another
INSERT INTO t VALUES (1); -- inline comment
COMMIT;
`
out := splitSQLStatements(in)
if len(out) != 2 {
t.Fatalf("expected 2 statements, got %d: %#v", len(out), out)
}
if out[0] != "CREATE TABLE t (id INTEGER);" {
t.Fatalf("unexpected first: %q", out[0])
}
if out[1] != "INSERT INTO t VALUES (1);" {
t.Fatalf("unexpected second: %q", out[1])
}
}

View File

@ -2,22 +2,35 @@ package gateway
import ( import (
"context" "context"
"errors"
"strings"
"github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/client"
) )
// resolveNamespaceID ensures the given namespace exists and returns its primary key ID.
// Falls back to "default" when ns is empty. Uses internal auth context for system operations.
func (g *Gateway) resolveNamespaceID(ctx context.Context, ns string) (interface{}, error) { func (g *Gateway) resolveNamespaceID(ctx context.Context, ns string) (interface{}, error) {
// Use internal context to bypass authentication for system operations if g == nil || g.client == nil {
return nil, errors.New("client not initialized")
}
ns = strings.TrimSpace(ns)
if ns == "" {
ns = "default"
}
internalCtx := client.WithInternalAuth(ctx) internalCtx := client.WithInternalAuth(ctx)
db := g.client.Database() db := g.client.Database()
if _, err := db.Query(internalCtx, "INSERT OR IGNORE INTO namespaces(name) VALUES (?)", ns); err != nil { if _, err := db.Query(internalCtx, "INSERT OR IGNORE INTO namespaces(name) VALUES (?)", ns); err != nil {
return nil, err return nil, err
} }
res, err := db.Query(internalCtx, "SELECT id FROM namespaces WHERE name = ? LIMIT 1", ns) res, err := db.Query(internalCtx, "SELECT id FROM namespaces WHERE name = ? LIMIT 1", ns)
if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { if err != nil {
return nil, err return nil, err
} }
if res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 {
return nil, errors.New("failed to resolve namespace")
}
return res.Rows[0][0], nil return res.Rows[0][0], nil
} }
// Deprecated: seeding API keys from config is removed.

View File

@ -9,7 +9,7 @@ import (
"time" "time"
"github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/storage"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
@ -190,7 +190,7 @@ func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) {
// resolveNamespaceFromRequest gets namespace from context set by auth middleware // resolveNamespaceFromRequest gets namespace from context set by auth middleware
func resolveNamespaceFromRequest(r *http.Request) string { func resolveNamespaceFromRequest(r *http.Request) string {
if v := r.Context().Value(storage.CtxKeyNamespaceOverride); v != nil { if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok { if s, ok := v.(string); ok {
return s return s
} }

View File

@ -27,16 +27,11 @@ func (g *Gateway) Routes() http.Handler {
mux.HandleFunc("/v1/auth/logout", g.logoutHandler) mux.HandleFunc("/v1/auth/logout", g.logoutHandler)
mux.HandleFunc("/v1/auth/whoami", g.whoamiHandler) mux.HandleFunc("/v1/auth/whoami", g.whoamiHandler)
// apps CRUD // rqlite ORM HTTP gateway (mounts /v1/db/* endpoints)
mux.HandleFunc("/v1/apps", g.appsHandler) if g.ormHTTP != nil {
mux.HandleFunc("/v1/apps/", g.appsHandler) g.ormHTTP.BasePath = "/v1/rqlite"
g.ormHTTP.RegisterRoutes(mux)
// database }
mux.HandleFunc("/v1/db/query", g.dbQueryHandler)
mux.HandleFunc("/v1/db/transaction", g.dbTransactionHandler)
mux.HandleFunc("/v1/db/schema", g.dbSchemaHandler)
mux.HandleFunc("/v1/db/create-table", g.dbCreateTableHandler)
mux.HandleFunc("/v1/db/drop-table", g.dbDropTableHandler)
// network // network
mux.HandleFunc("/v1/network/status", g.networkStatusHandler) mux.HandleFunc("/v1/network/status", g.networkStatusHandler)

View File

@ -3,127 +3,9 @@ package gateway
import ( import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/pubsub"
) )
// Database HTTP handlers // Database HTTP handlers
func (g *Gateway) dbQueryHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var body struct {
SQL string `json:"sql"`
Args []any `json:"args"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.SQL == "" {
writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}")
return
}
ctx := client.WithInternalAuth(r.Context())
res, err := g.client.Database().Query(ctx, body.SQL, body.Args...)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, res)
}
func (g *Gateway) dbTransactionHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var body struct {
Statements []string `json:"statements"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || len(body.Statements) == 0 {
writeError(w, http.StatusBadRequest, "invalid body: {statements:[...]}")
return
}
ctx := client.WithInternalAuth(r.Context())
if err := g.client.Database().Transaction(ctx, body.Statements); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}
func (g *Gateway) dbSchemaHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodGet {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
ctx := client.WithInternalAuth(r.Context())
schema, err := g.client.Database().GetSchema(ctx)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, schema)
}
func (g *Gateway) dbCreateTableHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var body struct {
Schema string `json:"schema"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Schema == "" {
writeError(w, http.StatusBadRequest, "invalid body: {schema}")
return
}
ctx := client.WithInternalAuth(r.Context())
if err := g.client.Database().CreateTable(ctx, body.Schema); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusCreated, map[string]any{"status": "ok"})
}
func (g *Gateway) dbDropTableHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var body struct {
Table string `json:"table"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Table == "" {
writeError(w, http.StatusBadRequest, "invalid body: {table}")
return
}
ctx := client.WithInternalAuth(r.Context())
if err := g.client.Database().DropTable(ctx, body.Table); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}
func (g *Gateway) networkStatusHandler(w http.ResponseWriter, r *http.Request) { func (g *Gateway) networkStatusHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil { if g.client == nil {
@ -204,7 +86,7 @@ func (g *Gateway) validateNamespaceParam(r *http.Request) bool {
if qns == "" { if qns == "" {
return true return true
} }
if v := r.Context().Value(pubsub.CtxKeyNamespaceOverride); v != nil { if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" { if s, ok := v.(string); ok && s != "" {
return s == qns return s == qns
} }

View File

@ -23,9 +23,9 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/DeBrosOfficial/network/pkg/config" "github.com/DeBrosOfficial/network/pkg/config"
"github.com/DeBrosOfficial/network/pkg/database"
"github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/logging"
"github.com/DeBrosOfficial/network/pkg/pubsub" "github.com/DeBrosOfficial/network/pkg/pubsub"
database "github.com/DeBrosOfficial/network/pkg/rqlite"
) )
// Node represents a network node with RQLite database // Node represents a network node with RQLite database

View File

@ -1,4 +1,4 @@
package database package rqlite
import ( import (
"database/sql" "database/sql"

835
pkg/rqlite/client.go Normal file
View File

@ -0,0 +1,835 @@
package rqlite
// client.go defines the ORM-like interfaces and a minimal implementation over database/sql.
// It builds on the rqlite stdlib driver so it behaves like a regular SQL-backed ORM.
import (
"context"
"database/sql"
"errors"
"fmt"
"reflect"
"strings"
"time"
)
// TableNamer lets a struct provide its table name.
type TableNamer interface {
TableName() string
}
// Client is the high-level ORM-like API.
type Client interface {
// Query runs an arbitrary SELECT and scans rows into dest (pointer to slice of structs or []map[string]any).
Query(ctx context.Context, dest any, query string, args ...any) error
// Exec runs a write statement (INSERT/UPDATE/DELETE).
Exec(ctx context.Context, query string, args ...any) (sql.Result, error)
// FindBy/FindOneBy provide simple map-based criteria filtering.
FindBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error
FindOneBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error
// Save inserts or updates an entity (single-PK).
Save(ctx context.Context, entity any) error
// Remove deletes by PK (single-PK).
Remove(ctx context.Context, entity any) error
// Repositories (generic layer). Optional but convenient if you use Go generics.
Repository(table string) any
// Fluent query builder for advanced querying.
CreateQueryBuilder(table string) *QueryBuilder
// Tx executes a function within a transaction.
Tx(ctx context.Context, fn func(tx Tx) error) error
}
// Tx mirrors Client but executes within a transaction.
type Tx interface {
Query(ctx context.Context, dest any, query string, args ...any) error
Exec(ctx context.Context, query string, args ...any) (sql.Result, error)
CreateQueryBuilder(table string) *QueryBuilder
// Optional: scoped Save/Remove inside tx
Save(ctx context.Context, entity any) error
Remove(ctx context.Context, entity any) error
}
// Repository provides typed entity operations for a table.
type Repository[T any] interface {
Find(ctx context.Context, dest *[]T, criteria map[string]any, opts ...FindOption) error
FindOne(ctx context.Context, dest *T, criteria map[string]any, opts ...FindOption) error
Save(ctx context.Context, entity *T) error
Remove(ctx context.Context, entity *T) error
// Builder helpers
Q() *QueryBuilder
}
// NewClient wires the ORM client to a *sql.DB (from your RQLiteAdapter).
func NewClient(db *sql.DB) Client {
return &client{db: db}
}
// NewClientFromAdapter is convenient if you already created the adapter.
func NewClientFromAdapter(adapter *RQLiteAdapter) Client {
return NewClient(adapter.GetSQLDB())
}
// client implements Client over *sql.DB.
type client struct {
db *sql.DB
}
func (c *client) Query(ctx context.Context, dest any, query string, args ...any) error {
rows, err := c.db.QueryContext(ctx, query, args...)
if err != nil {
return err
}
defer rows.Close()
return scanIntoDest(rows, dest)
}
func (c *client) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) {
return c.db.ExecContext(ctx, query, args...)
}
func (c *client) FindBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error {
qb := c.CreateQueryBuilder(table)
for k, v := range criteria {
qb = qb.AndWhere(fmt.Sprintf("%s = ?", k), v)
}
for _, opt := range opts {
opt(qb)
}
return qb.GetMany(ctx, dest)
}
func (c *client) FindOneBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error {
qb := c.CreateQueryBuilder(table)
for k, v := range criteria {
qb = qb.AndWhere(fmt.Sprintf("%s = ?", k), v)
}
for _, opt := range opts {
opt(qb)
}
return qb.GetOne(ctx, dest)
}
func (c *client) Save(ctx context.Context, entity any) error {
return saveEntity(ctx, c.db, entity)
}
func (c *client) Remove(ctx context.Context, entity any) error {
return removeEntity(ctx, c.db, entity)
}
func (c *client) Repository(table string) any {
// This returns an untyped interface since Go methods cannot have type parameters
// Users will need to type assert the result to Repository[T]
return func() any {
return &repository[any]{c: c, table: table}
}()
}
func (c *client) CreateQueryBuilder(table string) *QueryBuilder {
return newQueryBuilder(c.db, table)
}
func (c *client) Tx(ctx context.Context, fn func(tx Tx) error) error {
sqlTx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return err
}
txc := &txClient{tx: sqlTx}
if err := fn(txc); err != nil {
_ = sqlTx.Rollback()
return err
}
return sqlTx.Commit()
}
// txClient implements Tx over *sql.Tx.
type txClient struct {
tx *sql.Tx
}
func (t *txClient) Query(ctx context.Context, dest any, query string, args ...any) error {
rows, err := t.tx.QueryContext(ctx, query, args...)
if err != nil {
return err
}
defer rows.Close()
return scanIntoDest(rows, dest)
}
func (t *txClient) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) {
return t.tx.ExecContext(ctx, query, args...)
}
func (t *txClient) CreateQueryBuilder(table string) *QueryBuilder {
return newQueryBuilder(t.tx, table)
}
func (t *txClient) Save(ctx context.Context, entity any) error {
return saveEntity(ctx, t.tx, entity)
}
func (t *txClient) Remove(ctx context.Context, entity any) error {
return removeEntity(ctx, t.tx, entity)
}
// executor is implemented by *sql.DB and *sql.Tx.
type executor interface {
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}
// QueryBuilder implements a fluent SELECT builder with joins, where, etc.
type QueryBuilder struct {
exec executor
table string
alias string
selects []string
joins []joinClause
wheres []whereClause
groupBys []string
orderBys []string
limit *int
offset *int
}
// joinClause represents INNER/LEFT/etc joins.
type joinClause struct {
kind string // "INNER", "LEFT", "JOIN" (default)
table string
on string
}
// whereClause holds an expression and args with a conjunction.
type whereClause struct {
conj string // "AND" or "OR"
expr string
args []any
}
func newQueryBuilder(exec executor, table string) *QueryBuilder {
return &QueryBuilder{
exec: exec,
table: table,
}
}
func (qb *QueryBuilder) Select(cols ...string) *QueryBuilder {
qb.selects = append(qb.selects, cols...)
return qb
}
func (qb *QueryBuilder) Alias(a string) *QueryBuilder {
qb.alias = a
return qb
}
func (qb *QueryBuilder) Where(expr string, args ...any) *QueryBuilder {
return qb.AndWhere(expr, args...)
}
func (qb *QueryBuilder) AndWhere(expr string, args ...any) *QueryBuilder {
qb.wheres = append(qb.wheres, whereClause{conj: "AND", expr: expr, args: args})
return qb
}
func (qb *QueryBuilder) OrWhere(expr string, args ...any) *QueryBuilder {
qb.wheres = append(qb.wheres, whereClause{conj: "OR", expr: expr, args: args})
return qb
}
func (qb *QueryBuilder) InnerJoin(table string, on string) *QueryBuilder {
qb.joins = append(qb.joins, joinClause{kind: "INNER", table: table, on: on})
return qb
}
func (qb *QueryBuilder) LeftJoin(table string, on string) *QueryBuilder {
qb.joins = append(qb.joins, joinClause{kind: "LEFT", table: table, on: on})
return qb
}
func (qb *QueryBuilder) Join(table string, on string) *QueryBuilder {
qb.joins = append(qb.joins, joinClause{kind: "JOIN", table: table, on: on})
return qb
}
func (qb *QueryBuilder) GroupBy(cols ...string) *QueryBuilder {
qb.groupBys = append(qb.groupBys, cols...)
return qb
}
func (qb *QueryBuilder) OrderBy(exprs ...string) *QueryBuilder {
qb.orderBys = append(qb.orderBys, exprs...)
return qb
}
func (qb *QueryBuilder) Limit(n int) *QueryBuilder {
qb.limit = &n
return qb
}
func (qb *QueryBuilder) Offset(n int) *QueryBuilder {
qb.offset = &n
return qb
}
// Build returns the SQL string and args for a SELECT.
func (qb *QueryBuilder) Build() (string, []any) {
cols := "*"
if len(qb.selects) > 0 {
cols = strings.Join(qb.selects, ", ")
}
base := fmt.Sprintf("SELECT %s FROM %s", cols, qb.table)
if qb.alias != "" {
base += " AS " + qb.alias
}
args := make([]any, 0, 16)
for _, j := range qb.joins {
base += fmt.Sprintf(" %s JOIN %s ON %s", j.kind, j.table, j.on)
}
if len(qb.wheres) > 0 {
base += " WHERE "
for i, w := range qb.wheres {
if i > 0 {
base += " " + w.conj + " "
}
base += "(" + w.expr + ")"
args = append(args, w.args...)
}
}
if len(qb.groupBys) > 0 {
base += " GROUP BY " + strings.Join(qb.groupBys, ", ")
}
if len(qb.orderBys) > 0 {
base += " ORDER BY " + strings.Join(qb.orderBys, ", ")
}
if qb.limit != nil {
base += fmt.Sprintf(" LIMIT %d", *qb.limit)
}
if qb.offset != nil {
base += fmt.Sprintf(" OFFSET %d", *qb.offset)
}
return base, args
}
// GetMany executes the built query and scans into dest (pointer to slice).
func (qb *QueryBuilder) GetMany(ctx context.Context, dest any) error {
sqlStr, args := qb.Build()
rows, err := qb.exec.QueryContext(ctx, sqlStr, args...)
if err != nil {
return err
}
defer rows.Close()
return scanIntoDest(rows, dest)
}
// GetOne executes the built query and scans into dest (pointer to struct or map) with LIMIT 1.
func (qb *QueryBuilder) GetOne(ctx context.Context, dest any) error {
limit := 1
if qb.limit == nil {
qb.limit = &limit
} else if qb.limit != nil && *qb.limit > 1 {
qb.limit = &limit
}
sqlStr, args := qb.Build()
rows, err := qb.exec.QueryContext(ctx, sqlStr, args...)
if err != nil {
return err
}
defer rows.Close()
if !rows.Next() {
return sql.ErrNoRows
}
return scanIntoSingle(rows, dest)
}
// FindOption customizes Find queries.
type FindOption func(q *QueryBuilder)
func WithOrderBy(exprs ...string) FindOption {
return func(q *QueryBuilder) { q.OrderBy(exprs...) }
}
func WithGroupBy(cols ...string) FindOption {
return func(q *QueryBuilder) { q.GroupBy(cols...) }
}
func WithLimit(n int) FindOption {
return func(q *QueryBuilder) { q.Limit(n) }
}
func WithOffset(n int) FindOption {
return func(q *QueryBuilder) { q.Offset(n) }
}
func WithSelect(cols ...string) FindOption {
return func(q *QueryBuilder) { q.Select(cols...) }
}
func WithJoin(kind, table, on string) FindOption {
return func(q *QueryBuilder) {
switch strings.ToUpper(kind) {
case "INNER":
q.InnerJoin(table, on)
case "LEFT":
q.LeftJoin(table, on)
default:
q.Join(table, on)
}
}
}
// repository is a generic table repository for type T.
type repository[T any] struct {
c *client
table string
}
func (r *repository[T]) Find(ctx context.Context, dest *[]T, criteria map[string]any, opts ...FindOption) error {
qb := r.c.CreateQueryBuilder(r.table)
for k, v := range criteria {
qb.AndWhere(fmt.Sprintf("%s = ?", k), v)
}
for _, opt := range opts {
opt(qb)
}
return qb.GetMany(ctx, dest)
}
func (r *repository[T]) FindOne(ctx context.Context, dest *T, criteria map[string]any, opts ...FindOption) error {
qb := r.c.CreateQueryBuilder(r.table)
for k, v := range criteria {
qb.AndWhere(fmt.Sprintf("%s = ?", k), v)
}
for _, opt := range opts {
opt(qb)
}
return qb.GetOne(ctx, dest)
}
func (r *repository[T]) Save(ctx context.Context, entity *T) error {
return saveEntity(ctx, r.c.db, entity)
}
func (r *repository[T]) Remove(ctx context.Context, entity *T) error {
return removeEntity(ctx, r.c.db, entity)
}
func (r *repository[T]) Q() *QueryBuilder {
return r.c.CreateQueryBuilder(r.table)
}
// -----------------------
// Reflection + scanning
// -----------------------
func scanIntoDest(rows *sql.Rows, dest any) error {
// dest must be pointer to slice (of struct or map)
rv := reflect.ValueOf(dest)
if rv.Kind() != reflect.Pointer || rv.IsNil() {
return errors.New("dest must be a non-nil pointer")
}
sliceVal := rv.Elem()
if sliceVal.Kind() != reflect.Slice {
return errors.New("dest must be pointer to a slice")
}
elemType := sliceVal.Type().Elem()
cols, err := rows.Columns()
if err != nil {
return err
}
for rows.Next() {
itemPtr := reflect.New(elemType)
// Support map[string]any and struct
if elemType.Kind() == reflect.Map {
m, err := scanRowToMap(rows, cols)
if err != nil {
return err
}
sliceVal.Set(reflect.Append(sliceVal, reflect.ValueOf(m)))
continue
}
if elemType.Kind() == reflect.Struct {
if err := scanCurrentRowIntoStruct(rows, cols, itemPtr.Elem()); err != nil {
return err
}
sliceVal.Set(reflect.Append(sliceVal, itemPtr.Elem()))
continue
}
return fmt.Errorf("unsupported slice element type: %s", elemType.Kind())
}
return rows.Err()
}
func scanIntoSingle(rows *sql.Rows, dest any) error {
rv := reflect.ValueOf(dest)
if rv.Kind() != reflect.Pointer || rv.IsNil() {
return errors.New("dest must be a non-nil pointer")
}
cols, err := rows.Columns()
if err != nil {
return err
}
switch rv.Elem().Kind() {
case reflect.Map:
m, err := scanRowToMap(rows, cols)
if err != nil {
return err
}
rv.Elem().Set(reflect.ValueOf(m))
return nil
case reflect.Struct:
return scanCurrentRowIntoStruct(rows, cols, rv.Elem())
default:
return fmt.Errorf("unsupported dest kind: %s", rv.Elem().Kind())
}
}
func scanRowToMap(rows *sql.Rows, cols []string) (map[string]any, error) {
raw := make([]any, len(cols))
ptrs := make([]any, len(cols))
for i := range raw {
ptrs[i] = &raw[i]
}
if err := rows.Scan(ptrs...); err != nil {
return nil, err
}
out := make(map[string]any, len(cols))
for i, c := range cols {
out[c] = normalizeSQLValue(raw[i])
}
return out, nil
}
func scanCurrentRowIntoStruct(rows *sql.Rows, cols []string, destStruct reflect.Value) error {
raw := make([]any, len(cols))
ptrs := make([]any, len(cols))
for i := range raw {
ptrs[i] = &raw[i]
}
if err := rows.Scan(ptrs...); err != nil {
return err
}
fieldIndex := buildFieldIndex(destStruct.Type())
for i, c := range cols {
if idx, ok := fieldIndex[strings.ToLower(c)]; ok {
field := destStruct.Field(idx)
if field.CanSet() {
if err := setReflectValue(field, raw[i]); err != nil {
return fmt.Errorf("column %s: %w", c, err)
}
}
}
}
return nil
}
func normalizeSQLValue(v any) any {
switch t := v.(type) {
case []byte:
return string(t)
default:
return v
}
}
func buildFieldIndex(t reflect.Type) map[string]int {
m := make(map[string]int)
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
if f.IsExported() == false {
continue
}
tag := f.Tag.Get("db")
col := ""
if tag != "" {
col = strings.Split(tag, ",")[0]
}
if col == "" {
col = f.Name
}
m[strings.ToLower(col)] = i
}
return m
}
func setReflectValue(field reflect.Value, raw any) error {
if raw == nil {
// leave zero value
return nil
}
switch field.Kind() {
case reflect.String:
switch v := raw.(type) {
case string:
field.SetString(v)
case []byte:
field.SetString(string(v))
default:
field.SetString(fmt.Sprint(v))
}
case reflect.Bool:
switch v := raw.(type) {
case bool:
field.SetBool(v)
case int64:
field.SetBool(v != 0)
case []byte:
s := string(v)
field.SetBool(s == "1" || strings.EqualFold(s, "true"))
default:
field.SetBool(false)
}
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
switch v := raw.(type) {
case int64:
field.SetInt(v)
case []byte:
var n int64
fmt.Sscan(string(v), &n)
field.SetInt(n)
default:
return fmt.Errorf("cannot convert %T to int", raw)
}
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
switch v := raw.(type) {
case int64:
if v < 0 {
v = 0
}
field.SetUint(uint64(v))
case []byte:
var n uint64
fmt.Sscan(string(v), &n)
field.SetUint(n)
default:
return fmt.Errorf("cannot convert %T to uint", raw)
}
case reflect.Float32, reflect.Float64:
switch v := raw.(type) {
case float64:
field.SetFloat(v)
case []byte:
var fv float64
fmt.Sscan(string(v), &fv)
field.SetFloat(fv)
default:
return fmt.Errorf("cannot convert %T to float", raw)
}
case reflect.Struct:
// Support time.Time; extend as needed.
if field.Type() == reflect.TypeOf(time.Time{}) {
switch v := raw.(type) {
case time.Time:
field.Set(reflect.ValueOf(v))
case []byte:
// Try RFC3339
if tt, err := time.Parse(time.RFC3339, string(v)); err == nil {
field.Set(reflect.ValueOf(tt))
}
}
return nil
}
fallthrough
default:
// Not supported yet
return fmt.Errorf("unsupported dest field kind: %s", field.Kind())
}
return nil
}
// -----------------------
// Save/Remove (basic PK)
// -----------------------
type fieldMeta struct {
index int
column string
isPK bool
auto bool
}
func collectMeta(t reflect.Type) (fields []fieldMeta, pk fieldMeta, hasPK bool) {
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
if !f.IsExported() {
continue
}
tag := f.Tag.Get("db")
if tag == "-" {
continue
}
opts := strings.Split(tag, ",")
col := opts[0]
if col == "" {
col = f.Name
}
meta := fieldMeta{index: i, column: col}
for _, o := range opts[1:] {
switch strings.ToLower(strings.TrimSpace(o)) {
case "pk":
meta.isPK = true
case "auto", "autoincrement":
meta.auto = true
}
}
// If not tagged as pk, fallback to field name "ID"
if !meta.isPK && f.Name == "ID" {
meta.isPK = true
if col == "" {
meta.column = "id"
}
}
fields = append(fields, meta)
if meta.isPK {
pk = meta
hasPK = true
}
}
return
}
func getTableNameFromEntity(v reflect.Value) (string, bool) {
// If entity implements TableNamer
if v.CanInterface() {
if tn, ok := v.Interface().(TableNamer); ok {
return tn.TableName(), true
}
}
// Fallback: very naive pluralization (append 's')
typ := v.Type()
if typ.Kind() == reflect.Pointer {
typ = typ.Elem()
}
if typ.Kind() == reflect.Struct {
return strings.ToLower(typ.Name()) + "s", true
}
return "", false
}
func saveEntity(ctx context.Context, exec executor, entity any) error {
rv := reflect.ValueOf(entity)
if rv.Kind() != reflect.Pointer || rv.IsNil() {
return errors.New("entity must be a non-nil pointer to struct")
}
ev := rv.Elem()
if ev.Kind() != reflect.Struct {
return errors.New("entity must point to a struct")
}
fields, pkMeta, hasPK := collectMeta(ev.Type())
if !hasPK {
return errors.New("no primary key field found (tag db:\"...,pk\" or field named ID)")
}
table, ok := getTableNameFromEntity(ev)
if !ok || table == "" {
return errors.New("unable to resolve table name; implement TableNamer or set up a repository with explicit table")
}
// Build lists
cols := make([]string, 0, len(fields))
vals := make([]any, 0, len(fields))
setParts := make([]string, 0, len(fields))
var pkVal any
var pkIsZero bool
for _, fm := range fields {
f := ev.Field(fm.index)
if fm.isPK {
pkVal = f.Interface()
pkIsZero = isZeroValue(f)
continue
}
cols = append(cols, fm.column)
vals = append(vals, f.Interface())
setParts = append(setParts, fmt.Sprintf("%s = ?", fm.column))
}
if pkIsZero {
// INSERT
placeholders := strings.Repeat("?,", len(cols))
if len(placeholders) > 0 {
placeholders = placeholders[:len(placeholders)-1]
}
sqlStr := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", table, strings.Join(cols, ", "), placeholders)
res, err := exec.ExecContext(ctx, sqlStr, vals...)
if err != nil {
return err
}
// Set auto ID if needed
if pkMeta.auto {
if id, err := res.LastInsertId(); err == nil {
ev.Field(pkMeta.index).SetInt(id)
}
}
return nil
}
// UPDATE ... WHERE pk = ?
sqlStr := fmt.Sprintf("UPDATE %s SET %s WHERE %s = ?", table, strings.Join(setParts, ", "), pkMeta.column)
valsWithPK := append(vals, pkVal)
_, err := exec.ExecContext(ctx, sqlStr, valsWithPK...)
return err
}
func removeEntity(ctx context.Context, exec executor, entity any) error {
rv := reflect.ValueOf(entity)
if rv.Kind() != reflect.Pointer || rv.IsNil() {
return errors.New("entity must be a non-nil pointer to struct")
}
ev := rv.Elem()
if ev.Kind() != reflect.Struct {
return errors.New("entity must point to a struct")
}
_, pkMeta, hasPK := collectMeta(ev.Type())
if !hasPK {
return errors.New("no primary key field found")
}
table, ok := getTableNameFromEntity(ev)
if !ok || table == "" {
return errors.New("unable to resolve table name")
}
pkVal := ev.Field(pkMeta.index).Interface()
sqlStr := fmt.Sprintf("DELETE FROM %s WHERE %s = ?", table, pkMeta.column)
_, err := exec.ExecContext(ctx, sqlStr, pkVal)
return err
}
func isZeroValue(v reflect.Value) bool {
switch v.Kind() {
case reflect.String:
return v.Len() == 0
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
return v.Int() == 0
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
return v.Uint() == 0
case reflect.Bool:
return v.Bool() == false
case reflect.Pointer, reflect.Interface:
return v.IsNil()
case reflect.Slice, reflect.Map:
return v.Len() == 0
case reflect.Struct:
// Special-case time.Time
if v.Type() == reflect.TypeOf(time.Time{}) {
t := v.Interface().(time.Time)
return t.IsZero()
}
zero := reflect.Zero(v.Type())
return reflect.DeepEqual(v.Interface(), zero.Interface())
default:
return false
}
}

615
pkg/rqlite/gateway.go Normal file
View File

@ -0,0 +1,615 @@
package rqlite
// HTTP gateway for the rqlite ORM client.
//
// This file exposes a minimal, SDK-friendly HTTP interface over the ORM-like
// client defined in client.go. It maps high-level operations (Query, Exec,
// FindBy, FindOneBy, QueryBuilder-based SELECTs, Transactions) and a few schema
// helpers into JSON-over-HTTP endpoints that can be called from any language.
//
// Endpoints (under BasePath, default: /v1/db):
// - POST {base}/query -> arbitrary SELECT; returns rows as []map[string]any
// - POST {base}/exec -> write statement (INSERT/UPDATE/DELETE/DDL); returns {rows_affected,last_insert_id}
// - POST {base}/find -> FindBy(table, criteria, opts...) -> returns []map
// - POST {base}/find-one -> FindOneBy(table, criteria, opts...) -> returns map
// - POST {base}/select -> Fluent SELECT builder via JSON (joins, where, order, group, limit, offset); returns []map or one map if one=true
// - POST {base}/transaction -> Execute a sequence of exec/query ops atomically; optionally return results
//
// Schema helpers (convenience; powered via Exec/Query):
// - GET {base}/schema -> list of user tables/views and create SQL
// - POST {base}/create-table -> {schema: "CREATE TABLE ..."} -> status ok
// - POST {base}/drop-table -> {table: "name"} -> status ok (safe-validated identifier)
//
// Notes:
// - All numbers in JSON are decoded as float64 by default; we best-effort coerce
// integral values to int64 for SQL placeholders.
// - The Save/Remove reflection helpers in the ORM require concrete Go structs;
// exposing them generically over HTTP is not portable. Prefer using the Exec
// and Find APIs, or the Select builder for CRUD-like flows.
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"time"
)
// HTTPGateway exposes the ORM Client as a set of HTTP handlers.
type HTTPGateway struct {
// Client is the ORM-like rqlite client to execute operations against.
Client Client
// BasePath is the prefix for all routes, e.g. "/v1/db".
// If empty, defaults to "/v1/db". A trailing slash is trimmed.
BasePath string
// Optional: Request timeout. If > 0, handlers will use a context with this timeout.
Timeout time.Duration
}
// NewHTTPGateway constructs a new HTTPGateway with sensible defaults.
func NewHTTPGateway(c Client, base string) *HTTPGateway {
return &HTTPGateway{
Client: c,
BasePath: base,
}
}
// RegisterRoutes registers all handlers onto the provided mux under BasePath.
func (g *HTTPGateway) RegisterRoutes(mux *http.ServeMux) {
base := g.base()
mux.HandleFunc(base+"/query", g.handleQuery)
mux.HandleFunc(base+"/exec", g.handleExec)
mux.HandleFunc(base+"/find", g.handleFind)
mux.HandleFunc(base+"/find-one", g.handleFindOne)
mux.HandleFunc(base+"/select", g.handleSelect)
// Keep "transaction" for compatibility with existing routes.
mux.HandleFunc(base+"/transaction", g.handleTransaction)
// Schema helpers
mux.HandleFunc(base+"/schema", g.handleSchema)
mux.HandleFunc(base+"/create-table", g.handleCreateTable)
mux.HandleFunc(base+"/drop-table", g.handleDropTable)
}
func (g *HTTPGateway) base() string {
b := strings.TrimSpace(g.BasePath)
if b == "" {
b = "/v1/db"
}
if b != "/" {
b = strings.TrimRight(b, "/")
}
return b
}
func (g *HTTPGateway) withTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
if g.Timeout > 0 {
return context.WithTimeout(ctx, g.Timeout)
}
return context.WithCancel(ctx)
}
// --------------------
// Common HTTP helpers
// --------------------
func writeJSON(w http.ResponseWriter, code int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(v)
}
func writeError(w http.ResponseWriter, code int, msg string) {
writeJSON(w, code, map[string]any{"error": msg})
}
func onlyMethod(w http.ResponseWriter, r *http.Request, method string) bool {
if r.Method != method {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return false
}
return true
}
// Normalize JSON-decoded args for SQL placeholders.
// - Convert float64 with integral value to int64 to better match SQLite expectations.
// - Leave strings, bools and nulls as-is.
// - Recursively normalizes nested arrays if present.
func normalizeArgs(args []any) []any {
out := make([]any, len(args))
for i, a := range args {
switch v := a.(type) {
case float64:
// If v is integral (within epsilon), convert to int64
if v == float64(int64(v)) {
out[i] = int64(v)
} else {
out[i] = v
}
case []any:
out[i] = normalizeArgs(v)
default:
out[i] = a
}
}
return out
}
// --------------------
// Request DTOs
// --------------------
type queryRequest struct {
SQL string `json:"sql"`
Args []any `json:"args"`
}
type execRequest struct {
SQL string `json:"sql"`
Args []any `json:"args"`
}
type findOptions struct {
Select []string `json:"select"`
OrderBy []string `json:"order_by"`
GroupBy []string `json:"group_by"`
Limit *int `json:"limit"`
Offset *int `json:"offset"`
Joins []joinBody `json:"joins"`
}
type findRequest struct {
Table string `json:"table"`
Criteria map[string]any `json:"criteria"`
Options findOptions `json:"options"`
// Back-compat: allow options at top-level too
Select []string `json:"select"`
OrderBy []string `json:"order_by"`
GroupBy []string `json:"group_by"`
Limit *int `json:"limit"`
Offset *int `json:"offset"`
Joins []joinBody `json:"joins"`
}
type findOneRequest = findRequest
type joinBody struct {
Kind string `json:"kind"` // "INNER" | "LEFT" | "JOIN"
Table string `json:"table"` // table name
On string `json:"on"` // join condition
}
type whereBody struct {
Conj string `json:"conj"` // "AND" | "OR" (default AND)
Expr string `json:"expr"` // e.g., "a = ? AND b > ?"
Args []any `json:"args"`
}
type selectRequest struct {
Table string `json:"table"`
Alias string `json:"alias"`
Select []string `json:"select"`
Joins []joinBody `json:"joins"`
Where []whereBody `json:"where"`
GroupBy []string `json:"group_by"`
OrderBy []string `json:"order_by"`
Limit *int `json:"limit"`
Offset *int `json:"offset"`
One bool `json:"one"` // if true, returns a single row (object)
}
type txOp struct {
Kind string `json:"kind"` // "exec" | "query"
SQL string `json:"sql"`
Args []any `json:"args"`
}
type transactionRequest struct {
Ops []txOp `json:"ops"`
ReturnResults bool `json:"return_results"` // if true, returns per-op results
StopOnError bool `json:"stop_on_error"` // default true in tx
PartialResults bool `json:"partial_results"` // ignored for actual TX (atomic); kept for API symmetry
}
// --------------------
// Handlers
// --------------------
func (g *HTTPGateway) handleQuery(w http.ResponseWriter, r *http.Request) {
if !onlyMethod(w, r, http.MethodPost) {
return
}
if g.Client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
var body queryRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.SQL) == "" {
writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}")
return
}
args := normalizeArgs(body.Args)
ctx, cancel := g.withTimeout(r.Context())
defer cancel()
out := make([]map[string]any, 0, 16)
if err := g.Client.Query(ctx, &out, body.SQL, args...); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"items": out,
"count": len(out),
})
}
func (g *HTTPGateway) handleExec(w http.ResponseWriter, r *http.Request) {
if !onlyMethod(w, r, http.MethodPost) {
return
}
if g.Client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
var body execRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.SQL) == "" {
writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}")
return
}
args := normalizeArgs(body.Args)
ctx, cancel := g.withTimeout(r.Context())
defer cancel()
res, err := g.Client.Exec(ctx, body.SQL, args...)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
liid, _ := res.LastInsertId()
ra, _ := res.RowsAffected()
writeJSON(w, http.StatusOK, map[string]any{
"rows_affected": ra,
"last_insert_id": liid,
"execution_state": "ok",
})
}
func (g *HTTPGateway) handleFind(w http.ResponseWriter, r *http.Request) {
if !onlyMethod(w, r, http.MethodPost) {
return
}
if g.Client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
var body findRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" {
writeError(w, http.StatusBadRequest, "invalid body: {table, criteria, options?}")
return
}
opts := makeFindOptions(mergeFindOptions(body))
ctx, cancel := g.withTimeout(r.Context())
defer cancel()
out := make([]map[string]any, 0, 32)
if err := g.Client.FindBy(ctx, &out, body.Table, body.Criteria, opts...); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"items": out,
"count": len(out),
})
}
func (g *HTTPGateway) handleFindOne(w http.ResponseWriter, r *http.Request) {
if !onlyMethod(w, r, http.MethodPost) {
return
}
if g.Client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
var body findOneRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" {
writeError(w, http.StatusBadRequest, "invalid body: {table, criteria, options?}")
return
}
opts := makeFindOptions(mergeFindOptions(body))
ctx, cancel := g.withTimeout(r.Context())
defer cancel()
row := make(map[string]any)
if err := g.Client.FindOneBy(ctx, &row, body.Table, body.Criteria, opts...); err != nil {
if errors.Is(err, sql.ErrNoRows) {
writeError(w, http.StatusNotFound, "not found")
return
}
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, row)
}
func (g *HTTPGateway) handleSelect(w http.ResponseWriter, r *http.Request) {
if !onlyMethod(w, r, http.MethodPost) {
return
}
if g.Client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
var body selectRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" {
writeError(w, http.StatusBadRequest, "invalid body: {table, select?, where?, joins?, order_by?, group_by?, limit?, offset?, one?}")
return
}
ctx, cancel := g.withTimeout(r.Context())
defer cancel()
qb := g.Client.CreateQueryBuilder(body.Table)
if alias := strings.TrimSpace(body.Alias); alias != "" {
qb = qb.Alias(alias)
}
if len(body.Select) > 0 {
qb = qb.Select(body.Select...)
}
// joins
for _, j := range body.Joins {
switch strings.ToUpper(strings.TrimSpace(j.Kind)) {
case "INNER":
qb = qb.InnerJoin(j.Table, j.On)
case "LEFT":
qb = qb.LeftJoin(j.Table, j.On)
default:
qb = qb.Join(j.Table, j.On)
}
}
// where
for _, wcl := range body.Where {
switch strings.ToUpper(strings.TrimSpace(wcl.Conj)) {
case "OR":
qb = qb.OrWhere(wcl.Expr, normalizeArgs(wcl.Args)...)
default:
qb = qb.AndWhere(wcl.Expr, normalizeArgs(wcl.Args)...)
}
}
// group/order/limit/offset
if len(body.GroupBy) > 0 {
qb = qb.GroupBy(body.GroupBy...)
}
if len(body.OrderBy) > 0 {
qb = qb.OrderBy(body.OrderBy...)
}
if body.Limit != nil {
qb = qb.Limit(*body.Limit)
}
if body.Offset != nil {
qb = qb.Offset(*body.Offset)
}
if body.One {
row := make(map[string]any)
if err := qb.GetOne(ctx, &row); err != nil {
if errors.Is(err, sql.ErrNoRows) {
writeError(w, http.StatusNotFound, "not found")
return
}
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, row)
return
}
rows := make([]map[string]any, 0, 32)
if err := qb.GetMany(ctx, &rows); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"items": rows,
"count": len(rows),
})
}
func (g *HTTPGateway) handleTransaction(w http.ResponseWriter, r *http.Request) {
if !onlyMethod(w, r, http.MethodPost) {
return
}
if g.Client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
var body transactionRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || len(body.Ops) == 0 {
writeError(w, http.StatusBadRequest, "invalid body: {ops:[{kind,sql,args?}], return_results?}")
return
}
ctx, cancel := g.withTimeout(r.Context())
defer cancel()
results := make([]any, 0, len(body.Ops))
err := g.Client.Tx(ctx, func(tx Tx) error {
for _, op := range body.Ops {
switch strings.ToLower(strings.TrimSpace(op.Kind)) {
case "exec":
res, err := tx.Exec(ctx, op.SQL, normalizeArgs(op.Args)...)
if err != nil {
return err
}
if body.ReturnResults {
li, _ := res.LastInsertId()
ra, _ := res.RowsAffected()
results = append(results, map[string]any{
"rows_affected": ra,
"last_insert_id": li,
})
}
case "query":
var rows []map[string]any
if err := tx.Query(ctx, &rows, op.SQL, normalizeArgs(op.Args)...); err != nil {
return err
}
if body.ReturnResults {
results = append(results, rows)
}
default:
return fmt.Errorf("invalid op kind: %s", op.Kind)
}
}
return nil
})
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if body.ReturnResults {
writeJSON(w, http.StatusOK, map[string]any{
"status": "ok",
"results": results,
})
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}
// --------------------
// Schema helpers
// --------------------
func (g *HTTPGateway) handleSchema(w http.ResponseWriter, r *http.Request) {
if !onlyMethod(w, r, http.MethodGet) {
return
}
if g.Client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx, cancel := g.withTimeout(r.Context())
defer cancel()
sqlText := `SELECT name, type, sql FROM sqlite_master WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' ORDER BY name`
var rows []map[string]any
if err := g.Client.Query(ctx, &rows, sqlText); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{
"objects": rows,
"count": len(rows),
})
}
func (g *HTTPGateway) handleCreateTable(w http.ResponseWriter, r *http.Request) {
if !onlyMethod(w, r, http.MethodPost) {
return
}
if g.Client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
var body struct {
Schema string `json:"schema"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Schema) == "" {
writeError(w, http.StatusBadRequest, "invalid body: {schema}")
return
}
ctx, cancel := g.withTimeout(r.Context())
defer cancel()
if _, err := g.Client.Exec(ctx, body.Schema); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusCreated, map[string]any{"status": "ok"})
}
var identRe = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)
func (g *HTTPGateway) handleDropTable(w http.ResponseWriter, r *http.Request) {
if !onlyMethod(w, r, http.MethodPost) {
return
}
if g.Client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
var body struct {
Table string `json:"table"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" {
writeError(w, http.StatusBadRequest, "invalid body: {table}")
return
}
tbl := strings.TrimSpace(body.Table)
if !identRe.MatchString(tbl) {
writeError(w, http.StatusBadRequest, "invalid table identifier")
return
}
ctx, cancel := g.withTimeout(r.Context())
defer cancel()
stmt := "DROP TABLE IF EXISTS " + tbl
if _, err := g.Client.Exec(ctx, stmt); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}
// --------------------
// Helpers
// --------------------
func mergeFindOptions(fr findRequest) findOptions {
// Prefer nested Options; fallback to top-level legacy fields
if (len(fr.Options.Select)+len(fr.Options.OrderBy)+len(fr.Options.GroupBy)) > 0 ||
fr.Options.Limit != nil || fr.Options.Offset != nil || len(fr.Options.Joins) > 0 {
return fr.Options
}
return findOptions{
Select: fr.Select,
OrderBy: fr.OrderBy,
GroupBy: fr.GroupBy,
Limit: fr.Limit,
Offset: fr.Offset,
Joins: fr.Joins,
}
}
func makeFindOptions(o findOptions) []FindOption {
opts := make([]FindOption, 0, 6)
if len(o.OrderBy) > 0 {
opts = append(opts, WithOrderBy(o.OrderBy...))
}
if len(o.GroupBy) > 0 {
opts = append(opts, WithGroupBy(o.GroupBy...))
}
if o.Limit != nil {
opts = append(opts, WithLimit(*o.Limit))
}
if o.Offset != nil {
opts = append(opts, WithOffset(*o.Offset))
}
if len(o.Select) > 0 {
opts = append(opts, WithSelect(o.Select...))
}
for _, j := range o.Joins {
opts = append(opts, WithJoin(justOrDefault(strings.ToUpper(j.Kind), "JOIN"), j.Table, j.On))
}
return opts
}
func justOrDefault(s, def string) string {
if strings.TrimSpace(s) == "" {
return def
}
return s
}

442
pkg/rqlite/migrations.go Normal file
View File

@ -0,0 +1,442 @@
package rqlite
import (
"context"
"database/sql"
"fmt"
"io/fs"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"unicode"
_ "github.com/rqlite/gorqlite/stdlib"
"go.uber.org/zap"
)
// ApplyMigrations scans a directory for *.sql files, orders them by numeric prefix,
// and applies any that are not yet recorded in schema_migrations(version).
func ApplyMigrations(ctx context.Context, db *sql.DB, dir string, logger *zap.Logger) error {
if logger == nil {
logger = zap.NewNop()
}
if err := ensureMigrationsTable(ctx, db); err != nil {
return fmt.Errorf("ensure schema_migrations: %w", err)
}
files, err := readMigrationFiles(dir)
if err != nil {
return fmt.Errorf("read migration files: %w", err)
}
if len(files) == 0 {
logger.Info("No migrations found", zap.String("dir", dir))
return nil
}
applied, err := loadAppliedVersions(ctx, db)
if err != nil {
return fmt.Errorf("load applied versions: %w", err)
}
for _, mf := range files {
if applied[mf.Version] {
logger.Info("Migration already applied; skipping", zap.Int("version", mf.Version), zap.String("name", mf.Name))
continue
}
sqlBytes, err := os.ReadFile(mf.Path)
if err != nil {
return fmt.Errorf("read migration %s: %w", mf.Path, err)
}
logger.Info("Applying migration", zap.Int("version", mf.Version), zap.String("name", mf.Name))
if err := applySQL(ctx, db, string(sqlBytes)); err != nil {
return fmt.Errorf("apply migration %d (%s): %w", mf.Version, mf.Name, err)
}
if _, err := db.ExecContext(ctx, `INSERT OR IGNORE INTO schema_migrations(version) VALUES (?)`, mf.Version); err != nil {
return fmt.Errorf("record migration %d: %w", mf.Version, err)
}
logger.Info("Migration applied", zap.Int("version", mf.Version), zap.String("name", mf.Name))
}
return nil
}
// ApplyMigrationsDirs applies migrations from multiple directories.
// - Gathers *.sql files from each dir
// - Parses numeric prefix as the version
// - Errors if the same version appears in more than one dir (to avoid ambiguity)
// - Sorts globally by version and applies those not yet in schema_migrations
func ApplyMigrationsDirs(ctx context.Context, db *sql.DB, dirs []string, logger *zap.Logger) error {
if logger == nil {
logger = zap.NewNop()
}
if err := ensureMigrationsTable(ctx, db); err != nil {
return fmt.Errorf("ensure schema_migrations: %w", err)
}
files, err := readMigrationFilesFromDirs(dirs)
if err != nil {
return err
}
if len(files) == 0 {
logger.Info("No migrations found in provided directories", zap.Strings("dirs", dirs))
return nil
}
applied, err := loadAppliedVersions(ctx, db)
if err != nil {
return fmt.Errorf("load applied versions: %w", err)
}
for _, mf := range files {
if applied[mf.Version] {
logger.Info("Migration already applied; skipping", zap.Int("version", mf.Version), zap.String("name", mf.Name), zap.String("path", mf.Path))
continue
}
sqlBytes, err := os.ReadFile(mf.Path)
if err != nil {
return fmt.Errorf("read migration %s: %w", mf.Path, err)
}
logger.Info("Applying migration", zap.Int("version", mf.Version), zap.String("name", mf.Name), zap.String("path", mf.Path))
if err := applySQL(ctx, db, string(sqlBytes)); err != nil {
return fmt.Errorf("apply migration %d (%s): %w", mf.Version, mf.Name, err)
}
if _, err := db.ExecContext(ctx, `INSERT OR IGNORE INTO schema_migrations(version) VALUES (?)`, mf.Version); err != nil {
return fmt.Errorf("record migration %d: %w", mf.Version, err)
}
logger.Info("Migration applied", zap.Int("version", mf.Version), zap.String("name", mf.Name))
}
return nil
}
// ApplyMigrationsFromManager is a convenience helper bound to RQLiteManager.
func (r *RQLiteManager) ApplyMigrations(ctx context.Context, dir string) error {
db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
if err != nil {
return fmt.Errorf("open rqlite db: %w", err)
}
defer db.Close()
return ApplyMigrations(ctx, db, dir, r.logger)
}
// ApplyMigrationsDirs is the multi-dir variant on RQLiteManager.
func (r *RQLiteManager) ApplyMigrationsDirs(ctx context.Context, dirs []string) error {
db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
if err != nil {
return fmt.Errorf("open rqlite db: %w", err)
}
defer db.Close()
return ApplyMigrationsDirs(ctx, db, dirs, r.logger)
}
func ensureMigrationsTable(ctx context.Context, db *sql.DB) error {
_, err := db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)`)
return err
}
type migrationFile struct {
Version int
Name string
Path string
}
func readMigrationFiles(dir string) ([]migrationFile, error) {
entries, err := os.ReadDir(dir)
if err != nil {
if os.IsNotExist(err) {
return []migrationFile{}, nil
}
return nil, err
}
var out []migrationFile
for _, e := range entries {
if e.IsDir() {
continue
}
name := e.Name()
if !strings.HasSuffix(strings.ToLower(name), ".sql") {
continue
}
ver, ok := parseVersionPrefix(name)
if !ok {
continue
}
out = append(out, migrationFile{
Version: ver,
Name: name,
Path: filepath.Join(dir, name),
})
}
sort.Slice(out, func(i, j int) bool { return out[i].Version < out[j].Version })
return out, nil
}
func readMigrationFilesFromDirs(dirs []string) ([]migrationFile, error) {
all := make([]migrationFile, 0, 64)
seen := map[int]string{} // version -> path (for duplicate detection)
for _, d := range dirs {
files, err := readMigrationFiles(d)
if err != nil {
return nil, fmt.Errorf("reading dir %s: %w", d, err)
}
for _, f := range files {
if prev, dup := seen[f.Version]; dup {
return nil, fmt.Errorf("duplicate migration version %d detected in %s and %s; ensure global version uniqueness", f.Version, prev, f.Path)
}
seen[f.Version] = f.Path
all = append(all, f)
}
}
sort.Slice(all, func(i, j int) bool { return all[i].Version < all[j].Version })
return all, nil
}
func parseVersionPrefix(name string) (int, bool) {
// Expect formats like "001_initial.sql", "2_add_table.sql", etc.
i := 0
for i < len(name) && unicode.IsDigit(rune(name[i])) {
i++
}
if i == 0 {
return 0, false
}
ver, err := strconv.Atoi(name[:i])
if err != nil {
return 0, false
}
return ver, true
}
func loadAppliedVersions(ctx context.Context, db *sql.DB) (map[int]bool, error) {
rows, err := db.QueryContext(ctx, `SELECT version FROM schema_migrations`)
if err != nil {
// If the table doesn't exist yet (very first run), ensure it and return empty set.
if isNoSuchTable(err) {
if err := ensureMigrationsTable(ctx, db); err != nil {
return nil, err
}
return map[int]bool{}, nil
}
return nil, err
}
defer rows.Close()
applied := make(map[int]bool)
for rows.Next() {
var v int
if err := rows.Scan(&v); err != nil {
return nil, err
}
applied[v] = true
}
return applied, rows.Err()
}
func isNoSuchTable(err error) bool {
// rqlite/sqlite error messages vary; keep it permissive
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "no such table") || strings.Contains(msg, "does not exist")
}
// applySQL splits the script into individual statements, strips explicit
// transaction control (BEGIN/COMMIT/ROLLBACK/END), and executes statements
// sequentially to avoid nested transaction issues with rqlite.
func applySQL(ctx context.Context, db *sql.DB, script string) error {
s := strings.TrimSpace(script)
if s == "" {
return nil
}
stmts := splitSQLStatements(s)
stmts = filterOutTxnControls(stmts)
for _, stmt := range stmts {
if strings.TrimSpace(stmt) == "" {
continue
}
if _, err := db.ExecContext(ctx, stmt); err != nil {
return fmt.Errorf("exec stmt failed: %w (stmt: %s)", err, snippet(stmt))
}
}
return nil
}
func containsToken(stmts []string, token string) bool {
for _, s := range stmts {
if strings.EqualFold(strings.TrimSpace(s), token) {
return true
}
}
return false
}
// removed duplicate helper
// removed duplicate helper
// isTxnControl returns true if the statement is a transaction control command.
func isTxnControl(s string) bool {
t := strings.ToUpper(strings.TrimSpace(s))
switch t {
case "BEGIN", "BEGIN TRANSACTION", "COMMIT", "END", "ROLLBACK":
return true
default:
return false
}
}
// filterOutTxnControls removes BEGIN/COMMIT/ROLLBACK/END statements.
func filterOutTxnControls(stmts []string) []string {
out := make([]string, 0, len(stmts))
for _, s := range stmts {
if isTxnControl(s) {
continue
}
out = append(out, s)
}
return out
}
func snippet(s string) string {
s = strings.TrimSpace(s)
if len(s) > 120 {
return s[:120] + "..."
}
return s
}
// splitSQLStatements splits a SQL script into statements by semicolon, ignoring semicolons
// inside single/double-quoted strings and skipping comments (-- and /* */).
func splitSQLStatements(in string) []string {
var out []string
var b strings.Builder
inLineComment := false
inBlockComment := false
inSingle := false
inDouble := false
runes := []rune(in)
for i := 0; i < len(runes); i++ {
ch := runes[i]
next := rune(0)
if i+1 < len(runes) {
next = runes[i+1]
}
// Handle end of line comment
if inLineComment {
if ch == '\n' {
inLineComment = false
// keep newline normalization but don't include comment
}
continue
}
// Handle end of block comment
if inBlockComment {
if ch == '*' && next == '/' {
inBlockComment = false
i++
}
continue
}
// Start of comments?
if !inSingle && !inDouble {
if ch == '-' && next == '-' {
inLineComment = true
i++
continue
}
if ch == '/' && next == '*' {
inBlockComment = true
i++
continue
}
}
// Quotes
if !inDouble && ch == '\'' {
// Toggle single quotes, respecting escaped '' inside.
if inSingle {
// Check for escaped '' (two single quotes)
if next == '\'' {
b.WriteRune(ch) // write one '
i++ // skip the next '
continue
}
inSingle = false
} else {
inSingle = true
}
b.WriteRune(ch)
continue
}
if !inSingle && ch == '"' {
if inDouble {
if next == '"' {
b.WriteRune(ch)
i++
continue
}
inDouble = false
} else {
inDouble = true
}
b.WriteRune(ch)
continue
}
// Statement boundary
if ch == ';' && !inSingle && !inDouble {
stmt := strings.TrimSpace(b.String())
if stmt != "" {
out = append(out, stmt)
}
b.Reset()
continue
}
b.WriteRune(ch)
}
// Final fragment
if s := strings.TrimSpace(b.String()); s != "" {
out = append(out, s)
}
return out
}
// Optional helper to load embedded migrations if you later decide to embed.
// Keep for future use; currently unused.
func readDirFS(fsys fs.FS, root string) ([]string, error) {
var files []string
err := fs.WalkDir(fsys, root, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}
if strings.HasSuffix(strings.ToLower(d.Name()), ".sql") {
files = append(files, path)
}
return nil
})
return files, err
}

View File

@ -1,4 +1,4 @@
package database package rqlite
import ( import (
"context" "context"
@ -174,6 +174,14 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
} }
} }
// After waitForLeadership / waitForSQLAvailable succeeds, before returning:
migrationsDir := "migrations"
if err := r.ApplyMigrations(ctx, migrationsDir); err != nil {
r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir))
return fmt.Errorf("apply migrations: %w", err)
}
r.logger.Info("RQLite node started successfully") r.logger.Info("RQLite node started successfully")
return nil return nil
} }
@ -317,9 +325,6 @@ func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress strin
} }
} }
if lastErr == nil {
lastErr = fmt.Errorf("join target not reachable within %s", timeout)
}
return lastErr return lastErr
} }

View File

@ -1,231 +0,0 @@
package storage
import (
"context"
"fmt"
"io"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"
)
// Client provides distributed storage client functionality
type Client struct {
host host.Host
logger *zap.Logger
namespace string
}
// Context utilities for namespace override
type ctxKey string
// CtxKeyNamespaceOverride is the context key used to override namespace per request
const CtxKeyNamespaceOverride ctxKey = "storage_ns_override"
// WithNamespace returns a new context that carries a storage namespace override
func WithNamespace(ctx context.Context, ns string) context.Context {
return context.WithValue(ctx, CtxKeyNamespaceOverride, ns)
}
// NewClient creates a new storage client
func NewClient(h host.Host, namespace string, logger *zap.Logger) *Client {
return &Client{
host: h,
logger: logger,
namespace: namespace,
}
}
// Put stores a key-value pair in the distributed storage
func (c *Client) Put(ctx context.Context, key string, value []byte) error {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{
Type: MessageTypePut,
Key: key,
Value: value,
Namespace: ns,
}
return c.sendRequest(ctx, request)
}
// Get retrieves a value by key from the distributed storage
func (c *Client) Get(ctx context.Context, key string) ([]byte, error) {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{
Type: MessageTypeGet,
Key: key,
Namespace: ns,
}
response, err := c.sendRequestWithResponse(ctx, request)
if err != nil {
return nil, err
}
if !response.Success {
return nil, fmt.Errorf(response.Error)
}
return response.Value, nil
}
// Delete removes a key from the distributed storage
func (c *Client) Delete(ctx context.Context, key string) error {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{
Type: MessageTypeDelete,
Key: key,
Namespace: ns,
}
return c.sendRequest(ctx, request)
}
// List returns keys with a given prefix
func (c *Client) List(ctx context.Context, prefix string, limit int) ([]string, error) {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{
Type: MessageTypeList,
Prefix: prefix,
Limit: limit,
Namespace: ns,
}
response, err := c.sendRequestWithResponse(ctx, request)
if err != nil {
return nil, err
}
if !response.Success {
return nil, fmt.Errorf(response.Error)
}
return response.Keys, nil
}
// Exists checks if a key exists in the distributed storage
func (c *Client) Exists(ctx context.Context, key string) (bool, error) {
ns := c.namespace
if v := ctx.Value(CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
ns = s
}
}
request := &StorageRequest{
Type: MessageTypeExists,
Key: key,
Namespace: ns,
}
response, err := c.sendRequestWithResponse(ctx, request)
if err != nil {
return false, err
}
if !response.Success {
return false, fmt.Errorf(response.Error)
}
return response.Exists, nil
}
// sendRequest sends a request without expecting a response
func (c *Client) sendRequest(ctx context.Context, request *StorageRequest) error {
_, err := c.sendRequestWithResponse(ctx, request)
return err
}
// sendRequestWithResponse sends a request and waits for a response
func (c *Client) sendRequestWithResponse(ctx context.Context, request *StorageRequest) (*StorageResponse, error) {
// Get connected peers
peers := c.host.Network().Peers()
if len(peers) == 0 {
return nil, fmt.Errorf("no peers connected")
}
// Try to send to the first available peer
// In a production system, you might want to implement peer selection logic
for _, peerID := range peers {
response, err := c.sendToPeer(ctx, peerID, request)
if err != nil {
c.logger.Debug("Failed to send to peer",
zap.String("peer", peerID.String()),
zap.Error(err))
continue
}
return response, nil
}
return nil, fmt.Errorf("failed to send request to any peer")
}
// sendToPeer sends a request to a specific peer
func (c *Client) sendToPeer(ctx context.Context, peerID peer.ID, request *StorageRequest) (*StorageResponse, error) {
// Create a new stream to the peer
stream, err := c.host.NewStream(ctx, peerID, protocol.ID(StorageProtocolID))
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
defer stream.Close()
// Set deadline for the operation
deadline, ok := ctx.Deadline()
if ok {
stream.SetDeadline(deadline)
} else {
stream.SetDeadline(time.Now().Add(30 * time.Second))
}
// Marshal and send request
requestData, err := request.Marshal()
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err)
}
if _, err := stream.Write(requestData); err != nil {
return nil, fmt.Errorf("failed to write request: %w", err)
}
// Close write side to signal end of request
if err := stream.CloseWrite(); err != nil {
return nil, fmt.Errorf("failed to close write: %w", err)
}
// Read response
responseData, err := io.ReadAll(stream)
if err != nil {
return nil, fmt.Errorf("failed to read response: %w", err)
}
// Unmarshal response
var response StorageResponse
if err := response.Unmarshal(responseData); err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}
return &response, nil
}

View File

@ -1,182 +0,0 @@
package storage
import (
"database/sql"
"fmt"
"go.uber.org/zap"
)
// processRequest processes a storage request and returns a response
func (s *Service) processRequest(req *StorageRequest) *StorageResponse {
switch req.Type {
case MessageTypePut:
return s.handlePut(req)
case MessageTypeGet:
return s.handleGet(req)
case MessageTypeDelete:
return s.handleDelete(req)
case MessageTypeList:
return s.handleList(req)
case MessageTypeExists:
return s.handleExists(req)
default:
return &StorageResponse{
Success: false,
Error: fmt.Sprintf("unknown message type: %s", req.Type),
}
}
}
// handlePut stores a key-value pair
func (s *Service) handlePut(req *StorageRequest) *StorageResponse {
s.mu.Lock()
defer s.mu.Unlock()
// Use REPLACE to handle both insert and update
query := `
REPLACE INTO kv_storage (namespace, key, value, updated_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
`
_, err := s.db.Exec(query, req.Namespace, req.Key, req.Value)
if err != nil {
return &StorageResponse{
Success: false,
Error: fmt.Sprintf("failed to store key: %v", err),
}
}
s.logger.Debug("Stored key", zap.String("key", req.Key), zap.String("namespace", req.Namespace))
return &StorageResponse{Success: true}
}
// handleGet retrieves a value by key
func (s *Service) handleGet(req *StorageRequest) *StorageResponse {
s.mu.RLock()
defer s.mu.RUnlock()
query := `SELECT value FROM kv_storage WHERE namespace = ? AND key = ?`
var value []byte
err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&value)
if err != nil {
if err == sql.ErrNoRows {
return &StorageResponse{
Success: false,
Error: fmt.Sprintf("key not found: %s", req.Key),
}
}
return &StorageResponse{
Success: false,
Error: fmt.Sprintf("failed to get key: %v", err),
}
}
return &StorageResponse{
Success: true,
Value: value,
}
}
// handleDelete removes a key
func (s *Service) handleDelete(req *StorageRequest) *StorageResponse {
s.mu.Lock()
defer s.mu.Unlock()
query := `DELETE FROM kv_storage WHERE namespace = ? AND key = ?`
result, err := s.db.Exec(query, req.Namespace, req.Key)
if err != nil {
return &StorageResponse{
Success: false,
Error: fmt.Sprintf("failed to delete key: %v", err),
}
}
rowsAffected, _ := result.RowsAffected()
if rowsAffected == 0 {
return &StorageResponse{
Success: false,
Error: fmt.Sprintf("key not found: %s", req.Key),
}
}
s.logger.Debug("Deleted key", zap.String("key", req.Key), zap.String("namespace", req.Namespace))
return &StorageResponse{Success: true}
}
// handleList lists keys with a prefix
func (s *Service) handleList(req *StorageRequest) *StorageResponse {
s.mu.RLock()
defer s.mu.RUnlock()
var query string
var args []interface{}
if req.Prefix == "" {
// List all keys in namespace
query = `SELECT key FROM kv_storage WHERE namespace = ?`
args = []interface{}{req.Namespace}
} else {
// List keys with prefix
query = `SELECT key FROM kv_storage WHERE namespace = ? AND key LIKE ?`
args = []interface{}{req.Namespace, req.Prefix + "%"}
}
if req.Limit > 0 {
query += ` LIMIT ?`
args = append(args, req.Limit)
}
rows, err := s.db.Query(query, args...)
if err != nil {
return &StorageResponse{
Success: false,
Error: fmt.Sprintf("failed to query keys: %v", err),
}
}
defer rows.Close()
var keys []string
for rows.Next() {
var key string
if err := rows.Scan(&key); err != nil {
continue
}
keys = append(keys, key)
}
return &StorageResponse{
Success: true,
Keys: keys,
}
}
// handleExists checks if a key exists
func (s *Service) handleExists(req *StorageRequest) *StorageResponse {
s.mu.RLock()
defer s.mu.RUnlock()
query := `SELECT 1 FROM kv_storage WHERE namespace = ? AND key = ? LIMIT 1`
var exists int
err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&exists)
if err != nil {
if err == sql.ErrNoRows {
return &StorageResponse{
Success: true,
Exists: false,
}
}
return &StorageResponse{
Success: false,
Error: fmt.Sprintf("failed to check key existence: %v", err),
}
}
return &StorageResponse{
Success: true,
Exists: true,
}
}

View File

@ -1,16 +0,0 @@
package storage
import "go.uber.org/zap"
// newStorageLogger creates a zap.Logger for storage components.
// Callers can pass quiet=true to reduce log verbosity.
func newStorageLogger(quiet bool) (*zap.Logger, error) {
if quiet {
cfg := zap.NewProductionConfig()
cfg.Level = zap.NewAtomicLevelAt(zap.WarnLevel)
cfg.DisableCaller = true
cfg.DisableStacktrace = true
return cfg.Build()
}
return zap.NewDevelopment()
}

View File

@ -1,60 +0,0 @@
package storage
import (
"encoding/json"
)
// Storage protocol definitions for distributed storage
const (
StorageProtocolID = "/network/storage/1.0.0"
)
// Message types for storage operations
type MessageType string
const (
MessageTypePut MessageType = "put"
MessageTypeGet MessageType = "get"
MessageTypeDelete MessageType = "delete"
MessageTypeList MessageType = "list"
MessageTypeExists MessageType = "exists"
)
// StorageRequest represents a storage operation request
type StorageRequest struct {
Type MessageType `json:"type"`
Key string `json:"key"`
Value []byte `json:"value,omitempty"`
Prefix string `json:"prefix,omitempty"`
Limit int `json:"limit,omitempty"`
Namespace string `json:"namespace"`
}
// StorageResponse represents a storage operation response
type StorageResponse struct {
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Value []byte `json:"value,omitempty"`
Keys []string `json:"keys,omitempty"`
Exists bool `json:"exists,omitempty"`
}
// Marshal serializes a request to JSON
func (r *StorageRequest) Marshal() ([]byte, error) {
return json.Marshal(r)
}
// Unmarshal deserializes a request from JSON
func (r *StorageRequest) Unmarshal(data []byte) error {
return json.Unmarshal(data, r)
}
// Marshal serializes a response to JSON
func (r *StorageResponse) Marshal() ([]byte, error) {
return json.Marshal(r)
}
// Unmarshal deserializes a response from JSON
func (r *StorageResponse) Unmarshal(data []byte) error {
return json.Unmarshal(data, r)
}

View File

@ -1,23 +0,0 @@
package storage
import "testing"
func TestRequestResponseJSON(t *testing.T) {
req := &StorageRequest{Type: MessageTypePut, Key: "k", Value: []byte("v"), Namespace: "ns"}
b, err := req.Marshal()
if err != nil { t.Fatal(err) }
var out StorageRequest
if err := out.Unmarshal(b); err != nil { t.Fatal(err) }
if out.Type != MessageTypePut || out.Key != "k" || out.Namespace != "ns" {
t.Fatalf("roundtrip mismatch: %+v", out)
}
resp := &StorageResponse{Success: true, Keys: []string{"a"}, Exists: true}
b, err = resp.Marshal()
if err != nil { t.Fatal(err) }
var outR StorageResponse
if err := outR.Unmarshal(b); err != nil { t.Fatal(err) }
if !outR.Success || !outR.Exists || len(outR.Keys) != 1 {
t.Fatalf("resp mismatch: %+v", outR)
}
}

View File

@ -1,37 +0,0 @@
package storage
import (
"fmt"
)
// initTables creates the necessary tables for key-value storage
func (s *Service) initTables() error {
// Create storage table with namespace support
createTableSQL := `
CREATE TABLE IF NOT EXISTS kv_storage (
namespace TEXT NOT NULL,
key TEXT NOT NULL,
value BLOB NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (namespace, key)
)
`
// Create index for faster queries
createIndexSQL := `
CREATE INDEX IF NOT EXISTS idx_kv_storage_namespace_key
ON kv_storage(namespace, key)
`
if _, err := s.db.Exec(createTableSQL); err != nil {
return fmt.Errorf("failed to create storage table: %w", err)
}
if _, err := s.db.Exec(createIndexSQL); err != nil {
return fmt.Errorf("failed to create storage index: %w", err)
}
s.logger.Info("Storage tables initialized successfully")
return nil
}

View File

@ -1,32 +0,0 @@
package storage
import (
"database/sql"
"sync"
"go.uber.org/zap"
)
// Service provides distributed storage functionality using RQLite
type Service struct {
logger *zap.Logger
db *sql.DB
mu sync.RWMutex
}
// NewService creates a new storage service backed by RQLite
func NewService(db *sql.DB, logger *zap.Logger) (*Service, error) {
service := &Service{
logger: logger,
db: db,
}
return service, nil
}
// Close closes the storage service
func (s *Service) Close() error {
// The database connection is managed elsewhere
s.logger.Info("Storage service closed")
return nil
}

View File

@ -1,48 +0,0 @@
package storage
import (
"io"
"github.com/libp2p/go-libp2p/core/network"
"go.uber.org/zap"
)
// HandleStorageStream handles incoming storage protocol streams
func (s *Service) HandleStorageStream(stream network.Stream) {
defer stream.Close()
// Read request
data, err := io.ReadAll(stream)
if err != nil {
s.logger.Error("Failed to read storage request", zap.Error(err))
return
}
var request StorageRequest
if err := request.Unmarshal(data); err != nil {
s.logger.Error("Failed to unmarshal storage request", zap.Error(err))
return
}
// Process request
response := s.processRequest(&request)
// Send response
responseData, err := response.Marshal()
if err != nil {
s.logger.Error("Failed to marshal storage response", zap.Error(err))
return
}
if _, err := stream.Write(responseData); err != nil {
s.logger.Error("Failed to write storage response", zap.Error(err))
return
}
s.logger.Debug("Handled storage request",
zap.String("type", string(request.Type)),
zap.String("key", request.Key),
zap.String("namespace", request.Namespace),
zap.Bool("success", response.Success),
)
}