From 16845b758d9b66340f7c8bbad4879c059527faf0 Mon Sep 17 00:00:00 2001
From: anonpenguin23
Date: Tue, 23 Sep 2025 06:00:57 +0300
Subject: [PATCH 1/2] - Removed old storage folder - Created new rqlite folder
 - Created rqlite adapter, client, gateway, migrations and rqlite init -
 Updated node.go to support new rqlite architecture - Updated readme -
 Updated version

---
 CHANGELOG.md                        | 8 +
 Makefile                            | 2 +-
 README.md                           | 237 ++++
 pkg/node/node.go                    | 2 +-
 pkg/{database => rqlite}/adapter.go | 2 +-
 pkg/rqlite/client.go                | 835 ++++++++++++++++++++++++++++
 pkg/rqlite/gateway.go               | 0
 pkg/rqlite/migrations.go            | 436 +++++++++++++++
 pkg/{database => rqlite}/rqlite.go  | 10 +-
 pkg/storage/client.go               | 231 --------
 pkg/storage/kv_ops.go               | 182 ------
 pkg/storage/logging.go              | 16 -
 pkg/storage/protocol.go             | 60 --
 pkg/storage/protocol_test.go        | 23 -
 pkg/storage/rqlite_init.go          | 37 --
 pkg/storage/service.go              | 32 --
 pkg/storage/stream_handler.go       | 48 --
 17 files changed, 1528 insertions(+), 633 deletions(-)
 rename pkg/{database => rqlite}/adapter.go (98%)
 create mode 100644 pkg/rqlite/client.go
 create mode 100644 pkg/rqlite/gateway.go
 create mode 100644 pkg/rqlite/migrations.go
 rename pkg/{database => rqlite}/rqlite.go (96%)
 delete mode 100644 pkg/storage/client.go
 delete mode 100644 pkg/storage/kv_ops.go
 delete mode 100644 pkg/storage/logging.go
 delete mode 100644 pkg/storage/protocol.go
 delete mode 100644 pkg/storage/protocol_test.go
 delete mode 100644 pkg/storage/rqlite_init.go
 delete mode 100644 pkg/storage/service.go
 delete mode 100644 pkg/storage/stream_handler.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 82534f5..664f1e0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,12 +8,20 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
 
 ### Added
+- Created new rqlite folder
+- Created rqlite adapter, client, gateway, migrations and rqlite init
+
 ### Changed
+- Updated node.go to support new rqlite architecture
+- Updated readme
+
 ### Deprecated
 
 ### 
Removed +- Removed old storage folder + ### Fixed ### Security diff --git a/Makefile b/Makefile index e3a5b18..2343443 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.44.0-beta +VERSION := 0.50.0-beta COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/README.md b/README.md index 7eabb77..a0f038b 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ A robust, decentralized peer-to-peer network built in Go, providing distributed - [CLI Usage](#cli-usage) - [HTTP Gateway](#http-gateway) - [Development](#development) +- [Database Client (Go ORM-like)](#database-client-go-orm-like) - [Troubleshooting](#troubleshooting) - [License](#license) @@ -700,6 +701,242 @@ make clean # Clean build artifacts scripts/test-multinode.sh ``` +--- + +## Database Client (Go ORM-like) + +A lightweight ORM-like client over rqlite using Go’s `database/sql`. It provides: +- Query/Exec for raw SQL +- A fluent QueryBuilder (`Where`, `InnerJoin`, `LeftJoin`, `OrderBy`, `GroupBy`, `Limit`, `Offset`) +- Simple repositories with `Find`/`FindOne` +- `Save`/`Remove` for entities with primary keys +- Transaction support via `Tx` + +### Installation + +- Ensure rqlite is running (the node starts and manages rqlite automatically). 
+- Import the client:
+  - Package: `github.com/DeBrosOfficial/network/pkg/rqlite`
+
+### Quick Start
+
+```go
+package main
+
+import (
+	"context"
+	"database/sql"
+	"time"
+
+	"github.com/DeBrosOfficial/network/pkg/rqlite"
+	_ "github.com/rqlite/gorqlite/stdlib"
+)
+
+type User struct {
+	ID        int64     `db:"id,pk,auto"`
+	Email     string    `db:"email"`
+	FirstName string    `db:"first_name"`
+	LastName  string    `db:"last_name"`
+	CreatedAt time.Time `db:"created_at"`
+}
+
+func (User) TableName() string { return "users" }
+
+func main() {
+	ctx := context.Background()
+
+	adapter, _ := rqlite.NewRQLiteAdapter(manager)
+	client := rqlite.NewClientFromAdapter(adapter)
+
+	// Save (INSERT)
+	u := &User{Email: "alice@example.com", FirstName: "Alice", LastName: "A"}
+	_ = client.Save(ctx, u) // auto-sets u.ID if autoincrement is available
+
+	// FindOneBy
+	var one User
+	_ = client.FindOneBy(ctx, &one, "users", map[string]any{"email": "alice@example.com"})
+
+	// QueryBuilder
+	var users []User
+	_ = client.CreateQueryBuilder("users").
+		Where("email LIKE ?", "%@example.com").
+		OrderBy("created_at DESC").
+		Limit(10).
+		GetMany(ctx, &users)
+}
+```
+
+### Entities and Mapping
+
+- Use struct tags: `db:"column_name"`; the first tag value is the column name.
+- Mark primary key: `db:"id,pk"` (and `auto` if autoincrement): `db:"id,pk,auto"`.
+- Fallbacks:
+  - If no `db` tag is provided, the field name is used as the column (case-insensitive).
+  - If a field is named `ID`, it is treated as the primary key by default.
+
+```go
+type Post struct {
+	ID        int64     `db:"id,pk,auto"`
+	UserID    int64     `db:"user_id"`
+	Title     string    `db:"title"`
+	Body      string    `db:"body"`
+	CreatedAt time.Time `db:"created_at"`
+}
+func (Post) TableName() string { return "posts" }
+```
+
+### Basic queries
+
+Raw SQL with scanning into structs or maps:
+
+```go
+var users []User
+err := client.Query(ctx, &users, "SELECT id, email, first_name, last_name, created_at FROM users WHERE email LIKE ?", "%@example.com")
+if err != nil {
+	// handle
+}
+
+var rows []map[string]any
+_ = client.Query(ctx, &rows, "SELECT id, email FROM users WHERE id IN (?,?)", 1, 2)
+```
+
+### Query Builder
+
+Build complex SELECTs with joins, filters, grouping, ordering, and pagination.
+
+```go
+var results []User
+qb := client.CreateQueryBuilder("users u").
+	InnerJoin("posts p", "p.user_id = u.id").
+	Where("u.email LIKE ?", "%@example.com").
+	AndWhere("p.created_at >= ?", "2024-01-01T00:00:00Z").
+	GroupBy("u.id").
+	OrderBy("u.created_at DESC").
+	Limit(20).
+	Offset(0)
+
+if err := qb.GetMany(ctx, &results); err != nil {
+	// handle
+}
+
+// Single row (LIMIT 1)
+var one User
+if err := qb.Limit(1).GetOne(ctx, &one); err != nil {
+	// handle sql.ErrNoRows, etc.
+}
+```
+
+### FindBy / FindOneBy
+
+Simple map-based filters:
+
+```go
+var active []User
+_ = client.FindBy(ctx, &active, "users", map[string]any{"last_name": "A"}, rqlite.WithOrderBy("created_at DESC"), rqlite.WithLimit(50))
+
+var u User
+if err := client.FindOneBy(ctx, &u, "users", map[string]any{"email": "alice@example.com"}); err != nil {
+	// sql.ErrNoRows if not found
+}
+```
+
+### Save / Remove
+
+`Save` inserts if PK is zero, otherwise updates by PK.
+`Remove` deletes by PK.
+
+```go
+// Insert (ID is zero)
+u := &User{Email: "bob@example.com", FirstName: "Bob"}
+_ = client.Save(ctx, u) // INSERT; sets u.ID if autoincrement
+
+// Update (ID is non-zero)
+u.FirstName = "Bobby"
+_ = client.Save(ctx, u) // UPDATE ... WHERE id = ?
+
+// Remove
+_ = client.Remove(ctx, u) // DELETE ... WHERE id = ?
+
+```
+
+### Transactions
+
+Run multiple operations atomically. If your function returns an error, the transaction is rolled back; otherwise it commits.
+
+```go
+err := client.Tx(ctx, func(tx rqlite.Tx) error {
+	// Read inside the same transaction
+	var me User
+	if err := tx.Query(ctx, &me, "SELECT * FROM users WHERE id = ?", 1); err != nil {
+		return err
+	}
+
+	// Write inside the same transaction
+	me.LastName = "Updated"
+	if err := tx.Save(ctx, &me); err != nil {
+		return err
+	}
+
+	// Complex query via builder
+	var recent []User
+	if err := tx.CreateQueryBuilder("users").
+		OrderBy("created_at DESC").
+		Limit(5).
+		GetMany(ctx, &recent); err != nil {
+		return err
+	}
+
+	return nil // commit
+})
+
+```
+
+### Repositories (optional, generic)
+
+Strongly-typed convenience layer bound to a table:
+
+```go
+repo := client.Repository[User]("users")
+
+var many []User
+_ = repo.Find(ctx, &many, map[string]any{"last_name": "A"}, rqlite.WithOrderBy("created_at DESC"), rqlite.WithLimit(10))
+
+var one User
+_ = repo.FindOne(ctx, &one, map[string]any{"email": "alice@example.com"})
+
+_ = repo.Save(ctx, &one)
+_ = repo.Remove(ctx, &one)
+
+```
+
+### Migrations
+
+Option A: From the node (after rqlite is ready)
+
+```go
+ctx := context.Background()
+dirs := []string{
+	"network/migrations",          // default
+	"path/to/your/app/migrations", // extra
+}
+
+if err := rqliteManager.ApplyMigrationsDirs(ctx, dirs); err != nil {
+	logger.Fatal("apply migrations failed", zap.Error(err))
+}
+```
+
+Option B: Using the adapter sql.DB
+
+```go
+ctx := context.Background()
+db := adapter.GetSQLDB()
+dirs := []string{"network/migrations", "app/migrations"}
+
+if err := rqlite.ApplyMigrationsDirs(ctx, db, dirs, logger); err != nil {
+	logger.Fatal("apply migrations failed", zap.Error(err))
+}
+```
+
+
 ---
 
 ## Troubleshooting
diff --git a/pkg/node/node.go b/pkg/node/node.go
index b41feca..78fc0ab 100644
--- 
a/pkg/node/node.go +++ b/pkg/node/node.go @@ -23,9 +23,9 @@ import ( "go.uber.org/zap" "github.com/DeBrosOfficial/network/pkg/config" - "github.com/DeBrosOfficial/network/pkg/database" "github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/pubsub" + database "github.com/DeBrosOfficial/network/pkg/rqlite" ) // Node represents a network node with RQLite database diff --git a/pkg/database/adapter.go b/pkg/rqlite/adapter.go similarity index 98% rename from pkg/database/adapter.go rename to pkg/rqlite/adapter.go index 1aa2686..81205bf 100644 --- a/pkg/database/adapter.go +++ b/pkg/rqlite/adapter.go @@ -1,4 +1,4 @@ -package database +package rqlite import ( "database/sql" diff --git a/pkg/rqlite/client.go b/pkg/rqlite/client.go new file mode 100644 index 0000000..70c78e2 --- /dev/null +++ b/pkg/rqlite/client.go @@ -0,0 +1,835 @@ +package rqlite + +// client.go defines the ORM-like interfaces and a minimal implementation over database/sql. +// It builds on the rqlite stdlib driver so it behaves like a regular SQL-backed ORM. + +import ( + "context" + "database/sql" + "errors" + "fmt" + "reflect" + "strings" + "time" +) + +// TableNamer lets a struct provide its table name. +type TableNamer interface { + TableName() string +} + +// Client is the high-level ORM-like API. +type Client interface { + // Query runs an arbitrary SELECT and scans rows into dest (pointer to slice of structs or []map[string]any). + Query(ctx context.Context, dest any, query string, args ...any) error + // Exec runs a write statement (INSERT/UPDATE/DELETE). + Exec(ctx context.Context, query string, args ...any) (sql.Result, error) + + // FindBy/FindOneBy provide simple map-based criteria filtering. + FindBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error + FindOneBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error + + // Save inserts or updates an entity (single-PK). 
+ Save(ctx context.Context, entity any) error + // Remove deletes by PK (single-PK). + Remove(ctx context.Context, entity any) error + + // Repositories (generic layer). Optional but convenient if you use Go generics. + Repository(table string) any + + // Fluent query builder for advanced querying. + CreateQueryBuilder(table string) *QueryBuilder + + // Tx executes a function within a transaction. + Tx(ctx context.Context, fn func(tx Tx) error) error +} + +// Tx mirrors Client but executes within a transaction. +type Tx interface { + Query(ctx context.Context, dest any, query string, args ...any) error + Exec(ctx context.Context, query string, args ...any) (sql.Result, error) + CreateQueryBuilder(table string) *QueryBuilder + + // Optional: scoped Save/Remove inside tx + Save(ctx context.Context, entity any) error + Remove(ctx context.Context, entity any) error +} + +// Repository provides typed entity operations for a table. +type Repository[T any] interface { + Find(ctx context.Context, dest *[]T, criteria map[string]any, opts ...FindOption) error + FindOne(ctx context.Context, dest *T, criteria map[string]any, opts ...FindOption) error + Save(ctx context.Context, entity *T) error + Remove(ctx context.Context, entity *T) error + + // Builder helpers + Q() *QueryBuilder +} + +// NewClient wires the ORM client to a *sql.DB (from your RQLiteAdapter). +func NewClient(db *sql.DB) Client { + return &client{db: db} +} + +// NewClientFromAdapter is convenient if you already created the adapter. +func NewClientFromAdapter(adapter *RQLiteAdapter) Client { + return NewClient(adapter.GetSQLDB()) +} + +// client implements Client over *sql.DB. +type client struct { + db *sql.DB +} + +func (c *client) Query(ctx context.Context, dest any, query string, args ...any) error { + rows, err := c.db.QueryContext(ctx, query, args...) 
+ if err != nil { + return err + } + defer rows.Close() + return scanIntoDest(rows, dest) +} + +func (c *client) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) { + return c.db.ExecContext(ctx, query, args...) +} + +func (c *client) FindBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error { + qb := c.CreateQueryBuilder(table) + for k, v := range criteria { + qb = qb.AndWhere(fmt.Sprintf("%s = ?", k), v) + } + for _, opt := range opts { + opt(qb) + } + return qb.GetMany(ctx, dest) +} + +func (c *client) FindOneBy(ctx context.Context, dest any, table string, criteria map[string]any, opts ...FindOption) error { + qb := c.CreateQueryBuilder(table) + for k, v := range criteria { + qb = qb.AndWhere(fmt.Sprintf("%s = ?", k), v) + } + for _, opt := range opts { + opt(qb) + } + return qb.GetOne(ctx, dest) +} + +func (c *client) Save(ctx context.Context, entity any) error { + return saveEntity(ctx, c.db, entity) +} + +func (c *client) Remove(ctx context.Context, entity any) error { + return removeEntity(ctx, c.db, entity) +} + +func (c *client) Repository(table string) any { + // This returns an untyped interface since Go methods cannot have type parameters + // Users will need to type assert the result to Repository[T] + return func() any { + return &repository[any]{c: c, table: table} + }() +} + +func (c *client) CreateQueryBuilder(table string) *QueryBuilder { + return newQueryBuilder(c.db, table) +} + +func (c *client) Tx(ctx context.Context, fn func(tx Tx) error) error { + sqlTx, err := c.db.BeginTx(ctx, nil) + if err != nil { + return err + } + txc := &txClient{tx: sqlTx} + if err := fn(txc); err != nil { + _ = sqlTx.Rollback() + return err + } + return sqlTx.Commit() +} + +// txClient implements Tx over *sql.Tx. +type txClient struct { + tx *sql.Tx +} + +func (t *txClient) Query(ctx context.Context, dest any, query string, args ...any) error { + rows, err := t.tx.QueryContext(ctx, query, args...) 
+ if err != nil { + return err + } + defer rows.Close() + return scanIntoDest(rows, dest) +} + +func (t *txClient) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) { + return t.tx.ExecContext(ctx, query, args...) +} + +func (t *txClient) CreateQueryBuilder(table string) *QueryBuilder { + return newQueryBuilder(t.tx, table) +} + +func (t *txClient) Save(ctx context.Context, entity any) error { + return saveEntity(ctx, t.tx, entity) +} + +func (t *txClient) Remove(ctx context.Context, entity any) error { + return removeEntity(ctx, t.tx, entity) +} + +// executor is implemented by *sql.DB and *sql.Tx. +type executor interface { + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) +} + +// QueryBuilder implements a fluent SELECT builder with joins, where, etc. +type QueryBuilder struct { + exec executor + table string + alias string + selects []string + + joins []joinClause + wheres []whereClause + + groupBys []string + orderBys []string + limit *int + offset *int +} + +// joinClause represents INNER/LEFT/etc joins. +type joinClause struct { + kind string // "INNER", "LEFT", "JOIN" (default) + table string + on string +} + +// whereClause holds an expression and args with a conjunction. +type whereClause struct { + conj string // "AND" or "OR" + expr string + args []any +} + +func newQueryBuilder(exec executor, table string) *QueryBuilder { + return &QueryBuilder{ + exec: exec, + table: table, + } +} + +func (qb *QueryBuilder) Select(cols ...string) *QueryBuilder { + qb.selects = append(qb.selects, cols...) + return qb +} + +func (qb *QueryBuilder) Alias(a string) *QueryBuilder { + qb.alias = a + return qb +} + +func (qb *QueryBuilder) Where(expr string, args ...any) *QueryBuilder { + return qb.AndWhere(expr, args...) 
+} + +func (qb *QueryBuilder) AndWhere(expr string, args ...any) *QueryBuilder { + qb.wheres = append(qb.wheres, whereClause{conj: "AND", expr: expr, args: args}) + return qb +} + +func (qb *QueryBuilder) OrWhere(expr string, args ...any) *QueryBuilder { + qb.wheres = append(qb.wheres, whereClause{conj: "OR", expr: expr, args: args}) + return qb +} + +func (qb *QueryBuilder) InnerJoin(table string, on string) *QueryBuilder { + qb.joins = append(qb.joins, joinClause{kind: "INNER", table: table, on: on}) + return qb +} + +func (qb *QueryBuilder) LeftJoin(table string, on string) *QueryBuilder { + qb.joins = append(qb.joins, joinClause{kind: "LEFT", table: table, on: on}) + return qb +} + +func (qb *QueryBuilder) Join(table string, on string) *QueryBuilder { + qb.joins = append(qb.joins, joinClause{kind: "JOIN", table: table, on: on}) + return qb +} + +func (qb *QueryBuilder) GroupBy(cols ...string) *QueryBuilder { + qb.groupBys = append(qb.groupBys, cols...) + return qb +} + +func (qb *QueryBuilder) OrderBy(exprs ...string) *QueryBuilder { + qb.orderBys = append(qb.orderBys, exprs...) + return qb +} + +func (qb *QueryBuilder) Limit(n int) *QueryBuilder { + qb.limit = &n + return qb +} + +func (qb *QueryBuilder) Offset(n int) *QueryBuilder { + qb.offset = &n + return qb +} + +// Build returns the SQL string and args for a SELECT. +func (qb *QueryBuilder) Build() (string, []any) { + cols := "*" + if len(qb.selects) > 0 { + cols = strings.Join(qb.selects, ", ") + } + base := fmt.Sprintf("SELECT %s FROM %s", cols, qb.table) + if qb.alias != "" { + base += " AS " + qb.alias + } + + args := make([]any, 0, 16) + for _, j := range qb.joins { + base += fmt.Sprintf(" %s JOIN %s ON %s", j.kind, j.table, j.on) + } + + if len(qb.wheres) > 0 { + base += " WHERE " + for i, w := range qb.wheres { + if i > 0 { + base += " " + w.conj + " " + } + base += "(" + w.expr + ")" + args = append(args, w.args...) 
+ } + } + + if len(qb.groupBys) > 0 { + base += " GROUP BY " + strings.Join(qb.groupBys, ", ") + } + if len(qb.orderBys) > 0 { + base += " ORDER BY " + strings.Join(qb.orderBys, ", ") + } + if qb.limit != nil { + base += fmt.Sprintf(" LIMIT %d", *qb.limit) + } + if qb.offset != nil { + base += fmt.Sprintf(" OFFSET %d", *qb.offset) + } + return base, args +} + +// GetMany executes the built query and scans into dest (pointer to slice). +func (qb *QueryBuilder) GetMany(ctx context.Context, dest any) error { + sqlStr, args := qb.Build() + rows, err := qb.exec.QueryContext(ctx, sqlStr, args...) + if err != nil { + return err + } + defer rows.Close() + return scanIntoDest(rows, dest) +} + +// GetOne executes the built query and scans into dest (pointer to struct or map) with LIMIT 1. +func (qb *QueryBuilder) GetOne(ctx context.Context, dest any) error { + limit := 1 + if qb.limit == nil { + qb.limit = &limit + } else if qb.limit != nil && *qb.limit > 1 { + qb.limit = &limit + } + sqlStr, args := qb.Build() + rows, err := qb.exec.QueryContext(ctx, sqlStr, args...) + if err != nil { + return err + } + defer rows.Close() + if !rows.Next() { + return sql.ErrNoRows + } + return scanIntoSingle(rows, dest) +} + +// FindOption customizes Find queries. +type FindOption func(q *QueryBuilder) + +func WithOrderBy(exprs ...string) FindOption { + return func(q *QueryBuilder) { q.OrderBy(exprs...) } +} +func WithGroupBy(cols ...string) FindOption { + return func(q *QueryBuilder) { q.GroupBy(cols...) } +} +func WithLimit(n int) FindOption { + return func(q *QueryBuilder) { q.Limit(n) } +} +func WithOffset(n int) FindOption { + return func(q *QueryBuilder) { q.Offset(n) } +} +func WithSelect(cols ...string) FindOption { + return func(q *QueryBuilder) { q.Select(cols...) 
} +} +func WithJoin(kind, table, on string) FindOption { + return func(q *QueryBuilder) { + switch strings.ToUpper(kind) { + case "INNER": + q.InnerJoin(table, on) + case "LEFT": + q.LeftJoin(table, on) + default: + q.Join(table, on) + } + } +} + +// repository is a generic table repository for type T. +type repository[T any] struct { + c *client + table string +} + +func (r *repository[T]) Find(ctx context.Context, dest *[]T, criteria map[string]any, opts ...FindOption) error { + qb := r.c.CreateQueryBuilder(r.table) + for k, v := range criteria { + qb.AndWhere(fmt.Sprintf("%s = ?", k), v) + } + for _, opt := range opts { + opt(qb) + } + return qb.GetMany(ctx, dest) +} + +func (r *repository[T]) FindOne(ctx context.Context, dest *T, criteria map[string]any, opts ...FindOption) error { + qb := r.c.CreateQueryBuilder(r.table) + for k, v := range criteria { + qb.AndWhere(fmt.Sprintf("%s = ?", k), v) + } + for _, opt := range opts { + opt(qb) + } + return qb.GetOne(ctx, dest) +} + +func (r *repository[T]) Save(ctx context.Context, entity *T) error { + return saveEntity(ctx, r.c.db, entity) +} + +func (r *repository[T]) Remove(ctx context.Context, entity *T) error { + return removeEntity(ctx, r.c.db, entity) +} + +func (r *repository[T]) Q() *QueryBuilder { + return r.c.CreateQueryBuilder(r.table) +} + +// ----------------------- +// Reflection + scanning +// ----------------------- + +func scanIntoDest(rows *sql.Rows, dest any) error { + // dest must be pointer to slice (of struct or map) + rv := reflect.ValueOf(dest) + if rv.Kind() != reflect.Pointer || rv.IsNil() { + return errors.New("dest must be a non-nil pointer") + } + sliceVal := rv.Elem() + if sliceVal.Kind() != reflect.Slice { + return errors.New("dest must be pointer to a slice") + } + elemType := sliceVal.Type().Elem() + + cols, err := rows.Columns() + if err != nil { + return err + } + + for rows.Next() { + itemPtr := reflect.New(elemType) + // Support map[string]any and struct + if elemType.Kind() == 
reflect.Map { + m, err := scanRowToMap(rows, cols) + if err != nil { + return err + } + sliceVal.Set(reflect.Append(sliceVal, reflect.ValueOf(m))) + continue + } + + if elemType.Kind() == reflect.Struct { + if err := scanCurrentRowIntoStruct(rows, cols, itemPtr.Elem()); err != nil { + return err + } + sliceVal.Set(reflect.Append(sliceVal, itemPtr.Elem())) + continue + } + + return fmt.Errorf("unsupported slice element type: %s", elemType.Kind()) + } + return rows.Err() +} + +func scanIntoSingle(rows *sql.Rows, dest any) error { + rv := reflect.ValueOf(dest) + if rv.Kind() != reflect.Pointer || rv.IsNil() { + return errors.New("dest must be a non-nil pointer") + } + cols, err := rows.Columns() + if err != nil { + return err + } + + switch rv.Elem().Kind() { + case reflect.Map: + m, err := scanRowToMap(rows, cols) + if err != nil { + return err + } + rv.Elem().Set(reflect.ValueOf(m)) + return nil + case reflect.Struct: + return scanCurrentRowIntoStruct(rows, cols, rv.Elem()) + default: + return fmt.Errorf("unsupported dest kind: %s", rv.Elem().Kind()) + } +} + +func scanRowToMap(rows *sql.Rows, cols []string) (map[string]any, error) { + raw := make([]any, len(cols)) + ptrs := make([]any, len(cols)) + for i := range raw { + ptrs[i] = &raw[i] + } + if err := rows.Scan(ptrs...); err != nil { + return nil, err + } + out := make(map[string]any, len(cols)) + for i, c := range cols { + out[c] = normalizeSQLValue(raw[i]) + } + return out, nil +} + +func scanCurrentRowIntoStruct(rows *sql.Rows, cols []string, destStruct reflect.Value) error { + raw := make([]any, len(cols)) + ptrs := make([]any, len(cols)) + for i := range raw { + ptrs[i] = &raw[i] + } + if err := rows.Scan(ptrs...); err != nil { + return err + } + fieldIndex := buildFieldIndex(destStruct.Type()) + for i, c := range cols { + if idx, ok := fieldIndex[strings.ToLower(c)]; ok { + field := destStruct.Field(idx) + if field.CanSet() { + if err := setReflectValue(field, raw[i]); err != nil { + return 
fmt.Errorf("column %s: %w", c, err) + } + } + } + } + return nil +} + +func normalizeSQLValue(v any) any { + switch t := v.(type) { + case []byte: + return string(t) + default: + return v + } +} + +func buildFieldIndex(t reflect.Type) map[string]int { + m := make(map[string]int) + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) + if f.IsExported() == false { + continue + } + tag := f.Tag.Get("db") + col := "" + if tag != "" { + col = strings.Split(tag, ",")[0] + } + if col == "" { + col = f.Name + } + m[strings.ToLower(col)] = i + } + return m +} + +func setReflectValue(field reflect.Value, raw any) error { + if raw == nil { + // leave zero value + return nil + } + switch field.Kind() { + case reflect.String: + switch v := raw.(type) { + case string: + field.SetString(v) + case []byte: + field.SetString(string(v)) + default: + field.SetString(fmt.Sprint(v)) + } + case reflect.Bool: + switch v := raw.(type) { + case bool: + field.SetBool(v) + case int64: + field.SetBool(v != 0) + case []byte: + s := string(v) + field.SetBool(s == "1" || strings.EqualFold(s, "true")) + default: + field.SetBool(false) + } + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + switch v := raw.(type) { + case int64: + field.SetInt(v) + case []byte: + var n int64 + fmt.Sscan(string(v), &n) + field.SetInt(n) + default: + return fmt.Errorf("cannot convert %T to int", raw) + } + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + switch v := raw.(type) { + case int64: + if v < 0 { + v = 0 + } + field.SetUint(uint64(v)) + case []byte: + var n uint64 + fmt.Sscan(string(v), &n) + field.SetUint(n) + default: + return fmt.Errorf("cannot convert %T to uint", raw) + } + case reflect.Float32, reflect.Float64: + switch v := raw.(type) { + case float64: + field.SetFloat(v) + case []byte: + var fv float64 + fmt.Sscan(string(v), &fv) + field.SetFloat(fv) + default: + return fmt.Errorf("cannot convert %T to float", raw) + } + case 
reflect.Struct: + // Support time.Time; extend as needed. + if field.Type() == reflect.TypeOf(time.Time{}) { + switch v := raw.(type) { + case time.Time: + field.Set(reflect.ValueOf(v)) + case []byte: + // Try RFC3339 + if tt, err := time.Parse(time.RFC3339, string(v)); err == nil { + field.Set(reflect.ValueOf(tt)) + } + } + return nil + } + fallthrough + default: + // Not supported yet + return fmt.Errorf("unsupported dest field kind: %s", field.Kind()) + } + return nil +} + +// ----------------------- +// Save/Remove (basic PK) +// ----------------------- + +type fieldMeta struct { + index int + column string + isPK bool + auto bool +} + +func collectMeta(t reflect.Type) (fields []fieldMeta, pk fieldMeta, hasPK bool) { + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) + if !f.IsExported() { + continue + } + tag := f.Tag.Get("db") + if tag == "-" { + continue + } + opts := strings.Split(tag, ",") + col := opts[0] + if col == "" { + col = f.Name + } + meta := fieldMeta{index: i, column: col} + for _, o := range opts[1:] { + switch strings.ToLower(strings.TrimSpace(o)) { + case "pk": + meta.isPK = true + case "auto", "autoincrement": + meta.auto = true + } + } + // If not tagged as pk, fallback to field name "ID" + if !meta.isPK && f.Name == "ID" { + meta.isPK = true + if col == "" { + meta.column = "id" + } + } + fields = append(fields, meta) + if meta.isPK { + pk = meta + hasPK = true + } + } + return +} + +func getTableNameFromEntity(v reflect.Value) (string, bool) { + // If entity implements TableNamer + if v.CanInterface() { + if tn, ok := v.Interface().(TableNamer); ok { + return tn.TableName(), true + } + } + // Fallback: very naive pluralization (append 's') + typ := v.Type() + if typ.Kind() == reflect.Pointer { + typ = typ.Elem() + } + if typ.Kind() == reflect.Struct { + return strings.ToLower(typ.Name()) + "s", true + } + return "", false +} + +func saveEntity(ctx context.Context, exec executor, entity any) error { + rv := reflect.ValueOf(entity) + 
if rv.Kind() != reflect.Pointer || rv.IsNil() { + return errors.New("entity must be a non-nil pointer to struct") + } + ev := rv.Elem() + if ev.Kind() != reflect.Struct { + return errors.New("entity must point to a struct") + } + + fields, pkMeta, hasPK := collectMeta(ev.Type()) + if !hasPK { + return errors.New("no primary key field found (tag db:\"...,pk\" or field named ID)") + } + table, ok := getTableNameFromEntity(ev) + if !ok || table == "" { + return errors.New("unable to resolve table name; implement TableNamer or set up a repository with explicit table") + } + + // Build lists + cols := make([]string, 0, len(fields)) + vals := make([]any, 0, len(fields)) + setParts := make([]string, 0, len(fields)) + + var pkVal any + var pkIsZero bool + + for _, fm := range fields { + f := ev.Field(fm.index) + if fm.isPK { + pkVal = f.Interface() + pkIsZero = isZeroValue(f) + continue + } + cols = append(cols, fm.column) + vals = append(vals, f.Interface()) + setParts = append(setParts, fmt.Sprintf("%s = ?", fm.column)) + } + + if pkIsZero { + // INSERT + placeholders := strings.Repeat("?,", len(cols)) + if len(placeholders) > 0 { + placeholders = placeholders[:len(placeholders)-1] + } + sqlStr := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", table, strings.Join(cols, ", "), placeholders) + res, err := exec.ExecContext(ctx, sqlStr, vals...) + if err != nil { + return err + } + // Set auto ID if needed + if pkMeta.auto { + if id, err := res.LastInsertId(); err == nil { + ev.Field(pkMeta.index).SetInt(id) + } + } + return nil + } + + // UPDATE ... WHERE pk = ? + sqlStr := fmt.Sprintf("UPDATE %s SET %s WHERE %s = ?", table, strings.Join(setParts, ", "), pkMeta.column) + valsWithPK := append(vals, pkVal) + _, err := exec.ExecContext(ctx, sqlStr, valsWithPK...) 
+ return err +} + +func removeEntity(ctx context.Context, exec executor, entity any) error { + rv := reflect.ValueOf(entity) + if rv.Kind() != reflect.Pointer || rv.IsNil() { + return errors.New("entity must be a non-nil pointer to struct") + } + ev := rv.Elem() + if ev.Kind() != reflect.Struct { + return errors.New("entity must point to a struct") + } + _, pkMeta, hasPK := collectMeta(ev.Type()) + if !hasPK { + return errors.New("no primary key field found") + } + table, ok := getTableNameFromEntity(ev) + if !ok || table == "" { + return errors.New("unable to resolve table name") + } + pkVal := ev.Field(pkMeta.index).Interface() + sqlStr := fmt.Sprintf("DELETE FROM %s WHERE %s = ?", table, pkMeta.column) + _, err := exec.ExecContext(ctx, sqlStr, pkVal) + return err +} + +func isZeroValue(v reflect.Value) bool { + switch v.Kind() { + case reflect.String: + return v.Len() == 0 + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return v.Int() == 0 + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return v.Uint() == 0 + case reflect.Bool: + return v.Bool() == false + case reflect.Pointer, reflect.Interface: + return v.IsNil() + case reflect.Slice, reflect.Map: + return v.Len() == 0 + case reflect.Struct: + // Special-case time.Time + if v.Type() == reflect.TypeOf(time.Time{}) { + t := v.Interface().(time.Time) + return t.IsZero() + } + zero := reflect.Zero(v.Type()) + return reflect.DeepEqual(v.Interface(), zero.Interface()) + default: + return false + } +} diff --git a/pkg/rqlite/gateway.go b/pkg/rqlite/gateway.go new file mode 100644 index 0000000..e69de29 diff --git a/pkg/rqlite/migrations.go b/pkg/rqlite/migrations.go new file mode 100644 index 0000000..1c43ed9 --- /dev/null +++ b/pkg/rqlite/migrations.go @@ -0,0 +1,436 @@ +package rqlite + +import ( + "context" + "database/sql" + "fmt" + "io/fs" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "unicode" + + _ 
"github.com/rqlite/gorqlite/stdlib" + "go.uber.org/zap" +) + +// ApplyMigrations scans a directory for *.sql files, orders them by numeric prefix, +// and applies any that are not yet recorded in schema_migrations(version). +func ApplyMigrations(ctx context.Context, db *sql.DB, dir string, logger *zap.Logger) error { + if logger == nil { + logger = zap.NewNop() + } + + if err := ensureMigrationsTable(ctx, db); err != nil { + return fmt.Errorf("ensure schema_migrations: %w", err) + } + + files, err := readMigrationFiles(dir) + if err != nil { + return fmt.Errorf("read migration files: %w", err) + } + if len(files) == 0 { + logger.Info("No migrations found", zap.String("dir", dir)) + return nil + } + + applied, err := loadAppliedVersions(ctx, db) + if err != nil { + return fmt.Errorf("load applied versions: %w", err) + } + + for _, mf := range files { + if applied[mf.Version] { + logger.Info("Migration already applied; skipping", zap.Int("version", mf.Version), zap.String("name", mf.Name)) + continue + } + + sqlBytes, err := os.ReadFile(mf.Path) + if err != nil { + return fmt.Errorf("read migration %s: %w", mf.Path, err) + } + + logger.Info("Applying migration", zap.Int("version", mf.Version), zap.String("name", mf.Name)) + if err := applySQL(ctx, db, string(sqlBytes)); err != nil { + return fmt.Errorf("apply migration %d (%s): %w", mf.Version, mf.Name, err) + } + + if _, err := db.ExecContext(ctx, `INSERT OR IGNORE INTO schema_migrations(version) VALUES (?)`, mf.Version); err != nil { + return fmt.Errorf("record migration %d: %w", mf.Version, err) + } + logger.Info("Migration applied", zap.Int("version", mf.Version), zap.String("name", mf.Name)) + } + + return nil +} + +// ApplyMigrationsDirs applies migrations from multiple directories. 
+// - Gathers *.sql files from each dir +// - Parses numeric prefix as the version +// - Errors if the same version appears in more than one dir (to avoid ambiguity) +// - Sorts globally by version and applies those not yet in schema_migrations +func ApplyMigrationsDirs(ctx context.Context, db *sql.DB, dirs []string, logger *zap.Logger) error { + if logger == nil { + logger = zap.NewNop() + } + if err := ensureMigrationsTable(ctx, db); err != nil { + return fmt.Errorf("ensure schema_migrations: %w", err) + } + + files, err := readMigrationFilesFromDirs(dirs) + if err != nil { + return err + } + if len(files) == 0 { + logger.Info("No migrations found in provided directories", zap.Strings("dirs", dirs)) + return nil + } + + applied, err := loadAppliedVersions(ctx, db) + if err != nil { + return fmt.Errorf("load applied versions: %w", err) + } + + for _, mf := range files { + if applied[mf.Version] { + logger.Info("Migration already applied; skipping", zap.Int("version", mf.Version), zap.String("name", mf.Name), zap.String("path", mf.Path)) + continue + } + sqlBytes, err := os.ReadFile(mf.Path) + if err != nil { + return fmt.Errorf("read migration %s: %w", mf.Path, err) + } + + logger.Info("Applying migration", zap.Int("version", mf.Version), zap.String("name", mf.Name), zap.String("path", mf.Path)) + if err := applySQL(ctx, db, string(sqlBytes)); err != nil { + return fmt.Errorf("apply migration %d (%s): %w", mf.Version, mf.Name, err) + } + + if _, err := db.ExecContext(ctx, `INSERT OR IGNORE INTO schema_migrations(version) VALUES (?)`, mf.Version); err != nil { + return fmt.Errorf("record migration %d: %w", mf.Version, err) + } + logger.Info("Migration applied", zap.Int("version", mf.Version), zap.String("name", mf.Name)) + } + + return nil +} + +// ApplyMigrationsFromManager is a convenience helper bound to RQLiteManager. 
+func (r *RQLiteManager) ApplyMigrations(ctx context.Context, dir string) error { + db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", r.config.RQLitePort)) + if err != nil { + return fmt.Errorf("open rqlite db: %w", err) + } + defer db.Close() + + return ApplyMigrations(ctx, db, dir, r.logger) +} + +// ApplyMigrationsDirs is the multi-dir variant on RQLiteManager. +func (r *RQLiteManager) ApplyMigrationsDirs(ctx context.Context, dirs []string) error { + db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", r.config.RQLitePort)) + if err != nil { + return fmt.Errorf("open rqlite db: %w", err) + } + defer db.Close() + + return ApplyMigrationsDirs(ctx, db, dirs, r.logger) +} + +func ensureMigrationsTable(ctx context.Context, db *sql.DB) error { + _, err := db.ExecContext(ctx, ` +CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +)`) + return err +} + +type migrationFile struct { + Version int + Name string + Path string +} + +func readMigrationFiles(dir string) ([]migrationFile, error) { + entries, err := os.ReadDir(dir) + if err != nil { + if os.IsNotExist(err) { + return []migrationFile{}, nil + } + return nil, err + } + + var out []migrationFile + for _, e := range entries { + if e.IsDir() { + continue + } + name := e.Name() + if !strings.HasSuffix(strings.ToLower(name), ".sql") { + continue + } + ver, ok := parseVersionPrefix(name) + if !ok { + continue + } + out = append(out, migrationFile{ + Version: ver, + Name: name, + Path: filepath.Join(dir, name), + }) + } + sort.Slice(out, func(i, j int) bool { return out[i].Version < out[j].Version }) + return out, nil +} + +func readMigrationFilesFromDirs(dirs []string) ([]migrationFile, error) { + all := make([]migrationFile, 0, 64) + seen := map[int]string{} // version -> path (for duplicate detection) + + for _, d := range dirs { + files, err := readMigrationFiles(d) + if err != nil { + return nil, 
fmt.Errorf("reading dir %s: %w", d, err) + } + for _, f := range files { + if prev, dup := seen[f.Version]; dup { + return nil, fmt.Errorf("duplicate migration version %d detected in %s and %s; ensure global version uniqueness", f.Version, prev, f.Path) + } + seen[f.Version] = f.Path + all = append(all, f) + } + } + sort.Slice(all, func(i, j int) bool { return all[i].Version < all[j].Version }) + return all, nil +} + +func parseVersionPrefix(name string) (int, bool) { + // Expect formats like "001_initial.sql", "2_add_table.sql", etc. + i := 0 + for i < len(name) && unicode.IsDigit(rune(name[i])) { + i++ + } + if i == 0 { + return 0, false + } + ver, err := strconv.Atoi(name[:i]) + if err != nil { + return 0, false + } + return ver, true +} + +func loadAppliedVersions(ctx context.Context, db *sql.DB) (map[int]bool, error) { + rows, err := db.QueryContext(ctx, `SELECT version FROM schema_migrations`) + if err != nil { + // If the table doesn't exist yet (very first run), ensure it and return empty set. + if isNoSuchTable(err) { + if err := ensureMigrationsTable(ctx, db); err != nil { + return nil, err + } + return map[int]bool{}, nil + } + return nil, err + } + defer rows.Close() + + applied := make(map[int]bool) + for rows.Next() { + var v int + if err := rows.Scan(&v); err != nil { + return nil, err + } + applied[v] = true + } + return applied, rows.Err() +} + +func isNoSuchTable(err error) bool { + // rqlite/sqlite error messages vary; keep it permissive + msg := strings.ToLower(err.Error()) + return strings.Contains(msg, "no such table") || strings.Contains(msg, "does not exist") +} + +// applySQL tries to run the entire script in one Exec. +// If the driver rejects multi-statement Exec, it falls back to splitting statements and executing sequentially. 
+func applySQL(ctx context.Context, db *sql.DB, script string) error { + s := strings.TrimSpace(script) + if s == "" { + return nil + } + if _, err := db.ExecContext(ctx, s); err == nil { + return nil + } else { + // Fall back to splitting into statements and executing sequentially (respecting BEGIN/COMMIT if present). + stmts := splitSQLStatements(s) + // If the script already contains explicit BEGIN/COMMIT, we just run as-is. + // Otherwise, we attempt to wrap in a transaction; if BeginTx fails, execute one-by-one. + hasExplicitTxn := containsToken(stmts, "BEGIN") || containsToken(stmts, "BEGIN;") + if !hasExplicitTxn { + if tx, txErr := db.BeginTx(ctx, nil); txErr == nil { + for _, stmt := range stmts { + if stmt == "" { + continue + } + if _, execErr := tx.ExecContext(ctx, stmt); execErr != nil { + _ = tx.Rollback() + return fmt.Errorf("exec stmt failed: %w (stmt: %s)", execErr, snippet(stmt)) + } + } + return tx.Commit() + } + // Fall through to plain sequential exec if BeginTx not supported. + } + + for _, stmt := range stmts { + if stmt == "" { + continue + } + if _, execErr := db.ExecContext(ctx, stmt); execErr != nil { + return fmt.Errorf("exec stmt failed: %w (stmt: %s)", execErr, snippet(stmt)) + } + } + return nil + } +} + +func containsToken(stmts []string, token string) bool { + for _, s := range stmts { + if strings.EqualFold(strings.TrimSpace(s), token) { + return true + } + } + return false +} + +func snippet(s string) string { + s = strings.TrimSpace(s) + if len(s) > 120 { + return s[:120] + "..." + } + return s +} + +// splitSQLStatements splits a SQL script into statements by semicolon, ignoring semicolons +// inside single/double-quoted strings and skipping comments (-- and /* */). 
+func splitSQLStatements(in string) []string { + var out []string + var b strings.Builder + + inLineComment := false + inBlockComment := false + inSingle := false + inDouble := false + + runes := []rune(in) + for i := 0; i < len(runes); i++ { + ch := runes[i] + next := rune(0) + if i+1 < len(runes) { + next = runes[i+1] + } + + // Handle end of line comment + if inLineComment { + if ch == '\n' { + inLineComment = false + // keep newline normalization but don't include comment + } + continue + } + // Handle end of block comment + if inBlockComment { + if ch == '*' && next == '/' { + inBlockComment = false + i++ + } + continue + } + + // Start of comments? + if !inSingle && !inDouble { + if ch == '-' && next == '-' { + inLineComment = true + i++ + continue + } + if ch == '/' && next == '*' { + inBlockComment = true + i++ + continue + } + } + + // Quotes + if !inDouble && ch == '\'' { + // Toggle single quotes, respecting escaped '' inside. + if inSingle { + // Check for escaped '' (two single quotes) + if next == '\'' { + b.WriteRune(ch) // write one ' + i++ // skip the next ' + continue + } + inSingle = false + } else { + inSingle = true + } + b.WriteRune(ch) + continue + } + if !inSingle && ch == '"' { + if inDouble { + if next == '"' { + b.WriteRune(ch) + i++ + continue + } + inDouble = false + } else { + inDouble = true + } + b.WriteRune(ch) + continue + } + + // Statement boundary + if ch == ';' && !inSingle && !inDouble { + stmt := strings.TrimSpace(b.String()) + if stmt != "" { + out = append(out, stmt) + } + b.Reset() + continue + } + + b.WriteRune(ch) + } + + // Final fragment + if s := strings.TrimSpace(b.String()); s != "" { + out = append(out, s) + } + return out +} + +// Optional helper to load embedded migrations if you later decide to embed. +// Keep for future use; currently unused. 
+func readDirFS(fsys fs.FS, root string) ([]string, error) { + var files []string + err := fs.WalkDir(fsys, root, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + if strings.HasSuffix(strings.ToLower(d.Name()), ".sql") { + files = append(files, path) + } + return nil + }) + return files, err +} diff --git a/pkg/database/rqlite.go b/pkg/rqlite/rqlite.go similarity index 96% rename from pkg/database/rqlite.go rename to pkg/rqlite/rqlite.go index de1866b..740e887 100644 --- a/pkg/database/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -1,4 +1,4 @@ -package database +package rqlite import ( "context" @@ -174,6 +174,14 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } } + // After waitForLeadership / waitForSQLAvailable succeeds, before returning: + migrationsDir := "network/migrations" + + if err := r.ApplyMigrations(ctx, migrationsDir); err != nil { + r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir)) + return fmt.Errorf("apply migrations: %w", err) + } + r.logger.Info("RQLite node started successfully") return nil } diff --git a/pkg/storage/client.go b/pkg/storage/client.go deleted file mode 100644 index 2a551d4..0000000 --- a/pkg/storage/client.go +++ /dev/null @@ -1,231 +0,0 @@ -package storage - -import ( - "context" - "fmt" - "io" - "time" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" - "go.uber.org/zap" -) - -// Client provides distributed storage client functionality -type Client struct { - host host.Host - logger *zap.Logger - namespace string -} - -// Context utilities for namespace override -type ctxKey string - -// CtxKeyNamespaceOverride is the context key used to override namespace per request -const CtxKeyNamespaceOverride ctxKey = "storage_ns_override" - -// WithNamespace returns a new context that carries a storage namespace override -func 
WithNamespace(ctx context.Context, ns string) context.Context { - return context.WithValue(ctx, CtxKeyNamespaceOverride, ns) -} - -// NewClient creates a new storage client -func NewClient(h host.Host, namespace string, logger *zap.Logger) *Client { - return &Client{ - host: h, - logger: logger, - namespace: namespace, - } -} - -// Put stores a key-value pair in the distributed storage -func (c *Client) Put(ctx context.Context, key string, value []byte) error { - ns := c.namespace - if v := ctx.Value(CtxKeyNamespaceOverride); v != nil { - if s, ok := v.(string); ok && s != "" { - ns = s - } - } - request := &StorageRequest{ - Type: MessageTypePut, - Key: key, - Value: value, - Namespace: ns, - } - - return c.sendRequest(ctx, request) -} - -// Get retrieves a value by key from the distributed storage -func (c *Client) Get(ctx context.Context, key string) ([]byte, error) { - ns := c.namespace - if v := ctx.Value(CtxKeyNamespaceOverride); v != nil { - if s, ok := v.(string); ok && s != "" { - ns = s - } - } - request := &StorageRequest{ - Type: MessageTypeGet, - Key: key, - Namespace: ns, - } - - response, err := c.sendRequestWithResponse(ctx, request) - if err != nil { - return nil, err - } - - if !response.Success { - return nil, fmt.Errorf(response.Error) - } - - return response.Value, nil -} - -// Delete removes a key from the distributed storage -func (c *Client) Delete(ctx context.Context, key string) error { - ns := c.namespace - if v := ctx.Value(CtxKeyNamespaceOverride); v != nil { - if s, ok := v.(string); ok && s != "" { - ns = s - } - } - request := &StorageRequest{ - Type: MessageTypeDelete, - Key: key, - Namespace: ns, - } - - return c.sendRequest(ctx, request) -} - -// List returns keys with a given prefix -func (c *Client) List(ctx context.Context, prefix string, limit int) ([]string, error) { - ns := c.namespace - if v := ctx.Value(CtxKeyNamespaceOverride); v != nil { - if s, ok := v.(string); ok && s != "" { - ns = s - } - } - request := 
&StorageRequest{ - Type: MessageTypeList, - Prefix: prefix, - Limit: limit, - Namespace: ns, - } - - response, err := c.sendRequestWithResponse(ctx, request) - if err != nil { - return nil, err - } - - if !response.Success { - return nil, fmt.Errorf(response.Error) - } - - return response.Keys, nil -} - -// Exists checks if a key exists in the distributed storage -func (c *Client) Exists(ctx context.Context, key string) (bool, error) { - ns := c.namespace - if v := ctx.Value(CtxKeyNamespaceOverride); v != nil { - if s, ok := v.(string); ok && s != "" { - ns = s - } - } - request := &StorageRequest{ - Type: MessageTypeExists, - Key: key, - Namespace: ns, - } - - response, err := c.sendRequestWithResponse(ctx, request) - if err != nil { - return false, err - } - - if !response.Success { - return false, fmt.Errorf(response.Error) - } - - return response.Exists, nil -} - -// sendRequest sends a request without expecting a response -func (c *Client) sendRequest(ctx context.Context, request *StorageRequest) error { - _, err := c.sendRequestWithResponse(ctx, request) - return err -} - -// sendRequestWithResponse sends a request and waits for a response -func (c *Client) sendRequestWithResponse(ctx context.Context, request *StorageRequest) (*StorageResponse, error) { - // Get connected peers - peers := c.host.Network().Peers() - if len(peers) == 0 { - return nil, fmt.Errorf("no peers connected") - } - - // Try to send to the first available peer - // In a production system, you might want to implement peer selection logic - for _, peerID := range peers { - response, err := c.sendToPeer(ctx, peerID, request) - if err != nil { - c.logger.Debug("Failed to send to peer", - zap.String("peer", peerID.String()), - zap.Error(err)) - continue - } - return response, nil - } - - return nil, fmt.Errorf("failed to send request to any peer") -} - -// sendToPeer sends a request to a specific peer -func (c *Client) sendToPeer(ctx context.Context, peerID peer.ID, request *StorageRequest) 
(*StorageResponse, error) { - // Create a new stream to the peer - stream, err := c.host.NewStream(ctx, peerID, protocol.ID(StorageProtocolID)) - if err != nil { - return nil, fmt.Errorf("failed to create stream: %w", err) - } - defer stream.Close() - - // Set deadline for the operation - deadline, ok := ctx.Deadline() - if ok { - stream.SetDeadline(deadline) - } else { - stream.SetDeadline(time.Now().Add(30 * time.Second)) - } - - // Marshal and send request - requestData, err := request.Marshal() - if err != nil { - return nil, fmt.Errorf("failed to marshal request: %w", err) - } - - if _, err := stream.Write(requestData); err != nil { - return nil, fmt.Errorf("failed to write request: %w", err) - } - - // Close write side to signal end of request - if err := stream.CloseWrite(); err != nil { - return nil, fmt.Errorf("failed to close write: %w", err) - } - - // Read response - responseData, err := io.ReadAll(stream) - if err != nil { - return nil, fmt.Errorf("failed to read response: %w", err) - } - - // Unmarshal response - var response StorageResponse - if err := response.Unmarshal(responseData); err != nil { - return nil, fmt.Errorf("failed to unmarshal response: %w", err) - } - - return &response, nil -} diff --git a/pkg/storage/kv_ops.go b/pkg/storage/kv_ops.go deleted file mode 100644 index ad46b85..0000000 --- a/pkg/storage/kv_ops.go +++ /dev/null @@ -1,182 +0,0 @@ -package storage - -import ( - "database/sql" - "fmt" - - "go.uber.org/zap" -) - -// processRequest processes a storage request and returns a response -func (s *Service) processRequest(req *StorageRequest) *StorageResponse { - switch req.Type { - case MessageTypePut: - return s.handlePut(req) - case MessageTypeGet: - return s.handleGet(req) - case MessageTypeDelete: - return s.handleDelete(req) - case MessageTypeList: - return s.handleList(req) - case MessageTypeExists: - return s.handleExists(req) - default: - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("unknown message 
type: %s", req.Type), - } - } -} - -// handlePut stores a key-value pair -func (s *Service) handlePut(req *StorageRequest) *StorageResponse { - s.mu.Lock() - defer s.mu.Unlock() - - // Use REPLACE to handle both insert and update - query := ` - REPLACE INTO kv_storage (namespace, key, value, updated_at) - VALUES (?, ?, ?, CURRENT_TIMESTAMP) - ` - - _, err := s.db.Exec(query, req.Namespace, req.Key, req.Value) - if err != nil { - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("failed to store key: %v", err), - } - } - - s.logger.Debug("Stored key", zap.String("key", req.Key), zap.String("namespace", req.Namespace)) - return &StorageResponse{Success: true} -} - -// handleGet retrieves a value by key -func (s *Service) handleGet(req *StorageRequest) *StorageResponse { - s.mu.RLock() - defer s.mu.RUnlock() - - query := `SELECT value FROM kv_storage WHERE namespace = ? AND key = ?` - - var value []byte - err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&value) - if err != nil { - if err == sql.ErrNoRows { - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("key not found: %s", req.Key), - } - } - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("failed to get key: %v", err), - } - } - - return &StorageResponse{ - Success: true, - Value: value, - } -} - -// handleDelete removes a key -func (s *Service) handleDelete(req *StorageRequest) *StorageResponse { - s.mu.Lock() - defer s.mu.Unlock() - - query := `DELETE FROM kv_storage WHERE namespace = ? 
AND key = ?` - - result, err := s.db.Exec(query, req.Namespace, req.Key) - if err != nil { - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("failed to delete key: %v", err), - } - } - - rowsAffected, _ := result.RowsAffected() - if rowsAffected == 0 { - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("key not found: %s", req.Key), - } - } - - s.logger.Debug("Deleted key", zap.String("key", req.Key), zap.String("namespace", req.Namespace)) - return &StorageResponse{Success: true} -} - -// handleList lists keys with a prefix -func (s *Service) handleList(req *StorageRequest) *StorageResponse { - s.mu.RLock() - defer s.mu.RUnlock() - - var query string - var args []interface{} - - if req.Prefix == "" { - // List all keys in namespace - query = `SELECT key FROM kv_storage WHERE namespace = ?` - args = []interface{}{req.Namespace} - } else { - // List keys with prefix - query = `SELECT key FROM kv_storage WHERE namespace = ? AND key LIKE ?` - args = []interface{}{req.Namespace, req.Prefix + "%"} - } - - if req.Limit > 0 { - query += ` LIMIT ?` - args = append(args, req.Limit) - } - - rows, err := s.db.Query(query, args...) - if err != nil { - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("failed to query keys: %v", err), - } - } - defer rows.Close() - - var keys []string - for rows.Next() { - var key string - if err := rows.Scan(&key); err != nil { - continue - } - keys = append(keys, key) - } - - return &StorageResponse{ - Success: true, - Keys: keys, - } -} - -// handleExists checks if a key exists -func (s *Service) handleExists(req *StorageRequest) *StorageResponse { - s.mu.RLock() - defer s.mu.RUnlock() - - query := `SELECT 1 FROM kv_storage WHERE namespace = ? AND key = ? 
LIMIT 1` - - var exists int - err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&exists) - if err != nil { - if err == sql.ErrNoRows { - return &StorageResponse{ - Success: true, - Exists: false, - } - } - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("failed to check key existence: %v", err), - } - } - - return &StorageResponse{ - Success: true, - Exists: true, - } -} diff --git a/pkg/storage/logging.go b/pkg/storage/logging.go deleted file mode 100644 index 648af74..0000000 --- a/pkg/storage/logging.go +++ /dev/null @@ -1,16 +0,0 @@ -package storage - -import "go.uber.org/zap" - -// newStorageLogger creates a zap.Logger for storage components. -// Callers can pass quiet=true to reduce log verbosity. -func newStorageLogger(quiet bool) (*zap.Logger, error) { - if quiet { - cfg := zap.NewProductionConfig() - cfg.Level = zap.NewAtomicLevelAt(zap.WarnLevel) - cfg.DisableCaller = true - cfg.DisableStacktrace = true - return cfg.Build() - } - return zap.NewDevelopment() -} diff --git a/pkg/storage/protocol.go b/pkg/storage/protocol.go deleted file mode 100644 index af9c4dd..0000000 --- a/pkg/storage/protocol.go +++ /dev/null @@ -1,60 +0,0 @@ -package storage - -import ( - "encoding/json" -) - -// Storage protocol definitions for distributed storage -const ( - StorageProtocolID = "/network/storage/1.0.0" -) - -// Message types for storage operations -type MessageType string - -const ( - MessageTypePut MessageType = "put" - MessageTypeGet MessageType = "get" - MessageTypeDelete MessageType = "delete" - MessageTypeList MessageType = "list" - MessageTypeExists MessageType = "exists" -) - -// StorageRequest represents a storage operation request -type StorageRequest struct { - Type MessageType `json:"type"` - Key string `json:"key"` - Value []byte `json:"value,omitempty"` - Prefix string `json:"prefix,omitempty"` - Limit int `json:"limit,omitempty"` - Namespace string `json:"namespace"` -} - -// StorageResponse represents a storage operation response 
-type StorageResponse struct { - Success bool `json:"success"` - Error string `json:"error,omitempty"` - Value []byte `json:"value,omitempty"` - Keys []string `json:"keys,omitempty"` - Exists bool `json:"exists,omitempty"` -} - -// Marshal serializes a request to JSON -func (r *StorageRequest) Marshal() ([]byte, error) { - return json.Marshal(r) -} - -// Unmarshal deserializes a request from JSON -func (r *StorageRequest) Unmarshal(data []byte) error { - return json.Unmarshal(data, r) -} - -// Marshal serializes a response to JSON -func (r *StorageResponse) Marshal() ([]byte, error) { - return json.Marshal(r) -} - -// Unmarshal deserializes a response from JSON -func (r *StorageResponse) Unmarshal(data []byte) error { - return json.Unmarshal(data, r) -} diff --git a/pkg/storage/protocol_test.go b/pkg/storage/protocol_test.go deleted file mode 100644 index 07d3673..0000000 --- a/pkg/storage/protocol_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package storage - -import "testing" - -func TestRequestResponseJSON(t *testing.T) { - req := &StorageRequest{Type: MessageTypePut, Key: "k", Value: []byte("v"), Namespace: "ns"} - b, err := req.Marshal() - if err != nil { t.Fatal(err) } - var out StorageRequest - if err := out.Unmarshal(b); err != nil { t.Fatal(err) } - if out.Type != MessageTypePut || out.Key != "k" || out.Namespace != "ns" { - t.Fatalf("roundtrip mismatch: %+v", out) - } - - resp := &StorageResponse{Success: true, Keys: []string{"a"}, Exists: true} - b, err = resp.Marshal() - if err != nil { t.Fatal(err) } - var outR StorageResponse - if err := outR.Unmarshal(b); err != nil { t.Fatal(err) } - if !outR.Success || !outR.Exists || len(outR.Keys) != 1 { - t.Fatalf("resp mismatch: %+v", outR) - } -} diff --git a/pkg/storage/rqlite_init.go b/pkg/storage/rqlite_init.go deleted file mode 100644 index c339467..0000000 --- a/pkg/storage/rqlite_init.go +++ /dev/null @@ -1,37 +0,0 @@ -package storage - -import ( - "fmt" -) - -// initTables creates the necessary tables for 
key-value storage -func (s *Service) initTables() error { - // Create storage table with namespace support - createTableSQL := ` - CREATE TABLE IF NOT EXISTS kv_storage ( - namespace TEXT NOT NULL, - key TEXT NOT NULL, - value BLOB NOT NULL, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (namespace, key) - ) - ` - - // Create index for faster queries - createIndexSQL := ` - CREATE INDEX IF NOT EXISTS idx_kv_storage_namespace_key - ON kv_storage(namespace, key) - ` - - if _, err := s.db.Exec(createTableSQL); err != nil { - return fmt.Errorf("failed to create storage table: %w", err) - } - - if _, err := s.db.Exec(createIndexSQL); err != nil { - return fmt.Errorf("failed to create storage index: %w", err) - } - - s.logger.Info("Storage tables initialized successfully") - return nil -} diff --git a/pkg/storage/service.go b/pkg/storage/service.go deleted file mode 100644 index e3062c2..0000000 --- a/pkg/storage/service.go +++ /dev/null @@ -1,32 +0,0 @@ -package storage - -import ( - "database/sql" - "sync" - - "go.uber.org/zap" -) - -// Service provides distributed storage functionality using RQLite -type Service struct { - logger *zap.Logger - db *sql.DB - mu sync.RWMutex -} - -// NewService creates a new storage service backed by RQLite -func NewService(db *sql.DB, logger *zap.Logger) (*Service, error) { - service := &Service{ - logger: logger, - db: db, - } - - return service, nil -} - -// Close closes the storage service -func (s *Service) Close() error { - // The database connection is managed elsewhere - s.logger.Info("Storage service closed") - return nil -} diff --git a/pkg/storage/stream_handler.go b/pkg/storage/stream_handler.go deleted file mode 100644 index 4c38fa1..0000000 --- a/pkg/storage/stream_handler.go +++ /dev/null @@ -1,48 +0,0 @@ -package storage - -import ( - "io" - - "github.com/libp2p/go-libp2p/core/network" - "go.uber.org/zap" -) - -// HandleStorageStream handles incoming storage 
protocol streams -func (s *Service) HandleStorageStream(stream network.Stream) { - defer stream.Close() - - // Read request - data, err := io.ReadAll(stream) - if err != nil { - s.logger.Error("Failed to read storage request", zap.Error(err)) - return - } - - var request StorageRequest - if err := request.Unmarshal(data); err != nil { - s.logger.Error("Failed to unmarshal storage request", zap.Error(err)) - return - } - - // Process request - response := s.processRequest(&request) - - // Send response - responseData, err := response.Marshal() - if err != nil { - s.logger.Error("Failed to marshal storage response", zap.Error(err)) - return - } - - if _, err := stream.Write(responseData); err != nil { - s.logger.Error("Failed to write storage response", zap.Error(err)) - return - } - - s.logger.Debug("Handled storage request", - zap.String("type", string(request.Type)), - zap.String("key", request.Key), - zap.String("namespace", request.Namespace), - zap.Bool("success", response.Success), - ) -} From c0d8fcb8950e9a4b998a8615f82dc5fb4951dc52 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Tue, 23 Sep 2025 07:19:35 +0300 Subject: [PATCH 2/2] - Created namespace_helpers on gateway - Removed old pkg/gatway storage and migrated to new rqlite - Updated readme - Created new rqlite implementation - Updated changelog - Fixed migration error on migrations.go applySQL --- CHANGELOG.md | 4 + README.md | 98 ++- pkg/config/config.go | 11 +- pkg/gateway/apps_handlers.go | 170 ----- pkg/gateway/auth_handlers.go | 3 +- pkg/gateway/gateway.go | 82 +-- pkg/gateway/middleware.go | 19 +- pkg/gateway/migrate.go | 188 ------ pkg/gateway/migrate_test.go | 42 -- .../{db_helpers.go => namespace_helpers.go} | 21 +- pkg/gateway/pubsub_handlers.go | 4 +- pkg/gateway/routes.go | 15 +- pkg/gateway/storage_handlers.go | 120 +--- pkg/rqlite/gateway.go | 615 ++++++++++++++++++ pkg/rqlite/migrations.go | 72 +- pkg/rqlite/rqlite.go | 5 +- 16 files changed, 807 insertions(+), 662 deletions(-) delete 
mode 100644 pkg/gateway/apps_handlers.go delete mode 100644 pkg/gateway/migrate.go delete mode 100644 pkg/gateway/migrate_test.go rename pkg/gateway/{db_helpers.go => namespace_helpers.go} (50%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 664f1e0..ed22aae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,17 +10,21 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant - Created new rqlite folder - Created rqlite adapter, client, gateway, migrations and rqlite init +- Created namespace_helpers on gateway +- Created new rqlite implementation ### Changed - Updated node.go to support new rqlite architecture - Updated readme + ### Deprecated ### Removed - Removed old storage folder +- Removed old pkg/gateway storage and migrated to new rqlite ### Fixed diff --git a/README.md b/README.md index a0f038b..ddf6f03 100644 --- a/README.md +++ b/README.md @@ -414,33 +414,65 @@ bootstrap_peers: ### Database Operations (Gateway REST) ```http -POST /v1/db/create-table # Body: {"schema": "CREATE TABLE ..."} -POST /v1/db/drop-table # Body: {"table": "table_name"} -POST /v1/db/query # Body: {"sql": "SELECT ...", "args": [..]} -POST /v1/db/transaction # Body: {"statements": ["SQL 1", "SQL 2", ...]} -GET /v1/db/schema # Returns current tables and columns +POST /v1/db/exec # Body: {"sql": "INSERT/UPDATE/DELETE/DDL ...", "args": [...]} +POST /v1/db/find # Body: {"table":"...", "criteria":{"col":val,...}, "options":{...}} +POST /v1/db/find-one # Body: same as /find, returns a single row (404 if not found) +POST /v1/db/select # Body: {"table":"...", "select":[...], "where":[...], "joins":[...], "order_by":[...], "limit":N, "offset":N, "one":false} +POST /v1/db/transaction # Body: {"ops":[{"kind":"exec|query","sql":"...","args":[...]}], "return_results": true} +POST /v1/db/query # Body: {"sql": "SELECT ...", "args": [..]} (legacy-friendly SELECT) +GET /v1/db/schema # Returns tables/views + create SQL +POST /v1/db/create-table # Body: {"schema": "CREATE TABLE 
..."} +POST /v1/db/drop-table # Body: {"table": "table_name"} ``` -Common migration workflow: +Common workflows: ```bash -# Add a new table -curl -X POST "$GW/v1/db/create-table" \ +# Exec (INSERT/UPDATE/DELETE/DDL) +curl -X POST "$GW/v1/db/exec" \ -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ - -d '{"schema":"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)"}' + -d '{"sql":"INSERT INTO users(name,email) VALUES(?,?)","args":["Alice","alice@example.com"]}' -# Apply multiple statements atomically +# Find (criteria + options) +curl -X POST "$GW/v1/db/find" \ + -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ + -d '{ + "table":"users", + "criteria":{"active":true}, + "options":{"select":["id","email"],"order_by":["created_at DESC"],"limit":25} + }' + +# Select (fluent builder via JSON) +curl -X POST "$GW/v1/db/select" \ + -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ + -d '{ + "table":"orders o", + "select":["o.id","o.total","u.email AS user_email"], + "joins":[{"kind":"INNER","table":"users u","on":"u.id = o.user_id"}], + "where":[{"conj":"AND","expr":"o.total > ?","args":[100]}], + "order_by":["o.created_at DESC"], + "limit":10 + }' + +# Transaction (atomic batch) curl -X POST "$GW/v1/db/transaction" \ -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ - -d '{"statements":[ - "ALTER TABLE users ADD COLUMN email TEXT", - "CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)" - ]}' + -d '{ + "return_results": true, + "ops": [ + {"kind":"exec","sql":"INSERT INTO users(email) VALUES(?)","args":["bob@example.com"]}, + {"kind":"query","sql":"SELECT last_insert_rowid() AS id","args":[]} + ] + }' -# Verify -curl -X POST "$GW/v1/db/query" \ - -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ - -d '{"sql":"PRAGMA table_info(users)"}' +# Schema +curl "$GW/v1/db/schema" -H "Authorization: Bearer $API_KEY" + +# DDL helpers +curl 
-X POST "$GW/v1/db/create-table" -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ + -d '{"schema":"CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)"}' +curl -X POST "$GW/v1/db/drop-table" -H "Authorization: Bearer $API_KEY" -H 'Content-Type: application/json' \ + -d '{"table":"users"}' ``` ### Authentication @@ -540,7 +572,23 @@ GET /v1/auth/whoami # Current auth status POST /v1/auth/api-key # Generate API key (authenticated) ``` +#### RQLite HTTP ORM Gateway (/v1/db) +The gateway now exposes a full HTTP interface over the Go ORM-like client (see `pkg/rqlite/gateway.go`) so you can build SDKs in any language. + +- Base path: `/v1/db` +- Endpoints: + - `POST /v1/db/exec` — Execute write/DDL SQL; returns `{ rows_affected, last_insert_id }` + - `POST /v1/db/find` — Map-based criteria; returns `{ items: [...], count: N }` + - `POST /v1/db/find-one` — Single row; 404 if not found + - `POST /v1/db/select` — Fluent SELECT via JSON (joins, where, order, group, limit, offset) + - `POST /v1/db/transaction` — Atomic batch of exec/query ops, optional per-op results + - `POST /v1/db/query` — Arbitrary SELECT (legacy-friendly), returns `items` + - `GET /v1/db/schema` — List user tables/views + create SQL + - `POST /v1/db/create-table` — Convenience for DDL + - `POST /v1/db/drop-table` — Safe drop (identifier validated) + +Payload examples are shown in the [Database Operations (Gateway REST)](#database-operations-gateway-rest) section. 
#### Network Operations ```http @@ -575,11 +623,15 @@ GET /v1/pubsub/topics # List active topics ### Key HTTP endpoints for SDKs - **Database** - - Create Table: `POST /v1/db/create-table` `{schema}` → `{status:"ok"}` - - Drop Table: `POST /v1/db/drop-table` `{table}` → `{status:"ok"}` - - Query: `POST /v1/db/query` `{sql, args?}` → `{columns, rows, count}` - - Transaction: `POST /v1/db/transaction` `{statements:[...]}` → `{status:"ok"}` - - Schema: `GET /v1/db/schema` → schema JSON + - Exec: `POST /v1/db/exec` `{sql, args?}` → `{rows_affected,last_insert_id}` + - Find: `POST /v1/db/find` `{table, criteria, options?}` → `{items,count}` + - FindOne: `POST /v1/db/find-one` `{table, criteria, options?}` → single object or 404 + - Select: `POST /v1/db/select` `{table, select?, joins?, where?, order_by?, group_by?, limit?, offset?, one?}` + - Transaction: `POST /v1/db/transaction` `{ops:[{kind,sql,args?}], return_results?}` + - Query: `POST /v1/db/query` `{sql, args?}` → `{items,count}` + - Schema: `GET /v1/db/schema` + - Create Table: `POST /v1/db/create-table` `{schema}` + - Drop Table: `POST /v1/db/drop-table` `{table}` - **PubSub** - WS Subscribe: `GET /v1/pubsub/ws?topic=` - Publish: `POST /v1/pubsub/publish` `{topic, data_base64}` → `{status:"ok"}` diff --git a/pkg/config/config.go b/pkg/config/config.go index de0bcbe..39bef1b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -109,12 +109,11 @@ func DefaultConfig() *Config { }, Discovery: DiscoveryConfig{ BootstrapPeers: []string{ - "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWSHHwEY6cga3ng7tD1rzStAU58ogQXVMX3LZJ6Gqf6dee", - // "/ip4/217.76.54.168/tcp/4001/p2p/12D3KooWDp7xeShVY9uHfqNVPSsJeCKUatAviFZV8Y1joox5nUvx", - // "/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1", - // "/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR", - // "/ip4/155.133.27.199/tcp/4001/p2p/12D3KooWC69SBzM5QUgrLrfLWUykE8au32X5LwT7zwv9bixrQPm1", - // 
"/ip4/217.76.56.2/tcp/4001/p2p/12D3KooWEiqJHvznxqJ5p2y8mUs6Ky6dfU1xTYFQbyKRCABfcZz4", + "/ip4/217.76.54.168/tcp/4001/p2p/12D3KooWDp7xeShVY9uHfqNVPSsJeCKUatAviFZV8Y1joox5nUvx", + "/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1", + "/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR", + "/ip4/155.133.27.199/tcp/4001/p2p/12D3KooWC69SBzM5QUgrLrfLWUykE8au32X5LwT7zwv9bixrQPm1", + "/ip4/217.76.56.2/tcp/4001/p2p/12D3KooWEiqJHvznxqJ5p2y8mUs6Ky6dfU1xTYFQbyKRCABfcZz4", }, BootstrapPort: 4001, // Default LibP2P port DiscoveryInterval: time.Second * 15, // Back to 15 seconds for testing diff --git a/pkg/gateway/apps_handlers.go b/pkg/gateway/apps_handlers.go deleted file mode 100644 index 2b6c533..0000000 --- a/pkg/gateway/apps_handlers.go +++ /dev/null @@ -1,170 +0,0 @@ -package gateway - -import ( - "crypto/rand" - "encoding/base64" - "encoding/json" - "net/http" - "strings" - - "github.com/DeBrosOfficial/network/pkg/storage" -) - -// appsHandler implements minimal CRUD for apps within a namespace. 
-// Routes handled: -// - GET /v1/apps -> list -// - POST /v1/apps -> create -// - GET /v1/apps/{app_id} -> fetch -// - PUT /v1/apps/{app_id} -> update (name/public_key) -// - DELETE /v1/apps/{app_id} -> delete -func (g *Gateway) appsHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - ctx := r.Context() - ns := g.cfg.ClientNamespace - if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil { - if s, ok := v.(string); ok && s != "" { - ns = s - } - } - if strings.TrimSpace(ns) == "" { - ns = "default" - } - db := g.client.Database() - nsID, err := g.resolveNamespaceID(ctx, ns) - if err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - - path := r.URL.Path - // Determine if operating on collection or single resource - if path == "/v1/apps" || path == "/v1/apps/" { - switch r.Method { - case http.MethodGet: - // List apps - res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? 
ORDER BY created_at DESC", nsID) - if err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - items := make([]map[string]any, 0, res.Count) - for _, row := range res.Rows { - item := map[string]any{ - "app_id": row[0], - "name": row[1], - "public_key": row[2], - "namespace": ns, - "created_at": row[3], - } - items = append(items, item) - } - writeJSON(w, http.StatusOK, map[string]any{"items": items, "count": len(items)}) - return - case http.MethodPost: - // Create app with provided name/public_key - var req struct { - Name string `json:"name"` - PublicKey string `json:"public_key"` - } - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeError(w, http.StatusBadRequest, "invalid json body") - return - } - // Generate app_id - buf := make([]byte, 12) - if _, err := rand.Read(buf); err != nil { - writeError(w, http.StatusInternalServerError, "failed to generate app id") - return - } - appID := "app_" + base64.RawURLEncoding.EncodeToString(buf) - if _, err := db.Query(ctx, "INSERT INTO apps(namespace_id, app_id, name, public_key) VALUES (?, ?, ?, ?)", nsID, appID, req.Name, req.PublicKey); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusCreated, map[string]any{ - "app_id": appID, - "name": req.Name, - "public_key": req.PublicKey, - "namespace": ns, - }) - return - default: - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - } - - // Single resource: /v1/apps/{app_id} - if strings.HasPrefix(path, "/v1/apps/") { - appID := strings.TrimPrefix(path, "/v1/apps/") - appID = strings.TrimSpace(appID) - if appID == "" { - writeError(w, http.StatusBadRequest, "missing app_id") - return - } - switch r.Method { - case http.MethodGet: - res, err := db.Query(ctx, "SELECT app_id, name, public_key, created_at FROM apps WHERE namespace_id = ? AND app_id = ? 
LIMIT 1", nsID, appID) - if err != nil || res == nil || res.Count == 0 { - writeError(w, http.StatusNotFound, "app not found") - return - } - row := res.Rows[0] - writeJSON(w, http.StatusOK, map[string]any{ - "app_id": row[0], - "name": row[1], - "public_key": row[2], - "namespace": ns, - "created_at": row[3], - }) - return - case http.MethodPut: - var req struct { - Name *string `json:"name"` - PublicKey *string `json:"public_key"` - } - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeError(w, http.StatusBadRequest, "invalid json body") - return - } - // Build update dynamically - sets := make([]string, 0, 2) - args := make([]any, 0, 4) - if req.Name != nil { - sets = append(sets, "name = ?") - args = append(args, *req.Name) - } - if req.PublicKey != nil { - sets = append(sets, "public_key = ?") - args = append(args, *req.PublicKey) - } - if len(sets) == 0 { - writeError(w, http.StatusBadRequest, "no fields to update") - return - } - q := "UPDATE apps SET " + strings.Join(sets, ", ") + " WHERE namespace_id = ? AND app_id = ?" - args = append(args, nsID, appID) - if _, err := db.Query(ctx, q, args...); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) - return - case http.MethodDelete: - if _, err := db.Query(ctx, "DELETE FROM apps WHERE namespace_id = ? 
AND app_id = ?", nsID, appID); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) - return - default: - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - } - - writeError(w, http.StatusNotFound, "not found") -} diff --git a/pkg/gateway/auth_handlers.go b/pkg/gateway/auth_handlers.go index 0504fee..ceec8e7 100644 --- a/pkg/gateway/auth_handlers.go +++ b/pkg/gateway/auth_handlers.go @@ -12,7 +12,6 @@ import ( "time" "github.com/DeBrosOfficial/network/pkg/client" - "github.com/DeBrosOfficial/network/pkg/storage" ethcrypto "github.com/ethereum/go-ethereum/crypto" ) @@ -20,7 +19,7 @@ func (g *Gateway) whoamiHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() // Determine namespace (may be overridden by auth layer) ns := g.cfg.ClientNamespace - if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil { + if v := ctx.Value(ctxKeyNamespaceOverride); v != nil { if s, ok := v.(string); ok && s != "" { ns = s } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index fbb9a10..4e140ed 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -4,13 +4,16 @@ import ( "context" "crypto/rand" "crypto/rsa" - "net/http" + "database/sql" "strconv" "time" "github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/logging" + "github.com/DeBrosOfficial/network/pkg/rqlite" "go.uber.org/zap" + + _ "github.com/rqlite/gorqlite/stdlib" ) // Config holds configuration for the gateway server @@ -18,6 +21,10 @@ type Config struct { ListenAddr string ClientNamespace string BootstrapPeers []string + + // Optional DSN for rqlite database/sql driver, e.g. "http://localhost:4001" + // If empty, defaults to "http://localhost:4001". 
+ RQLiteDSN string } type Gateway struct { @@ -27,6 +34,11 @@ type Gateway struct { startedAt time.Time signingKey *rsa.PrivateKey keyID string + + // rqlite SQL connection and HTTP ORM gateway + sqlDB *sql.DB + ormClient rqlite.Client + ormHTTP *rqlite.HTTPGateway } // New creates and initializes a new Gateway instance @@ -75,28 +87,24 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { logger.ComponentWarn(logging.ComponentGeneral, "failed to generate RSA key; jwks will be empty", zap.Error(err)) } - logger.ComponentInfo(logging.ComponentGeneral, "Starting database migrations goroutine...") - // Non-blocking DB migrations: probe RQLite; if reachable, apply migrations asynchronously - go func() { - if gw.probeRQLiteReachable(3 * time.Second) { - internalCtx := gw.withInternalAuth(context.Background()) - if err := gw.applyMigrations(internalCtx); err != nil { - if err == errNoMigrationsFound { - if err2 := gw.applyAutoMigrations(internalCtx); err2 != nil { - logger.ComponentWarn(logging.ComponentDatabase, "auto migrations failed", zap.Error(err2)) - } else { - logger.ComponentInfo(logging.ComponentDatabase, "auto migrations applied") - } - } else { - logger.ComponentWarn(logging.ComponentDatabase, "migrations failed", zap.Error(err)) - } - } else { - logger.ComponentInfo(logging.ComponentDatabase, "migrations applied") - } - } else { - logger.ComponentWarn(logging.ComponentDatabase, "RQLite not reachable; skipping migrations for now") - } - }() + logger.ComponentInfo(logging.ComponentGeneral, "Initializing RQLite ORM HTTP gateway...") + dsn := cfg.RQLiteDSN + if dsn == "" { + dsn = "http://localhost:4001" + } + db, dbErr := sql.Open("rqlite", dsn) + if dbErr != nil { + logger.ComponentWarn(logging.ComponentGeneral, "failed to open rqlite sql db; http orm gateway disabled", zap.Error(dbErr)) + } else { + gw.sqlDB = db + orm := rqlite.NewClient(db) + gw.ormClient = orm + gw.ormHTTP = rqlite.NewHTTPGateway(orm, "/v1/db") + 
logger.ComponentInfo(logging.ComponentGeneral, "RQLite ORM HTTP gateway ready", + zap.String("dsn", dsn), + zap.String("base_path", "/v1/db"), + ) + } logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed, returning...") return gw, nil @@ -107,31 +115,6 @@ func (g *Gateway) withInternalAuth(ctx context.Context) context.Context { return client.WithInternalAuth(ctx) } -// probeRQLiteReachable performs a quick GET /status against candidate endpoints with a short timeout. -func (g *Gateway) probeRQLiteReachable(timeout time.Duration) bool { - endpoints := client.DefaultDatabaseEndpoints() - httpClient := &http.Client{Timeout: timeout} - for _, ep := range endpoints { - url := ep - if url == "" { - continue - } - if url[len(url)-1] == '/' { - url = url[:len(url)-1] - } - reqURL := url + "/status" - resp, err := httpClient.Get(reqURL) - if err != nil { - continue - } - resp.Body.Close() - if resp.StatusCode == http.StatusOK { - return true - } - } - return false -} - // Close disconnects the gateway client func (g *Gateway) Close() { if g.client != nil { @@ -139,4 +122,7 @@ func (g *Gateway) Close() { g.logger.ComponentWarn(logging.ComponentClient, "error during client disconnect", zap.Error(err)) } } + if g.sqlDB != nil { + _ = g.sqlDB.Close() + } } diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index d2fb209..8f786a3 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -11,7 +11,6 @@ import ( "github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/logging" - "github.com/DeBrosOfficial/network/pkg/storage" "go.uber.org/zap" ) @@ -19,8 +18,9 @@ import ( type contextKey string const ( - ctxKeyAPIKey contextKey = "api_key" - ctxKeyJWT contextKey = "jwt_claims" + ctxKeyAPIKey contextKey = "api_key" + ctxKeyJWT contextKey = "jwt_claims" + ctxKeyNamespaceOverride contextKey = "namespace_override" ) // withMiddleware adds CORS and logging middleware @@ -78,7 +78,7 @@ func (g *Gateway) 
authMiddleware(next http.Handler) http.Handler { // Attach JWT claims and namespace to context ctx := context.WithValue(r.Context(), ctxKeyJWT, claims) if ns := strings.TrimSpace(claims.Namespace); ns != "" { - ctx = storage.WithNamespace(ctx, ns) + ctx = context.WithValue(ctx, ctxKeyNamespaceOverride, ns) } next.ServeHTTP(w, r.WithContext(ctx)) return @@ -125,7 +125,7 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler { // Attach auth metadata to context for downstream use reqCtx := context.WithValue(r.Context(), ctxKeyAPIKey, key) - reqCtx = storage.WithNamespace(reqCtx, ns) + reqCtx = context.WithValue(reqCtx, ctxKeyNamespaceOverride, ns) next.ServeHTTP(w, r.WithContext(reqCtx)) }) } @@ -190,7 +190,7 @@ func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler { // Determine namespace from context ctx := r.Context() ns := "" - if v := ctx.Value(storage.CtxKeyNamespaceOverride); v != nil { + if v := ctx.Value(ctxKeyNamespaceOverride); v != nil { if s, ok := v.(string); ok { ns = strings.TrimSpace(s) } @@ -265,16 +265,13 @@ func (g *Gateway) authorizationMiddleware(next http.Handler) http.Handler { // requiresNamespaceOwnership returns true if the path should be guarded by // namespace ownership checks. 
func requiresNamespaceOwnership(p string) bool { - if p == "/storage" || p == "/v1/storage" || strings.HasPrefix(p, "/v1/storage/") { - return true - } - if p == "/v1/apps" || strings.HasPrefix(p, "/v1/apps/") { + if p == "/rqlite" || p == "/v1/rqlite" || strings.HasPrefix(p, "/v1/rqlite/") { return true } if strings.HasPrefix(p, "/v1/pubsub") { return true } - if strings.HasPrefix(p, "/v1/db/") { + if strings.HasPrefix(p, "/v1/rqlite/") { return true } return false diff --git a/pkg/gateway/migrate.go b/pkg/gateway/migrate.go deleted file mode 100644 index a419210..0000000 --- a/pkg/gateway/migrate.go +++ /dev/null @@ -1,188 +0,0 @@ -package gateway - -import ( - "context" - "errors" - "os" - "path/filepath" - "sort" - "strconv" - "strings" - - "github.com/DeBrosOfficial/network/pkg/client" - "github.com/DeBrosOfficial/network/pkg/logging" - "go.uber.org/zap" -) - -var errNoMigrationsFound = errors.New("no migrations found") - -func (g *Gateway) applyAutoMigrations(ctx context.Context) error { - if g.client == nil { - return nil - } - db := g.client.Database() - - // Use internal context to bypass authentication for system migrations - internalCtx := client.WithInternalAuth(ctx) - - stmts := []string{ - // namespaces - "CREATE TABLE IF NOT EXISTS namespaces (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t name TEXT NOT NULL UNIQUE,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP\n)", - // api_keys - "CREATE TABLE IF NOT EXISTS api_keys (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t key TEXT NOT NULL UNIQUE,\n\t name TEXT,\n\t namespace_id INTEGER NOT NULL,\n\t scopes TEXT,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t last_used_at TIMESTAMP,\n\t FOREIGN KEY(namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE\n)", - "CREATE INDEX IF NOT EXISTS idx_api_keys_namespace ON api_keys(namespace_id)", - // request_logs - "CREATE TABLE IF NOT EXISTS request_logs (\n\t id INTEGER PRIMARY KEY AUTOINCREMENT,\n\t method TEXT NOT NULL,\n\t 
path TEXT NOT NULL,\n\t status_code INTEGER NOT NULL,\n\t bytes_out INTEGER NOT NULL DEFAULT 0,\n\t duration_ms INTEGER NOT NULL DEFAULT 0,\n\t ip TEXT,\n\t api_key_id INTEGER,\n\t created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n\t FOREIGN KEY(api_key_id) REFERENCES api_keys(id) ON DELETE SET NULL\n)", - "CREATE INDEX IF NOT EXISTS idx_request_logs_api_key ON request_logs(api_key_id)", - "CREATE INDEX IF NOT EXISTS idx_request_logs_created_at ON request_logs(created_at)", - // seed default namespace - "INSERT OR IGNORE INTO namespaces(name) VALUES ('default')", - } - - for _, stmt := range stmts { - if _, err := db.Query(internalCtx, stmt); err != nil { - return err - } - } - return nil -} - -func (g *Gateway) applyMigrations(ctx context.Context) error { - if g.client == nil { - return nil - } - db := g.client.Database() - - // Use internal context to bypass authentication for system migrations - internalCtx := client.WithInternalAuth(ctx) - - // Ensure schema_migrations exists first - if _, err := db.Query(internalCtx, "CREATE TABLE IF NOT EXISTS schema_migrations (\n\tversion INTEGER PRIMARY KEY,\n\tapplied_at TIMESTAMP NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ','now'))\n)"); err != nil { - return err - } - - // Locate migrations directory relative to CWD - migDir := "migrations" - if fi, err := os.Stat(migDir); err != nil || !fi.IsDir() { - return errNoMigrationsFound - } - - entries, err := os.ReadDir(migDir) - if err != nil { - return err - } - type mig struct { - ver int - path string - } - migrations := make([]mig, 0) - for _, e := range entries { - if e.IsDir() { - continue - } - name := e.Name() - if !strings.HasSuffix(strings.ToLower(name), ".sql") { - continue - } - if ver, ok := parseMigrationVersion(name); ok { - migrations = append(migrations, mig{ver: ver, path: filepath.Join(migDir, name)}) - } - } - if len(migrations) == 0 { - return errNoMigrationsFound - } - sort.Slice(migrations, func(i, j int) bool { return migrations[i].ver < 
migrations[j].ver }) - - // Helper to check if version applied - isApplied := func(ctx context.Context, v int) (bool, error) { - res, err := db.Query(ctx, "SELECT 1 FROM schema_migrations WHERE version = ? LIMIT 1", v) - if err != nil { - return false, err - } - return res != nil && res.Count > 0, nil - } - - for _, m := range migrations { - applied, err := isApplied(internalCtx, m.ver) - if err != nil { - return err - } - if applied { - continue - } - // Read and split SQL file into statements - content, err := os.ReadFile(m.path) - if err != nil { - return err - } - stmts := splitSQLStatements(string(content)) - for _, s := range stmts { - if s == "" { - continue - } - if _, err := db.Query(internalCtx, s); err != nil { - return err - } - } - // Mark as applied - if _, err := db.Query(internalCtx, "INSERT INTO schema_migrations (version) VALUES (?)", m.ver); err != nil { - return err - } - g.logger.ComponentInfo(logging.ComponentDatabase, "applied migration", zap.Int("version", m.ver), zap.String("file", m.path)) - } - return nil -} - -func parseMigrationVersion(name string) (int, bool) { - i := 0 - for i < len(name) && name[i] >= '0' && name[i] <= '9' { - i++ - } - if i == 0 { - return 0, false - } - v, err := strconv.Atoi(name[:i]) - if err != nil { - return 0, false - } - return v, true -} - -func splitSQLStatements(sqlText string) []string { - lines := strings.Split(sqlText, "\n") - cleaned := make([]string, 0, len(lines)) - for _, ln := range lines { - s := strings.TrimSpace(ln) - if s == "" { - continue - } - // Handle inline comments by removing everything after -- - if commentIdx := strings.Index(s, "--"); commentIdx >= 0 { - s = strings.TrimSpace(s[:commentIdx]) - if s == "" { - continue // line was only a comment - } - } - upper := strings.ToUpper(s) - if upper == "BEGIN;" || upper == "COMMIT;" || upper == "BEGIN" || upper == "COMMIT" { - continue - } - if strings.HasPrefix(upper, "INSERT") && strings.Contains(upper, "SCHEMA_MIGRATIONS") { - // ignore 
in-file migration markers - continue - } - cleaned = append(cleaned, s) - } - // Join and split by ';' - joined := strings.Join(cleaned, "\n") - parts := strings.Split(joined, ";") - out := make([]string, 0, len(parts)) - for _, p := range parts { - sp := strings.TrimSpace(p) - if sp == "" { - continue - } - out = append(out, sp+";") - } - return out -} diff --git a/pkg/gateway/migrate_test.go b/pkg/gateway/migrate_test.go deleted file mode 100644 index d78fa7e..0000000 --- a/pkg/gateway/migrate_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package gateway - -import "testing" - -func TestParseMigrationVersion(t *testing.T) { - cases := map[string]struct{ - name string - ok bool - }{ - "001_init.sql": {"001_init.sql", true}, - "10foobar.SQL": {"10foobar.SQL", true}, - "abc.sql": {"abc.sql", false}, - "": {"", false}, - "123_no_ext": {"123_no_ext", true}, - } - for _, c := range cases { - _, ok := parseMigrationVersion(c.name) - if ok != c.ok { - t.Fatalf("for %q expected %v got %v", c.name, c.ok, ok) - } - } -} - -func TestSplitSQLStatements(t *testing.T) { - in := `-- comment -BEGIN; -CREATE TABLE t (id INTEGER); --- another -INSERT INTO t VALUES (1); -- inline comment -COMMIT; -` - out := splitSQLStatements(in) - if len(out) != 2 { - t.Fatalf("expected 2 statements, got %d: %#v", len(out), out) - } - if out[0] != "CREATE TABLE t (id INTEGER);" { - t.Fatalf("unexpected first: %q", out[0]) - } - if out[1] != "INSERT INTO t VALUES (1);" { - t.Fatalf("unexpected second: %q", out[1]) - } -} diff --git a/pkg/gateway/db_helpers.go b/pkg/gateway/namespace_helpers.go similarity index 50% rename from pkg/gateway/db_helpers.go rename to pkg/gateway/namespace_helpers.go index 99334a0..2b3e9e5 100644 --- a/pkg/gateway/db_helpers.go +++ b/pkg/gateway/namespace_helpers.go @@ -2,22 +2,35 @@ package gateway import ( "context" + "errors" + "strings" "github.com/DeBrosOfficial/network/pkg/client" ) +// resolveNamespaceID ensures the given namespace exists and returns its primary key ID. 
+// Falls back to "default" when ns is empty. Uses internal auth context for system operations. func (g *Gateway) resolveNamespaceID(ctx context.Context, ns string) (interface{}, error) { - // Use internal context to bypass authentication for system operations + if g == nil || g.client == nil { + return nil, errors.New("client not initialized") + } + ns = strings.TrimSpace(ns) + if ns == "" { + ns = "default" + } + internalCtx := client.WithInternalAuth(ctx) db := g.client.Database() + if _, err := db.Query(internalCtx, "INSERT OR IGNORE INTO namespaces(name) VALUES (?)", ns); err != nil { return nil, err } res, err := db.Query(internalCtx, "SELECT id FROM namespaces WHERE name = ? LIMIT 1", ns) - if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { + if err != nil { return nil, err } + if res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { + return nil, errors.New("failed to resolve namespace") + } return res.Rows[0][0], nil } - -// Deprecated: seeding API keys from config is removed. 
diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go index 84ecaf3..971a09c 100644 --- a/pkg/gateway/pubsub_handlers.go +++ b/pkg/gateway/pubsub_handlers.go @@ -9,7 +9,7 @@ import ( "time" "github.com/DeBrosOfficial/network/pkg/client" - "github.com/DeBrosOfficial/network/pkg/storage" + "github.com/gorilla/websocket" ) @@ -190,7 +190,7 @@ func (g *Gateway) pubsubTopicsHandler(w http.ResponseWriter, r *http.Request) { // resolveNamespaceFromRequest gets namespace from context set by auth middleware func resolveNamespaceFromRequest(r *http.Request) string { - if v := r.Context().Value(storage.CtxKeyNamespaceOverride); v != nil { + if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil { if s, ok := v.(string); ok { return s } diff --git a/pkg/gateway/routes.go b/pkg/gateway/routes.go index cd24712..46154f1 100644 --- a/pkg/gateway/routes.go +++ b/pkg/gateway/routes.go @@ -27,16 +27,11 @@ func (g *Gateway) Routes() http.Handler { mux.HandleFunc("/v1/auth/logout", g.logoutHandler) mux.HandleFunc("/v1/auth/whoami", g.whoamiHandler) - // apps CRUD - mux.HandleFunc("/v1/apps", g.appsHandler) - mux.HandleFunc("/v1/apps/", g.appsHandler) - - // database - mux.HandleFunc("/v1/db/query", g.dbQueryHandler) - mux.HandleFunc("/v1/db/transaction", g.dbTransactionHandler) - mux.HandleFunc("/v1/db/schema", g.dbSchemaHandler) - mux.HandleFunc("/v1/db/create-table", g.dbCreateTableHandler) - mux.HandleFunc("/v1/db/drop-table", g.dbDropTableHandler) + // rqlite ORM HTTP gateway (mounts /v1/db/* endpoints) + if g.ormHTTP != nil { + g.ormHTTP.BasePath = "/v1/rqlite" + g.ormHTTP.RegisterRoutes(mux) + } // network mux.HandleFunc("/v1/network/status", g.networkStatusHandler) diff --git a/pkg/gateway/storage_handlers.go b/pkg/gateway/storage_handlers.go index f3784fb..01d0ef0 100644 --- a/pkg/gateway/storage_handlers.go +++ b/pkg/gateway/storage_handlers.go @@ -3,127 +3,9 @@ package gateway import ( "encoding/json" "net/http" - - 
"github.com/DeBrosOfficial/network/pkg/client" - "github.com/DeBrosOfficial/network/pkg/pubsub" ) // Database HTTP handlers -func (g *Gateway) dbQueryHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - var body struct { - SQL string `json:"sql"` - Args []any `json:"args"` - } - if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.SQL == "" { - writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}") - return - } - ctx := client.WithInternalAuth(r.Context()) - res, err := g.client.Database().Query(ctx, body.SQL, body.Args...) - if err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, res) -} - -func (g *Gateway) dbTransactionHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - var body struct { - Statements []string `json:"statements"` - } - if err := json.NewDecoder(r.Body).Decode(&body); err != nil || len(body.Statements) == 0 { - writeError(w, http.StatusBadRequest, "invalid body: {statements:[...]}") - return - } - ctx := client.WithInternalAuth(r.Context()) - if err := g.client.Database().Transaction(ctx, body.Statements); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) -} - -func (g *Gateway) dbSchemaHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, 
"method not allowed") - return - } - ctx := client.WithInternalAuth(r.Context()) - schema, err := g.client.Database().GetSchema(ctx) - if err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, schema) -} - -func (g *Gateway) dbCreateTableHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - var body struct { - Schema string `json:"schema"` - } - if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Schema == "" { - writeError(w, http.StatusBadRequest, "invalid body: {schema}") - return - } - ctx := client.WithInternalAuth(r.Context()) - if err := g.client.Database().CreateTable(ctx, body.Schema); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusCreated, map[string]any{"status": "ok"}) -} - -func (g *Gateway) dbDropTableHandler(w http.ResponseWriter, r *http.Request) { - if g.client == nil { - writeError(w, http.StatusServiceUnavailable, "client not initialized") - return - } - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - var body struct { - Table string `json:"table"` - } - if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Table == "" { - writeError(w, http.StatusBadRequest, "invalid body: {table}") - return - } - ctx := client.WithInternalAuth(r.Context()) - if err := g.client.Database().DropTable(ctx, body.Table); err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) -} func (g *Gateway) networkStatusHandler(w http.ResponseWriter, r *http.Request) { if g.client == nil { @@ -204,7 +86,7 @@ func (g *Gateway) 
validateNamespaceParam(r *http.Request) bool { if qns == "" { return true } - if v := r.Context().Value(pubsub.CtxKeyNamespaceOverride); v != nil { + if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil { if s, ok := v.(string); ok && s != "" { return s == qns } diff --git a/pkg/rqlite/gateway.go b/pkg/rqlite/gateway.go index e69de29..369b2e1 100644 --- a/pkg/rqlite/gateway.go +++ b/pkg/rqlite/gateway.go @@ -0,0 +1,615 @@ +package rqlite + +// HTTP gateway for the rqlite ORM client. +// +// This file exposes a minimal, SDK-friendly HTTP interface over the ORM-like +// client defined in client.go. It maps high-level operations (Query, Exec, +// FindBy, FindOneBy, QueryBuilder-based SELECTs, Transactions) and a few schema +// helpers into JSON-over-HTTP endpoints that can be called from any language. +// +// Endpoints (under BasePath, default: /v1/db): +// - POST {base}/query -> arbitrary SELECT; returns rows as []map[string]any +// - POST {base}/exec -> write statement (INSERT/UPDATE/DELETE/DDL); returns {rows_affected,last_insert_id} +// - POST {base}/find -> FindBy(table, criteria, opts...) -> returns []map +// - POST {base}/find-one -> FindOneBy(table, criteria, opts...) -> returns map +// - POST {base}/select -> Fluent SELECT builder via JSON (joins, where, order, group, limit, offset); returns []map or one map if one=true +// - POST {base}/transaction -> Execute a sequence of exec/query ops atomically; optionally return results +// +// Schema helpers (convenience; powered via Exec/Query): +// - GET {base}/schema -> list of user tables/views and create SQL +// - POST {base}/create-table -> {schema: "CREATE TABLE ..."} -> status ok +// - POST {base}/drop-table -> {table: "name"} -> status ok (safe-validated identifier) +// +// Notes: +// - All numbers in JSON are decoded as float64 by default; we best-effort coerce +// integral values to int64 for SQL placeholders. 
+// - The Save/Remove reflection helpers in the ORM require concrete Go structs; +// exposing them generically over HTTP is not portable. Prefer using the Exec +// and Find APIs, or the Select builder for CRUD-like flows. + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "net/http" + "regexp" + "strings" + "time" +) + +// HTTPGateway exposes the ORM Client as a set of HTTP handlers. +type HTTPGateway struct { + // Client is the ORM-like rqlite client to execute operations against. + Client Client + // BasePath is the prefix for all routes, e.g. "/v1/db". + // If empty, defaults to "/v1/db". A trailing slash is trimmed. + BasePath string + + // Optional: Request timeout. If > 0, handlers will use a context with this timeout. + Timeout time.Duration +} + +// NewHTTPGateway constructs a new HTTPGateway with sensible defaults. +func NewHTTPGateway(c Client, base string) *HTTPGateway { + return &HTTPGateway{ + Client: c, + BasePath: base, + } +} + +// RegisterRoutes registers all handlers onto the provided mux under BasePath. +func (g *HTTPGateway) RegisterRoutes(mux *http.ServeMux) { + base := g.base() + mux.HandleFunc(base+"/query", g.handleQuery) + mux.HandleFunc(base+"/exec", g.handleExec) + mux.HandleFunc(base+"/find", g.handleFind) + mux.HandleFunc(base+"/find-one", g.handleFindOne) + mux.HandleFunc(base+"/select", g.handleSelect) + // Keep "transaction" for compatibility with existing routes. 
+ mux.HandleFunc(base+"/transaction", g.handleTransaction) + + // Schema helpers + mux.HandleFunc(base+"/schema", g.handleSchema) + mux.HandleFunc(base+"/create-table", g.handleCreateTable) + mux.HandleFunc(base+"/drop-table", g.handleDropTable) +} + +func (g *HTTPGateway) base() string { + b := strings.TrimSpace(g.BasePath) + if b == "" { + b = "/v1/db" + } + if b != "/" { + b = strings.TrimRight(b, "/") + } + return b +} + +func (g *HTTPGateway) withTimeout(ctx context.Context) (context.Context, context.CancelFunc) { + if g.Timeout > 0 { + return context.WithTimeout(ctx, g.Timeout) + } + return context.WithCancel(ctx) +} + +// -------------------- +// Common HTTP helpers +// -------------------- + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(v) +} + +func writeError(w http.ResponseWriter, code int, msg string) { + writeJSON(w, code, map[string]any{"error": msg}) +} + +func onlyMethod(w http.ResponseWriter, r *http.Request, method string) bool { + if r.Method != method { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return false + } + return true +} + +// Normalize JSON-decoded args for SQL placeholders. +// - Convert float64 with integral value to int64 to better match SQLite expectations. +// - Leave strings, bools and nulls as-is. +// - Recursively normalizes nested arrays if present. 
+func normalizeArgs(args []any) []any { + out := make([]any, len(args)) + for i, a := range args { + switch v := a.(type) { + case float64: + // If v is integral (within epsilon), convert to int64 + if v == float64(int64(v)) { + out[i] = int64(v) + } else { + out[i] = v + } + case []any: + out[i] = normalizeArgs(v) + default: + out[i] = a + } + } + return out +} + +// -------------------- +// Request DTOs +// -------------------- + +type queryRequest struct { + SQL string `json:"sql"` + Args []any `json:"args"` +} + +type execRequest struct { + SQL string `json:"sql"` + Args []any `json:"args"` +} + +type findOptions struct { + Select []string `json:"select"` + OrderBy []string `json:"order_by"` + GroupBy []string `json:"group_by"` + Limit *int `json:"limit"` + Offset *int `json:"offset"` + Joins []joinBody `json:"joins"` +} + +type findRequest struct { + Table string `json:"table"` + Criteria map[string]any `json:"criteria"` + Options findOptions `json:"options"` + // Back-compat: allow options at top-level too + Select []string `json:"select"` + OrderBy []string `json:"order_by"` + GroupBy []string `json:"group_by"` + Limit *int `json:"limit"` + Offset *int `json:"offset"` + Joins []joinBody `json:"joins"` +} + +type findOneRequest = findRequest + +type joinBody struct { + Kind string `json:"kind"` // "INNER" | "LEFT" | "JOIN" + Table string `json:"table"` // table name + On string `json:"on"` // join condition +} + +type whereBody struct { + Conj string `json:"conj"` // "AND" | "OR" (default AND) + Expr string `json:"expr"` // e.g., "a = ? AND b > ?" 
+ Args []any `json:"args"` +} + +type selectRequest struct { + Table string `json:"table"` + Alias string `json:"alias"` + Select []string `json:"select"` + Joins []joinBody `json:"joins"` + Where []whereBody `json:"where"` + GroupBy []string `json:"group_by"` + OrderBy []string `json:"order_by"` + Limit *int `json:"limit"` + Offset *int `json:"offset"` + One bool `json:"one"` // if true, returns a single row (object) +} + +type txOp struct { + Kind string `json:"kind"` // "exec" | "query" + SQL string `json:"sql"` + Args []any `json:"args"` +} + +type transactionRequest struct { + Ops []txOp `json:"ops"` + ReturnResults bool `json:"return_results"` // if true, returns per-op results + StopOnError bool `json:"stop_on_error"` // default true in tx + PartialResults bool `json:"partial_results"` // ignored for actual TX (atomic); kept for API symmetry +} + +// -------------------- +// Handlers +// -------------------- + +func (g *HTTPGateway) handleQuery(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body queryRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.SQL) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}") + return + } + args := normalizeArgs(body.Args) + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + out := make([]map[string]any, 0, 16) + if err := g.Client.Query(ctx, &out, body.SQL, args...); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "items": out, + "count": len(out), + }) +} + +func (g *HTTPGateway) handleExec(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var 
body execRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.SQL) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {sql, args?}") + return + } + args := normalizeArgs(body.Args) + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + res, err := g.Client.Exec(ctx, body.SQL, args...) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + liid, _ := res.LastInsertId() + ra, _ := res.RowsAffected() + writeJSON(w, http.StatusOK, map[string]any{ + "rows_affected": ra, + "last_insert_id": liid, + "execution_state": "ok", + }) +} + +func (g *HTTPGateway) handleFind(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body findRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {table, criteria, options?}") + return + } + opts := makeFindOptions(mergeFindOptions(body)) + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + out := make([]map[string]any, 0, 32) + if err := g.Client.FindBy(ctx, &out, body.Table, body.Criteria, opts...); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "items": out, + "count": len(out), + }) +} + +func (g *HTTPGateway) handleFindOne(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body findOneRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {table, criteria, options?}") + return + } + opts := 
makeFindOptions(mergeFindOptions(body)) + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + row := make(map[string]any) + if err := g.Client.FindOneBy(ctx, &row, body.Table, body.Criteria, opts...); err != nil { + if errors.Is(err, sql.ErrNoRows) { + writeError(w, http.StatusNotFound, "not found") + return + } + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, row) +} + +func (g *HTTPGateway) handleSelect(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body selectRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {table, select?, where?, joins?, order_by?, group_by?, limit?, offset?, one?}") + return + } + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + qb := g.Client.CreateQueryBuilder(body.Table) + if alias := strings.TrimSpace(body.Alias); alias != "" { + qb = qb.Alias(alias) + } + if len(body.Select) > 0 { + qb = qb.Select(body.Select...) + } + // joins + for _, j := range body.Joins { + switch strings.ToUpper(strings.TrimSpace(j.Kind)) { + case "INNER": + qb = qb.InnerJoin(j.Table, j.On) + case "LEFT": + qb = qb.LeftJoin(j.Table, j.On) + default: + qb = qb.Join(j.Table, j.On) + } + } + // where + for _, wcl := range body.Where { + switch strings.ToUpper(strings.TrimSpace(wcl.Conj)) { + case "OR": + qb = qb.OrWhere(wcl.Expr, normalizeArgs(wcl.Args)...) + default: + qb = qb.AndWhere(wcl.Expr, normalizeArgs(wcl.Args)...) + } + } + // group/order/limit/offset + if len(body.GroupBy) > 0 { + qb = qb.GroupBy(body.GroupBy...) + } + if len(body.OrderBy) > 0 { + qb = qb.OrderBy(body.OrderBy...) 
+ } + if body.Limit != nil { + qb = qb.Limit(*body.Limit) + } + if body.Offset != nil { + qb = qb.Offset(*body.Offset) + } + + if body.One { + row := make(map[string]any) + if err := qb.GetOne(ctx, &row); err != nil { + if errors.Is(err, sql.ErrNoRows) { + writeError(w, http.StatusNotFound, "not found") + return + } + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, row) + return + } + + rows := make([]map[string]any, 0, 32) + if err := qb.GetMany(ctx, &rows); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "items": rows, + "count": len(rows), + }) +} + +func (g *HTTPGateway) handleTransaction(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body transactionRequest + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || len(body.Ops) == 0 { + writeError(w, http.StatusBadRequest, "invalid body: {ops:[{kind,sql,args?}], return_results?}") + return + } + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + results := make([]any, 0, len(body.Ops)) + err := g.Client.Tx(ctx, func(tx Tx) error { + for _, op := range body.Ops { + switch strings.ToLower(strings.TrimSpace(op.Kind)) { + case "exec": + res, err := tx.Exec(ctx, op.SQL, normalizeArgs(op.Args)...) 
+ if err != nil { + return err + } + if body.ReturnResults { + li, _ := res.LastInsertId() + ra, _ := res.RowsAffected() + results = append(results, map[string]any{ + "rows_affected": ra, + "last_insert_id": li, + }) + } + case "query": + var rows []map[string]any + if err := tx.Query(ctx, &rows, op.SQL, normalizeArgs(op.Args)...); err != nil { + return err + } + if body.ReturnResults { + results = append(results, rows) + } + default: + return fmt.Errorf("invalid op kind: %s", op.Kind) + } + } + return nil + }) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + if body.ReturnResults { + writeJSON(w, http.StatusOK, map[string]any{ + "status": "ok", + "results": results, + }) + return + } + writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) +} + +// -------------------- +// Schema helpers +// -------------------- + +func (g *HTTPGateway) handleSchema(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodGet) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + sqlText := `SELECT name, type, sql FROM sqlite_master WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' ORDER BY name` + var rows []map[string]any + if err := g.Client.Query(ctx, &rows, sqlText); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "objects": rows, + "count": len(rows), + }) +} + +func (g *HTTPGateway) handleCreateTable(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body struct { + Schema string `json:"schema"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Schema) == "" { + writeError(w, 
http.StatusBadRequest, "invalid body: {schema}") + return + } + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + if _, err := g.Client.Exec(ctx, body.Schema); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusCreated, map[string]any{"status": "ok"}) +} + +var identRe = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`) + +func (g *HTTPGateway) handleDropTable(w http.ResponseWriter, r *http.Request) { + if !onlyMethod(w, r, http.MethodPost) { + return + } + if g.Client == nil { + writeError(w, http.StatusServiceUnavailable, "client not initialized") + return + } + var body struct { + Table string `json:"table"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil || strings.TrimSpace(body.Table) == "" { + writeError(w, http.StatusBadRequest, "invalid body: {table}") + return + } + tbl := strings.TrimSpace(body.Table) + if !identRe.MatchString(tbl) { + writeError(w, http.StatusBadRequest, "invalid table identifier") + return + } + ctx, cancel := g.withTimeout(r.Context()) + defer cancel() + + stmt := "DROP TABLE IF EXISTS " + tbl + if _, err := g.Client.Exec(ctx, stmt); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) +} + +// -------------------- +// Helpers +// -------------------- + +func mergeFindOptions(fr findRequest) findOptions { + // Prefer nested Options; fallback to top-level legacy fields + if (len(fr.Options.Select)+len(fr.Options.OrderBy)+len(fr.Options.GroupBy)) > 0 || + fr.Options.Limit != nil || fr.Options.Offset != nil || len(fr.Options.Joins) > 0 { + return fr.Options + } + return findOptions{ + Select: fr.Select, + OrderBy: fr.OrderBy, + GroupBy: fr.GroupBy, + Limit: fr.Limit, + Offset: fr.Offset, + Joins: fr.Joins, + } +} + +func makeFindOptions(o findOptions) []FindOption { + opts := make([]FindOption, 0, 6) + if len(o.OrderBy) > 0 { + opts = append(opts, 
WithOrderBy(o.OrderBy...)) + } + if len(o.GroupBy) > 0 { + opts = append(opts, WithGroupBy(o.GroupBy...)) + } + if o.Limit != nil { + opts = append(opts, WithLimit(*o.Limit)) + } + if o.Offset != nil { + opts = append(opts, WithOffset(*o.Offset)) + } + if len(o.Select) > 0 { + opts = append(opts, WithSelect(o.Select...)) + } + for _, j := range o.Joins { + opts = append(opts, WithJoin(justOrDefault(strings.ToUpper(j.Kind), "JOIN"), j.Table, j.On)) + } + return opts +} + +func justOrDefault(s, def string) string { + if strings.TrimSpace(s) == "" { + return def + } + return s +} diff --git a/pkg/rqlite/migrations.go b/pkg/rqlite/migrations.go index 1c43ed9..60efc9b 100644 --- a/pkg/rqlite/migrations.go +++ b/pkg/rqlite/migrations.go @@ -254,47 +254,26 @@ func isNoSuchTable(err error) bool { return strings.Contains(msg, "no such table") || strings.Contains(msg, "does not exist") } -// applySQL tries to run the entire script in one Exec. -// If the driver rejects multi-statement Exec, it falls back to splitting statements and executing sequentially. +// applySQL splits the script into individual statements, strips explicit +// transaction control (BEGIN/COMMIT/ROLLBACK/END), and executes statements +// sequentially to avoid nested transaction issues with rqlite. func applySQL(ctx context.Context, db *sql.DB, script string) error { s := strings.TrimSpace(script) if s == "" { return nil } - if _, err := db.ExecContext(ctx, s); err == nil { - return nil - } else { - // Fall back to splitting into statements and executing sequentially (respecting BEGIN/COMMIT if present). - stmts := splitSQLStatements(s) - // If the script already contains explicit BEGIN/COMMIT, we just run as-is. - // Otherwise, we attempt to wrap in a transaction; if BeginTx fails, execute one-by-one. 
- hasExplicitTxn := containsToken(stmts, "BEGIN") || containsToken(stmts, "BEGIN;") - if !hasExplicitTxn { - if tx, txErr := db.BeginTx(ctx, nil); txErr == nil { - for _, stmt := range stmts { - if stmt == "" { - continue - } - if _, execErr := tx.ExecContext(ctx, stmt); execErr != nil { - _ = tx.Rollback() - return fmt.Errorf("exec stmt failed: %w (stmt: %s)", execErr, snippet(stmt)) - } - } - return tx.Commit() - } - // Fall through to plain sequential exec if BeginTx not supported. - } + stmts := splitSQLStatements(s) + stmts = filterOutTxnControls(stmts) - for _, stmt := range stmts { - if stmt == "" { - continue - } - if _, execErr := db.ExecContext(ctx, stmt); execErr != nil { - return fmt.Errorf("exec stmt failed: %w (stmt: %s)", execErr, snippet(stmt)) - } + for _, stmt := range stmts { + if strings.TrimSpace(stmt) == "" { + continue + } + if _, err := db.ExecContext(ctx, stmt); err != nil { + return fmt.Errorf("exec stmt failed: %w (stmt: %s)", err, snippet(stmt)) } - return nil } + return nil } func containsToken(stmts []string, token string) bool { @@ -306,6 +285,33 @@ func containsToken(stmts []string, token string) bool { return false } +// removed duplicate helper + +// removed duplicate helper + +// isTxnControl returns true if the statement is a transaction control command. +func isTxnControl(s string) bool { + t := strings.ToUpper(strings.TrimSpace(s)) + switch t { + case "BEGIN", "BEGIN TRANSACTION", "COMMIT", "END", "ROLLBACK": + return true + default: + return false + } +} + +// filterOutTxnControls removes BEGIN/COMMIT/ROLLBACK/END statements. 
+func filterOutTxnControls(stmts []string) []string { + out := make([]string, 0, len(stmts)) + for _, s := range stmts { + if isTxnControl(s) { + continue + } + out = append(out, s) + } + return out +} + func snippet(s string) string { s = strings.TrimSpace(s) if len(s) > 120 { diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index 740e887..eca678d 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -175,7 +175,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } // After waitForLeadership / waitForSQLAvailable succeeds, before returning: - migrationsDir := "network/migrations" + migrationsDir := "migrations" if err := r.ApplyMigrations(ctx, migrationsDir); err != nil { r.logger.Error("Migrations failed", zap.Error(err), zap.String("dir", migrationsDir)) @@ -325,9 +325,6 @@ func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress strin } } - if lastErr == nil { - lastErr = fmt.Errorf("join target not reachable within %s", timeout) - } return lastErr }