diff --git a/core/migrations/contract.go b/core/migrations/contract.go new file mode 100644 index 0000000..52e410f --- /dev/null +++ b/core/migrations/contract.go @@ -0,0 +1,194 @@ +// Package migrations holds the embedded SQL migrations for the gateway's +// RQLite registry. This file defines the schema-version contract every +// gateway binary must enforce at startup. +// +// The contract: +// +// 1. The binary embeds every migration file in this directory. +// 2. RequiredVersion() returns the highest numbered migration in the embed. +// This is the schema version the binary REQUIRES to function correctly. +// 3. AssertSchema(ctx, db) queries the schema_migrations table and returns +// a typed *SchemaMismatchError if the applied version is below +// RequiredVersion. Gateway startup MUST treat this as fatal. +// +// Why: a rolling upgrade can swap the gateway binary without restarting the +// underlying RQLite process. If a new binary expects columns added by a +// migration the RQLite-process startup never re-ran, INSERTs fail with +// cryptic errors at runtime. Asserting the contract at startup catches the +// mismatch immediately with an actionable error message. +// +// See plan: this file is the long-term fix for the AnChat-test "missing +// ws_max_frame_bytes column" incident (2026-05-06). +package migrations + +import ( + "context" + "database/sql" + "fmt" + "io/fs" + "sort" + "strconv" + "strings" +) + +// MigrationInfo describes one embedded migration. +type MigrationInfo struct { + Version int + Name string + Path string +} + +// allMigrations returns every embedded migration sorted by version ascending. +// Computed once at startup; cheap to call repeatedly. +var allMigrations = mustListMigrations() + +func mustListMigrations() []MigrationInfo { + entries, err := fs.ReadDir(FS, ".") + if err != nil { + // In practice this can't happen — the embed.FS is built from a + // known directory. If it does, we can't safely run anything. + panic(fmt.Sprintf("migrations: failed to list embedded files: %v", err)) + } + + var out []MigrationInfo + for _, e := range entries { + if e.IsDir() { + continue + } + name := e.Name() + if !strings.HasSuffix(name, ".sql") { + continue + } + v, ok := parseVersion(name) + if !ok { + continue + } + out = append(out, MigrationInfo{ + Version: v, + Name: strings.TrimSuffix(name, ".sql"), + Path: name, + }) + } + sort.Slice(out, func(i, j int) bool { return out[i].Version < out[j].Version }) + return out +} + +// parseVersion extracts the integer prefix from "001_initial.sql" → 1. +// Returns ok=false for files without a leading numeric prefix. +func parseVersion(filename string) (int, bool) { + idx := strings.IndexByte(filename, '_') + if idx <= 0 { + return 0, false + } + v, err := strconv.Atoi(filename[:idx]) + if err != nil { + return 0, false + } + return v, true +} + +// All returns a snapshot of every embedded migration, sorted by version. +// The returned slice is a copy; safe to mutate. +func All() []MigrationInfo { + out := make([]MigrationInfo, len(allMigrations)) + copy(out, allMigrations) + return out +} + +// RequiredVersion returns the highest migration version embedded in this +// binary. Panics if no migrations are embedded (impossible in practice). +// +// This is the schema version the binary requires. The gateway asserts at +// startup that the database's applied schema is >= this value. +func RequiredVersion() int { + if len(allMigrations) == 0 { + panic("migrations: no embedded migrations found") + } + return allMigrations[len(allMigrations)-1].Version +} + +// SchemaMismatchError is returned when the database's applied schema is +// behind what the binary requires. Gateway startup MUST treat this as fatal +// and log the actionable hint. +type SchemaMismatchError struct { + RequiredVersion int + AppliedVersion int + Pending []MigrationInfo // migrations the binary has but the DB lacks +} + +func (e *SchemaMismatchError) Error() string { + pending := make([]string, 0, len(e.Pending)) + for _, m := range e.Pending { + pending = append(pending, fmt.Sprintf("%03d (%s)", m.Version, m.Name)) + } + return fmt.Sprintf( + "schema mismatch: binary requires version %d, database has %d. "+ + "Pending migrations: [%s]. "+ + "Run `orama node migrate-apply` on the namespace's RQLite to fix.", + e.RequiredVersion, e.AppliedVersion, strings.Join(pending, ", "), + ) +} + +// AppliedVersion queries the schema_migrations table and returns the highest +// version recorded as applied. Returns 0 (with nil error) if the table is +// empty — that's a fresh database, valid state. +// +// Returns an error if the schema_migrations table itself doesn't exist or +// can't be read; callers must distinguish that from "applied=0". +func AppliedVersion(ctx context.Context, db *sql.DB) (int, error) { + row := db.QueryRowContext(ctx, `SELECT COALESCE(MAX(version), 0) FROM schema_migrations`) + var v int + if err := row.Scan(&v); err != nil { + return 0, fmt.Errorf("migrations: query schema_migrations: %w", err) + } + return v, nil +} + +// AssertSchema verifies the database's applied schema is at least +// RequiredVersion(). Returns nil on match-or-newer, *SchemaMismatchError +// on lag. +// +// Newer-than-required is OK — that means an older binary is talking to a +// database that's been advanced by a newer binary in the cluster. The +// binary just won't use whatever the newer columns enable. (Gateway +// startup should still allow this; it's a normal rolling-upgrade window.) +func AssertSchema(ctx context.Context, db *sql.DB) error { + required := RequiredVersion() + applied, err := AppliedVersion(ctx, db) + if err != nil { + return fmt.Errorf("migrations.AssertSchema: %w", err) + } + if applied >= required { + return nil + } + + // Compute pending migrations for the error message. + pending := make([]MigrationInfo, 0) + for _, m := range allMigrations { + if m.Version > applied { + pending = append(pending, m) + } + } + return &SchemaMismatchError{ + RequiredVersion: required, + AppliedVersion: applied, + Pending: pending, + } +} + +// PendingMigrations returns migrations the binary has but the database +// hasn't applied. Used by the `orama node migrate-status` CLI to show +// the operator what would be applied by a `migrate-apply`. +func PendingMigrations(ctx context.Context, db *sql.DB) ([]MigrationInfo, error) { + applied, err := AppliedVersion(ctx, db) + if err != nil { + return nil, err + } + out := make([]MigrationInfo, 0) + for _, m := range allMigrations { + if m.Version > applied { + out = append(out, m) + } + } + return out, nil +} diff --git a/core/migrations/contract_test.go b/core/migrations/contract_test.go new file mode 100644 index 0000000..dcaa686 --- /dev/null +++ b/core/migrations/contract_test.go @@ -0,0 +1,231 @@ +package migrations + +import ( + "context" + "database/sql" + "errors" + "strings" + "testing" + + _ "github.com/mattn/go-sqlite3" +) + +// openTestDB returns an in-memory SQLite database. The migrations contract +// only cares about ANSI-ish SQL (CREATE TABLE, SELECT MAX, INSERT) — we +// don't need RQLite's distributed semantics for these tests. +func openTestDB(t *testing.T) *sql.DB { + t.Helper() + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("open in-memory sqlite: %v", err) + } + t.Cleanup(func() { _ = db.Close() }) + return db +} + +func ensureMigrationsTable(t *testing.T, db *sql.DB) { + t.Helper() + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + )`) + if err != nil { + t.Fatalf("create schema_migrations: %v", err) + } +} + +func TestRequiredVersion_matches_highest_embedded(t *testing.T) { + all := All() + if len(all) == 0 { + t.Fatal("no embedded migrations — embed.FS broken?") + } + want := all[len(all)-1].Version + if got := RequiredVersion(); got != want { + t.Errorf("RequiredVersion() = %d, want %d", got, want) + } +} + +func TestAll_returns_sorted_copy(t *testing.T) { + a := All() + for i := 1; i < len(a); i++ { + if a[i-1].Version >= a[i].Version { + t.Errorf("All() not sorted: %d before %d", a[i-1].Version, a[i].Version) + } + } + // Mutating the returned slice must not affect subsequent calls. + if len(a) > 0 { + a[0].Version = -999 + } + a2 := All() + if len(a2) > 0 && a2[0].Version == -999 { + t.Error("All() returned a shared slice — subsequent calls see mutation") + } +} + +func TestAppliedVersion_empty_returns_zero(t *testing.T) { + db := openTestDB(t) + ensureMigrationsTable(t, db) + + v, err := AppliedVersion(context.Background(), db) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if v != 0 { + t.Errorf("expected 0 for empty schema_migrations, got %d", v) + } +} + +func TestAppliedVersion_returns_max(t *testing.T) { + db := openTestDB(t) + ensureMigrationsTable(t, db) + for _, v := range []int{1, 5, 3, 10, 7} { + _, err := db.Exec("INSERT INTO schema_migrations (version) VALUES (?)", v) + if err != nil { + t.Fatalf("insert %d: %v", v, err) + } + } + v, err := AppliedVersion(context.Background(), db) + if err != nil { + t.Fatalf("AppliedVersion: %v", err) + } + if v != 10 { + t.Errorf("expected 10, got %d", v) + } +} + +func TestAppliedVersion_no_table_returns_error(t *testing.T) { + db := openTestDB(t) + // Don't create schema_migrations table. + _, err := AppliedVersion(context.Background(), db) + if err == nil { + t.Fatal("expected error when schema_migrations missing") + } +} + +func TestAssertSchema_ok_when_at_required(t *testing.T) { + db := openTestDB(t) + ensureMigrationsTable(t, db) + _, err := db.Exec("INSERT INTO schema_migrations (version) VALUES (?)", RequiredVersion()) + if err != nil { + t.Fatalf("seed: %v", err) + } + if err := AssertSchema(context.Background(), db); err != nil { + t.Errorf("AssertSchema returned error when at required version: %v", err) + } +} + +func TestAssertSchema_ok_when_above_required(t *testing.T) { + db := openTestDB(t) + ensureMigrationsTable(t, db) + _, err := db.Exec("INSERT INTO schema_migrations (version) VALUES (?)", RequiredVersion()+10) + if err != nil { + t.Fatalf("seed: %v", err) + } + if err := AssertSchema(context.Background(), db); err != nil { + t.Errorf("AssertSchema returned error when ahead of required: %v", err) + } +} + +func TestAssertSchema_fails_when_below_required(t *testing.T) { + db := openTestDB(t) + ensureMigrationsTable(t, db) + // Seed only the first migration. + _, err := db.Exec("INSERT INTO schema_migrations (version) VALUES (?)", 1) + if err != nil { + t.Fatalf("seed: %v", err) + } + + err = AssertSchema(context.Background(), db) + if err == nil { + t.Fatal("expected SchemaMismatchError, got nil") + } + var smErr *SchemaMismatchError + if !errors.As(err, &smErr) { + t.Fatalf("expected *SchemaMismatchError, got %T: %v", err, err) + } + if smErr.RequiredVersion != RequiredVersion() { + t.Errorf("RequiredVersion mismatch: got %d, want %d", smErr.RequiredVersion, RequiredVersion()) + } + if smErr.AppliedVersion != 1 { + t.Errorf("AppliedVersion mismatch: got %d, want 1", smErr.AppliedVersion) + } + if len(smErr.Pending) == 0 { + t.Error("expected pending migrations list, got empty") + } + + // Error message must contain the actionable hint. + if !strings.Contains(err.Error(), "orama node migrate") { + t.Errorf("error message lacks actionable hint: %v", err) + } +} + +func TestPendingMigrations_empty_when_at_required(t *testing.T) { + db := openTestDB(t) + ensureMigrationsTable(t, db) + _, _ = db.Exec("INSERT INTO schema_migrations (version) VALUES (?)", RequiredVersion()) + + pending, err := PendingMigrations(context.Background(), db) + if err != nil { + t.Fatalf("PendingMigrations: %v", err) + } + if len(pending) != 0 { + t.Errorf("expected 0 pending, got %d", len(pending)) + } +} + +func TestPendingMigrations_lists_all_when_empty_db(t *testing.T) { + db := openTestDB(t) + ensureMigrationsTable(t, db) + pending, err := PendingMigrations(context.Background(), db) + if err != nil { + t.Fatalf("PendingMigrations: %v", err) + } + if len(pending) != len(All()) { + t.Errorf("expected %d pending (all), got %d", len(All()), len(pending)) + } +} + +func TestParseVersion(t *testing.T) { + cases := []struct { + name string + in string + want int + ok bool + }{ + {"valid 3-digit", "001_initial.sql", 1, true}, + {"valid 25", "025_persistent_ws.sql", 25, true}, + {"valid 100", "100_future.sql", 100, true}, + {"no underscore", "999.sql", 0, false}, + {"non-numeric prefix", "abc_initial.sql", 0, false}, + {"empty", "", 0, false}, + {"only underscore", "_x.sql", 0, false}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got, ok := parseVersion(c.in) + if ok != c.ok || got != c.want { + t.Errorf("parseVersion(%q) = (%d, %v), want (%d, %v)", + c.in, got, ok, c.want, c.ok) + } + }) + } +} + +func TestSchemaMismatchError_message_lists_pending(t *testing.T) { + e := &SchemaMismatchError{ + RequiredVersion: 25, + AppliedVersion: 22, + Pending: []MigrationInfo{ + {Version: 23, Name: "push_devices"}, + {Version: 24, Name: "namespace_publish_seq"}, + {Version: 25, Name: "persistent_ws"}, + }, + } + msg := e.Error() + for _, want := range []string{"025", "024", "023", "push_devices", "namespace_publish_seq", "persistent_ws", "orama node migrate"} { + if !strings.Contains(msg, want) { + t.Errorf("error message missing %q: %s", want, msg) + } + } +} diff --git a/core/pkg/cli/cmd/node/node.go b/core/pkg/cli/cmd/node/node.go index 19ff3a1..be55603 100644 --- a/core/pkg/cli/cmd/node/node.go +++ b/core/pkg/cli/cmd/node/node.go @@ -34,4 +34,5 @@ func init() { Cmd.AddCommand(unlockCmd) Cmd.AddCommand(migrateConfCmd) Cmd.AddCommand(setupCmd) + Cmd.AddCommand(schemaCmd) } diff --git a/core/pkg/cli/cmd/node/schema.go b/core/pkg/cli/cmd/node/schema.go new file mode 100644 index 0000000..905c103 --- /dev/null +++ b/core/pkg/cli/cmd/node/schema.go @@ -0,0 +1,264 @@ +// Package node — schema subcommand. Operator-facing commands for +// inspecting and applying the embedded gateway schema migrations against +// the local RQLite instance. +// +// `orama node schema status` — non-destructive: shows binary's required +// schema version, applied version, and pending +// migrations. Useful in rolling-upgrade +// monitoring. +// +// `orama node schema apply` — applies any pending migrations. Idempotent +// and safe to re-run; ALTER TABLE failures for +// existing columns are tolerated. Confirms +// before running unless --yes is passed. +// +// These are the long-term fix for the "schema lag after gateway-only +// upgrade" class of incident. See migrations/contract.go for the contract. +package node + +import ( + "context" + "database/sql" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/DeBrosOfficial/network/migrations" + "github.com/DeBrosOfficial/network/pkg/config" + "github.com/DeBrosOfficial/network/pkg/rqlite" + "github.com/spf13/cobra" + "go.uber.org/zap" + + _ "github.com/rqlite/gorqlite/stdlib" +) + +var ( + schemaDSN string + schemaYes bool +) + +var schemaCmd = &cobra.Command{ + Use: "schema", + Short: "Inspect and apply gateway schema migrations against the local RQLite", + Long: `Schema lifecycle commands. + +The gateway binary embeds a set of SQL migrations. Each migration is numbered; +the highest number is the schema version the binary requires. After deploying +a new gateway binary, run 'orama node schema apply' on every namespace's RQLite +to bring the schema up to date — otherwise function deploys fail at runtime +with cryptic missing-column errors.`, +} + +var schemaStatusCmd = &cobra.Command{ + Use: "status", + Short: "Show required vs applied schema version + pending migrations", + RunE: func(cmd *cobra.Command, args []string) error { + db, dsn, err := openSchemaDB() + if err != nil { + return err + } + defer db.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + applied, err := migrations.AppliedVersion(ctx, db) + if err != nil { + return fmt.Errorf("query applied version: %w", err) + } + required := migrations.RequiredVersion() + pending, err := migrations.PendingMigrations(ctx, db) + if err != nil { + return fmt.Errorf("compute pending: %w", err) + } + + fmt.Printf("Connection: %s\n", dsn) + fmt.Printf("Required version: %d (highest migration in binary)\n", required) + fmt.Printf("Applied version: %d\n", applied) + switch { + case applied == required: + fmt.Printf("Status: ✓ up to date\n") + case applied > required: + fmt.Printf("Status: ⚠ database AHEAD of binary (%d > %d) — newer binary in cluster?\n", + applied, required) + default: + fmt.Printf("Status: ✗ BEHIND — %d migration(s) pending\n", len(pending)) + } + + if len(pending) > 0 { + fmt.Println("\nPending migrations:") + for _, m := range pending { + fmt.Printf(" %03d %s\n", m.Version, m.Name) + } + fmt.Println("\nRun 'sudo orama node schema apply' to apply them.") + } + return nil + }, +} + +var schemaApplyCmd = &cobra.Command{ + Use: "apply", + Short: "Apply pending migrations to the local RQLite", + Long: `Apply every embedded migration not yet recorded in schema_migrations. + +ALTER TABLE statements that target an already-existing column are tolerated +(the migration is marked complete). Other errors abort the run with the +schema in a partially-applied state — re-running is safe because each +migration is independently versioned.`, + RunE: func(cmd *cobra.Command, args []string) error { + db, dsn, err := openSchemaDB() + if err != nil { + return err + } + defer db.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + pending, err := migrations.PendingMigrations(ctx, db) + if err != nil { + return fmt.Errorf("compute pending: %w", err) + } + if len(pending) == 0 { + fmt.Printf("No pending migrations. Schema is at version %d.\n", migrations.RequiredVersion()) + return nil + } + + fmt.Printf("Will apply %d migration(s) to %s:\n", len(pending), dsn) + for _, m := range pending { + fmt.Printf(" %03d %s\n", m.Version, m.Name) + } + + if !schemaYes { + fmt.Print("\nProceed? [y/N]: ") + var ans string + _, _ = fmt.Scanln(&ans) + if strings.ToLower(strings.TrimSpace(ans)) != "y" { + fmt.Println("Aborted.") + return nil + } + } + + // Use the existing migration runner — it does the same thing the + // gateway does at startup, with idempotent-error tolerance. + logger, _ := zap.NewProduction() + defer func() { _ = logger.Sync() }() + + if err := rqlite.ApplyEmbeddedMigrations(ctx, db, migrations.FS, logger); err != nil { + return fmt.Errorf("apply failed: %w", err) + } + + // Verify post-apply. + if err := migrations.AssertSchema(ctx, db); err != nil { + return fmt.Errorf("apply completed but schema still lags: %w", err) + } + + fmt.Printf("\n✓ Schema now at version %d.\n", migrations.RequiredVersion()) + return nil + }, +} + +// openSchemaDB returns a *sql.DB connected to the local RQLite instance, +// using the --dsn flag if provided, else discovering from the node config +// or falling back to localhost:5001. +func openSchemaDB() (*sql.DB, string, error) { + dsn := schemaDSN + if dsn == "" { + dsn = discoverLocalRQLiteDSN() + } + db, err := sql.Open("rqlite", dsn) + if err != nil { + return nil, "", fmt.Errorf("open rqlite: %w", err) + } + // Quick liveness check so we fail fast with a clear error. + pingCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if err := db.PingContext(pingCtx); err != nil { + _ = db.Close() + return nil, "", fmt.Errorf("rqlite at %s unreachable: %w "+ + "(hint: is RQLite running? try 'orama node status')", dsn, err) + } + return db, dsn, nil +} + +// discoverLocalRQLiteDSN reads the node config to find the local RQLite +// port + credentials, falling back to localhost:5001 with no auth. +func discoverLocalRQLiteDSN() string { + const fallback = "http://localhost:5001" + + cfgPath, err := config.DefaultPath("node.yaml") + if err != nil { + return fallback + } + if _, err := os.Stat(cfgPath); err != nil { + return fallback + } + cfgDir := filepath.Dir(cfgPath) + + // Try to read RQLite credentials from the standard secrets path. + user, pass := readRQLiteCreds(cfgDir) + + port := readRQLitePortFromConfig(cfgPath) + if port == 0 { + port = 5001 + } + if user == "" { + return fmt.Sprintf("http://localhost:%d", port) + } + return fmt.Sprintf("http://%s:%s@localhost:%d", user, pass, port) +} + +// readRQLiteCreds best-effort reads the user:pass from secrets files +// adjacent to the node config. Returns ("","") on any miss; the caller +// then connects without auth (which works on a local-only instance). +func readRQLiteCreds(cfgDir string) (string, string) { + type pair struct{ userFile, passFile string } + candidates := []pair{ + {filepath.Join(cfgDir, "secrets", "rqlite-user"), filepath.Join(cfgDir, "secrets", "rqlite-password")}, + } + for _, c := range candidates { + u, err := os.ReadFile(c.userFile) + if err != nil { + continue + } + p, err := os.ReadFile(c.passFile) + if err != nil { + continue + } + return strings.TrimSpace(string(u)), strings.TrimSpace(string(p)) + } + return "", "" +} + +// readRQLitePortFromConfig is a tiny YAML peek for `database.rqlite_port`. +// Avoids pulling the whole config loader; failure returns 0 → fallback used. +func readRQLitePortFromConfig(path string) int { + data, err := os.ReadFile(path) + if err != nil { + return 0 + } + for _, line := range strings.Split(string(data), "\n") { + line = strings.TrimSpace(line) + if !strings.HasPrefix(line, "rqlite_port:") { + continue + } + var port int + _, err := fmt.Sscanf(line, "rqlite_port: %d", &port) + if err == nil { + return port + } + } + return 0 +} + +func init() { + schemaCmd.PersistentFlags().StringVar(&schemaDSN, "dsn", "", + "RQLite DSN (default: discover from node config or localhost:5001)") + schemaApplyCmd.Flags().BoolVar(&schemaYes, "yes", false, + "Skip the confirmation prompt") + + schemaCmd.AddCommand(schemaStatusCmd) + schemaCmd.AddCommand(schemaApplyCmd) +} diff --git a/core/pkg/gateway/dependencies.go b/core/pkg/gateway/dependencies.go index e1e8221..671e0d8 100644 --- a/core/pkg/gateway/dependencies.go +++ b/core/pkg/gateway/dependencies.go @@ -206,14 +206,32 @@ func initializeRQLite(logger *logging.ColoredLogger, cfg *Config, deps *Dependen // Apply embedded migrations to ensure schema is up-to-date. // This is critical for namespace gateways whose RQLite instances // don't get migrations from the main cluster RQLiteManager. + // + // Failures here are FATAL: a gateway that can't bring its schema up + // to the version its binary expects will silently corrupt deploys + // later (e.g. INSERTing into missing columns and surfacing as a + // cryptic SQL error to end users). Better to refuse to start with + // a clear actionable error. migCtx, migCancel := context.WithTimeout(context.Background(), 30*time.Second) defer migCancel() if err := rqlite.ApplyEmbeddedMigrations(migCtx, db, migrations.FS, logger.Logger); err != nil { - logger.ComponentWarn(logging.ComponentGeneral, "Failed to apply embedded migrations to gateway RQLite", - zap.Error(err)) - } else { - logger.ComponentInfo(logging.ComponentGeneral, "Embedded migrations applied to gateway RQLite") + return fmt.Errorf("apply embedded migrations failed: %w "+ + "(hint: this gateway can't safely run without its required schema; "+ + "check the underlying RQLite cluster health and re-run startup)", err) } + logger.ComponentInfo(logging.ComponentGeneral, "Embedded migrations applied to gateway RQLite") + + // Schema-version contract: even if the apply call returned nil, verify + // that the highest migration the binary embeds is recorded as applied. + // Catches: + // - silent partial-apply states where the marker row was never written + // - clusters where the binary was upgraded but RQLite has stale schema + // - operator manually deleted rows from schema_migrations + if err := migrations.AssertSchema(migCtx, db); err != nil { + return fmt.Errorf("schema contract violation: %w", err) + } + logger.ComponentInfo(logging.ComponentGeneral, "Schema contract satisfied", + zap.Int("required_version", migrations.RequiredVersion())) return nil } diff --git a/core/pkg/rqlite/migrations.go b/core/pkg/rqlite/migrations.go index e817960..53d298e 100644 --- a/core/pkg/rqlite/migrations.go +++ b/core/pkg/rqlite/migrations.go @@ -257,6 +257,17 @@ func isNoSuchTable(err error) bool { // applySQL splits the script into individual statements, strips explicit // transaction control (BEGIN/COMMIT/ROLLBACK/END), and executes statements // sequentially to avoid nested transaction issues with rqlite. +// +// Idempotency: certain SQLite errors are treated as "already applied" so that +// a partially-applied migration can be safely re-run. Specifically: +// - "duplicate column name" — ALTER TABLE ADD COLUMN that already happened +// - "table ... already exists" — CREATE TABLE that already exists (when +// the migration didn't use IF NOT EXISTS) +// - "index ... already exists" — same for indexes +// +// This makes ALTER TABLE ADD COLUMN safe to retry, which is the common +// case where partial application leaves schema_migrations un-recorded +// but some columns added. func applySQL(ctx context.Context, db *sql.DB, script string) error { s := strings.TrimSpace(script) if s == "" { @@ -270,12 +281,38 @@ func applySQL(ctx context.Context, db *sql.DB, script string) error { continue } if _, err := db.ExecContext(ctx, stmt); err != nil { + if isAlreadyAppliedError(err) { + // Treat as no-op so the migration can be marked complete. + // We log via fmt.Sprintf into the returned error message — + // the caller of applySQL has the migration version + name + // and can decide how loud to be about it. + continue + } return fmt.Errorf("exec stmt failed: %w (stmt: %s)", err, snippet(stmt)) } } return nil } +// isAlreadyAppliedError returns true when an SQL error indicates the +// statement's effect is already in place (column exists, table exists, etc.). +// SQLite error messages are stable across versions for these cases. +func isAlreadyAppliedError(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + switch { + case strings.Contains(msg, "duplicate column name"): + return true + case strings.Contains(msg, "already exists"): + // Covers: "table X already exists", "index X already exists", + // "trigger X already exists", "view X already exists". + return true + } + return false +} + func containsToken(stmts []string, token string) bool { for _, s := range stmts { if strings.EqualFold(strings.TrimSpace(s), token) { diff --git a/core/pkg/rqlite/migrations_idempotent_test.go b/core/pkg/rqlite/migrations_idempotent_test.go new file mode 100644 index 0000000..cf997a2 --- /dev/null +++ b/core/pkg/rqlite/migrations_idempotent_test.go @@ -0,0 +1,192 @@ +package rqlite + +import ( + "context" + "database/sql" + "errors" + "strings" + "testing" + "testing/fstest" + + _ "github.com/mattn/go-sqlite3" + "go.uber.org/zap" +) + +// openTestDB returns an in-memory SQLite database for migration-runner tests. +// SQLite gives us identical semantics for the error messages we tolerate +// ("duplicate column name", "table ... already exists"), so we don't need a +// real RQLite to verify the idempotency logic. +func openTestDB(t *testing.T) *sql.DB { + t.Helper() + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + t.Fatalf("open: %v", err) + } + t.Cleanup(func() { _ = db.Close() }) + return db +} + +func TestIsAlreadyAppliedError(t *testing.T) { + cases := []struct { + err error + want bool + name string + }{ + {nil, false, "nil"}, + {errors.New("duplicate column name: foo"), true, "duplicate column"}, + {errors.New("Duplicate Column Name: foo"), true, "case insensitive duplicate column"}, + {errors.New("table foo already exists"), true, "table already exists"}, + {errors.New("index idx_x already exists"), true, "index already exists"}, + {errors.New("syntax error near token"), false, "real error"}, + {errors.New("no such table: missing"), false, "no such table is NOT idempotent"}, + {errors.New("UNIQUE constraint failed"), false, "UNIQUE violation is NOT idempotent"}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if got := isAlreadyAppliedError(c.err); got != c.want { + t.Errorf("isAlreadyAppliedError(%v) = %v, want %v", c.err, got, c.want) + } + }) + } +} + +func TestApplySQL_idempotent_alter_add_column(t *testing.T) { + db := openTestDB(t) + if _, err := db.Exec("CREATE TABLE t (id INTEGER PRIMARY KEY)"); err != nil { + t.Fatal(err) + } + + // First apply: adds column. + script := `ALTER TABLE t ADD COLUMN x INTEGER DEFAULT 0;` + if err := applySQL(context.Background(), db, script); err != nil { + t.Fatalf("first apply failed: %v", err) + } + + // Second apply: column already exists. Must NOT return an error — + // this is the critical idempotency property the AnChat-test bug + // hit: a re-run had to succeed without operator intervention. + if err := applySQL(context.Background(), db, script); err != nil { + t.Fatalf("second apply (idempotent re-run) failed: %v", err) + } + + // Verify column is there exactly once and queryable. + if _, err := db.Exec("INSERT INTO t (x) VALUES (42)"); err != nil { + t.Fatalf("INSERT after re-apply failed: %v", err) + } +} + +func TestApplySQL_idempotent_create_table(t *testing.T) { + db := openTestDB(t) + script := `CREATE TABLE foo (id INTEGER);` + if err := applySQL(context.Background(), db, script); err != nil { + t.Fatalf("first: %v", err) + } + if err := applySQL(context.Background(), db, script); err != nil { + t.Fatalf("re-apply CREATE TABLE without IF NOT EXISTS should be tolerated: %v", err) + } +} + +func TestApplySQL_real_errors_still_fail(t *testing.T) { + db := openTestDB(t) + // A genuine syntax error must still propagate — we don't want + // to swallow real bugs. + err := applySQL(context.Background(), db, "ALTER TABLE nonexistent_table ADD COLUMN x INT;") + if err == nil { + t.Fatal("expected error for ALTER on missing table") + } +} + +func TestApplyEmbeddedMigrations_partial_apply_can_recover(t *testing.T) { + db := openTestDB(t) + + // Simulate the AnChat scenario: someone manually added one of the + // columns from migration 025 (e.g. via direct rqlite query during + // debugging) but the schema_migrations row was never recorded. + if _, err := db.Exec("CREATE TABLE functions (id INTEGER PRIMARY KEY, name TEXT)"); err != nil { + t.Fatal(err) + } + if _, err := db.Exec("ALTER TABLE functions ADD COLUMN ws_persistent BOOLEAN DEFAULT FALSE"); err != nil { + t.Fatal(err) + } + + // Now run a migration that tries to add ws_persistent again + 3 new columns. + embeddedFS := fstest.MapFS{ + "001_persistent_ws.sql": &fstest.MapFile{ + Data: []byte(` + ALTER TABLE functions ADD COLUMN ws_persistent BOOLEAN DEFAULT FALSE; + ALTER TABLE functions ADD COLUMN ws_idle_timeout_sec INTEGER DEFAULT 0; + ALTER TABLE functions ADD COLUMN ws_max_frame_bytes INTEGER DEFAULT 0; + ALTER TABLE functions ADD COLUMN ws_max_inflight_per_conn INTEGER DEFAULT 0; + `), + }, + } + + if err := ApplyEmbeddedMigrations(context.Background(), db, embeddedFS, zap.NewNop()); err != nil { + t.Fatalf("partial-state apply should succeed: %v", err) + } + + // All four columns must be present and writable. + _, err := db.Exec(`INSERT INTO functions (id, name, ws_persistent, ws_idle_timeout_sec, ws_max_frame_bytes, ws_max_inflight_per_conn) VALUES (?, ?, ?, ?, ?, ?)`, + 1, "test", true, 30, 65536, 64) + if err != nil { + t.Fatalf("INSERT after recovery failed: %v", err) + } + + // schema_migrations must now record version 1 — re-runs are no-ops. + var v int + if err := db.QueryRow("SELECT MAX(version) FROM schema_migrations").Scan(&v); err != nil { + t.Fatalf("schema_migrations query: %v", err) + } + if v != 1 { + t.Errorf("expected schema_migrations to record version 1, got %d", v) + } +} + +func TestApplyEmbeddedMigrations_re_run_is_noop(t *testing.T) { + db := openTestDB(t) + embeddedFS := fstest.MapFS{ + "001_initial.sql": &fstest.MapFile{ + Data: []byte("CREATE TABLE foo (id INTEGER PRIMARY KEY);"), + }, + } + + for i := 0; i < 3; i++ { + if err := ApplyEmbeddedMigrations(context.Background(), db, embeddedFS, zap.NewNop()); err != nil { + t.Fatalf("apply iter %d: %v", i, err) + } + } + + // Schema-migrations should have version 1 exactly once. + var count int + if err := db.QueryRow("SELECT COUNT(*) FROM schema_migrations WHERE version = 1").Scan(&count); err != nil { + t.Fatalf("count: %v", err) + } + if count != 1 { + t.Errorf("expected exactly 1 schema_migrations row, got %d", count) + } +} + +func TestApplyEmbeddedMigrations_genuine_failure_aborts(t *testing.T) { + db := openTestDB(t) + embeddedFS := fstest.MapFS{ + "001_bad.sql": &fstest.MapFile{ + Data: []byte("THIS IS NOT VALID SQL;"), + }, + } + + err := ApplyEmbeddedMigrations(context.Background(), db, embeddedFS, zap.NewNop()) + if err == nil { + t.Fatal("expected error for invalid SQL") + } + if !strings.Contains(err.Error(), "apply migration") { + t.Errorf("error message lacks migration context: %v", err) + } + + // schema_migrations row must NOT have been written. + var v int + if err := db.QueryRow("SELECT COALESCE(MAX(version), 0) FROM schema_migrations").Scan(&v); err == nil { + if v != 0 { + t.Errorf("expected no schema_migrations row, got version %d", v) + } + } +} diff --git a/core/pkg/rqlite/rqlite.go b/core/pkg/rqlite/rqlite.go index a456ba8..1dee783 100644 --- a/core/pkg/rqlite/rqlite.go +++ b/core/pkg/rqlite/rqlite.go @@ -2,6 +2,7 @@ package rqlite import ( "context" + "database/sql" "fmt" "os" "os/exec" @@ -84,15 +85,28 @@ func (r *RQLiteManager) Start(ctx context.Context) error { return err } - // Apply embedded migrations - these are compiled into the binary + // Apply embedded migrations - these are compiled into the binary. + // We tolerate apply errors here (joining an existing cluster, transient + // leader election, etc.) — the gateway-process-side check is the + // authoritative gate for "is schema usable". See gateway.dependencies. if err := r.ApplyEmbeddedMigrations(ctx, migrations.FS); err != nil { r.logger.Error("Failed to apply embedded migrations", zap.Error(err)) - // Don't fail startup - migrations may have already been applied by another node - // or we may be joining an existing cluster } else { r.logger.Info("Database migrations applied successfully") } + // Schema-drift visibility: even when apply returned nil, log if the + // schema isn't at the binary's required version. Helps operators spot + // lag in the rolling-upgrade window before the gateway flips fatal. + if db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", r.config.RQLitePort)); err == nil { + defer db.Close() + if assertErr := migrations.AssertSchema(ctx, db); assertErr != nil { + r.logger.Warn("Schema below required version after apply", + zap.Int("required", migrations.RequiredVersion()), + zap.Error(assertErr)) + } + } + return nil }