diff --git a/cmd/node/main.go b/cmd/node/main.go
index 8578fc9..fcc7b66 100644
--- a/cmd/node/main.go
+++ b/cmd/node/main.go
@@ -38,7 +38,7 @@ func parse_and_return_network_flags() (configPath *string, dataDir, nodeID *stri
 	dataDir = flag.String("data", "", "Data directory (auto-detected if not provided)")
 	nodeID = flag.String("id", "", "Node identifier (for running multiple local nodes)")
 	p2pPort = flag.Int("p2p-port", 4001, "LibP2P listen port")
-	advAddr = flag.String("adv-addr", "127.0.0.1", "Default Advertise address for rqlite and rafts")
+	advAddr = flag.String("adv-addr", "0.0.0.0", "Default Advertise address for rqlite and rafts")
 	help = flag.Bool("help", false, "Show help")
 
 	flag.Parse()
diff --git a/migrations/001_initial.sql b/migrations/001_initial.sql
index 586c122..f4b989b 100644
--- a/migrations/001_initial.sql
+++ b/migrations/001_initial.sql
@@ -2,8 +2,6 @@
 -- This file scaffolds core tables used by the HTTP gateway for auth, observability, and namespacing.
 -- Apply via your migration tooling or manual execution in RQLite.
 
-BEGIN;
-
 -- Tracks applied migrations (optional if your runner manages this separately)
 CREATE TABLE IF NOT EXISTS schema_migrations (
     version INTEGER PRIMARY KEY,
@@ -51,5 +49,3 @@ INSERT OR IGNORE INTO namespaces(name) VALUES ('default');
 
 -- Mark this migration as applied (optional)
 INSERT OR IGNORE INTO schema_migrations(version) VALUES (1);
-
-COMMIT;
diff --git a/migrations/002_core.sql b/migrations/002_core.sql
index 790c506..55e20f9 100644
--- a/migrations/002_core.sql
+++ b/migrations/002_core.sql
@@ -2,8 +2,6 @@
 -- Adds apps, nonces, subscriptions, refresh_tokens, audit_events, namespace_ownership
 -- SQLite/RQLite dialect
 
-BEGIN;
-
 -- Apps registered within a namespace (optional public key for attestation)
 CREATE TABLE IF NOT EXISTS apps (
     id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -91,5 +89,3 @@ CREATE INDEX IF NOT EXISTS idx_ns_owner_ns ON namespace_ownership(namespace_id);
 
 -- Optional marker (ignored by runner)
 INSERT OR IGNORE INTO schema_migrations(version) VALUES (2);
-
-COMMIT;
diff --git a/migrations/003_wallet_api_keys.sql b/migrations/003_wallet_api_keys.sql
index 6c9e725..576109d 100644
--- a/migrations/003_wallet_api_keys.sql
+++ b/migrations/003_wallet_api_keys.sql
@@ -1,8 +1,6 @@
 -- DeBros Gateway - Wallet to API Key linkage (Phase 3)
 -- Ensures one API key per (namespace, wallet) and enables lookup
 
-BEGIN;
-
 CREATE TABLE IF NOT EXISTS wallet_api_keys (
     id INTEGER PRIMARY KEY AUTOINCREMENT,
     namespace_id INTEGER NOT NULL,
@@ -17,5 +15,3 @@ CREATE TABLE IF NOT EXISTS wallet_api_keys (
 CREATE INDEX IF NOT EXISTS idx_wallet_api_keys_ns ON wallet_api_keys(namespace_id);
 
 INSERT OR IGNORE INTO schema_migrations(version) VALUES (3);
-
-COMMIT;
diff --git a/pkg/rqlite/cluster_handlers.go b/pkg/rqlite/cluster_handlers.go
index aca6712..77cd07c 100644
--- a/pkg/rqlite/cluster_handlers.go
+++ b/pkg/rqlite/cluster_handlers.go
@@ -125,6 +125,20 @@ func (cm *ClusterManager) handleCreateConfirm(msg *MetadataMessage) error {
 		zap.String("coordinator", confirm.CoordinatorNodeID),
 		zap.Int("nodes", len(confirm.SelectedNodes)))
 
+	// Check if database already exists or is being initialized (ignore duplicate confirmations)
+	cm.mu.RLock()
+	_, alreadyActive := cm.activeClusters[confirm.DatabaseName]
+	_, alreadyInitializing := cm.initializingDBs[confirm.DatabaseName]
+	cm.mu.RUnlock()
+
+	if alreadyActive || alreadyInitializing {
+		cm.logger.Debug("Database already active or initializing on this node, ignoring confirmation",
+			zap.String("database", confirm.DatabaseName),
+			zap.Bool("active", alreadyActive),
+			zap.Bool("initializing", alreadyInitializing))
+		return nil
+	}
+
 	// Check if this node was selected
 	var myAssignment *NodeAssignment
 	for i, node := range confirm.SelectedNodes {
@@ -144,6 +158,11 @@ func (cm *ClusterManager) handleCreateConfirm(msg *MetadataMessage) error {
 		zap.String("database", confirm.DatabaseName),
 		zap.String("role", myAssignment.Role))
 
+	// Mark database as initializing to prevent duplicate confirmations
+	cm.mu.Lock()
+	cm.initializingDBs[confirm.DatabaseName] = true
+	cm.mu.Unlock()
+
 	// Create database metadata
 	portMappings := make(map[string]PortPair)
 	nodeIDs := make([]string, len(confirm.SelectedNodes))
@@ -203,12 +222,21 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe
 		// Join to the leader
 		leaderNodeID := metadata.LeaderNodeID
 		if leaderPorts, exists := metadata.PortMappings[leaderNodeID]; exists {
-			joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), leaderPorts.RaftPort)
+			joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), leaderPorts.HTTPPort)
+			cm.logger.Info("Follower joining leader",
+				zap.String("database", metadata.DatabaseName),
+				zap.String("leader_node", leaderNodeID),
+				zap.String("join_address", joinAddr),
+				zap.Int("leader_raft_port", leaderPorts.RaftPort))
+		} else {
+			cm.logger.Error("Leader node not found in port mappings",
+				zap.String("database", metadata.DatabaseName),
+				zap.String("leader_node", leaderNodeID))
 		}
 	}
 
-	// Start the instance
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	// Start the instance with longer timeout for bootstrap
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
 	defer cancel()
 
 	if err := instance.Start(ctx, isLeader, joinAddr); err != nil {
@@ -216,14 +244,20 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe
 			zap.String("database", metadata.DatabaseName),
 			zap.Error(err))
 
+		// Clear initializing flag on failure
+		cm.mu.Lock()
+		delete(cm.initializingDBs, metadata.DatabaseName)
+		cm.mu.Unlock()
+
 		// Broadcast failure status
 		cm.broadcastStatusUpdate(metadata.DatabaseName, StatusInitializing)
 		return
 	}
 
-	// Store active instance
+	// Store active instance and clear initializing flag
 	cm.mu.Lock()
 	cm.activeClusters[metadata.DatabaseName] = instance
+	delete(cm.initializingDBs, metadata.DatabaseName)
 	cm.mu.Unlock()
 
 	// Broadcast active status
@@ -435,7 +469,7 @@ func (cm *ClusterManager) getAdvertiseAddress() string {
 		}
 		return addr
 	}
-	return "127.0.0.1"
+	return "0.0.0.0"
 }
 
 // handleIdleNotification processes idle notifications from other nodes
diff --git a/pkg/rqlite/cluster_manager.go b/pkg/rqlite/cluster_manager.go
index 4d328ea..d305cf3 100644
--- a/pkg/rqlite/cluster_manager.go
+++ b/pkg/rqlite/cluster_manager.go
@@ -5,11 +5,13 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
 	"time"
 
 	"github.com/DeBrosOfficial/network/pkg/config"
 	"github.com/DeBrosOfficial/network/pkg/pubsub"
+	"github.com/rqlite/gorqlite"
 	"go.uber.org/zap"
 )
 
@@ -26,6 +28,7 @@ type ClusterManager struct {
 	portManager         *PortManager
 	pubsubAdapter       *pubsub.ClientAdapter
 	coordinatorRegistry *CoordinatorRegistry
+	initializingDBs     map[string]bool // Track databases currently being initialized
 	mu                  sync.RWMutex
 
 	ctx    context.Context
@@ -60,6 +63,7 @@ func NewClusterManager(
 		portManager:         portManager,
 		pubsubAdapter:       pubsubAdapter,
 		coordinatorRegistry: NewCoordinatorRegistry(),
+		initializingDBs:     make(map[string]bool),
 		ctx:                 ctx,
 		cancel:              cancel,
 	}
@@ -140,10 +144,6 @@ func (cm *ClusterManager) handleMetadataMessage(topic string, data []byte) error {
 		return nil
 	}
 
-	cm.logger.Debug("Received metadata message",
-		zap.String("type", string(msg.Type)),
-		zap.String("from", msg.NodeID))
-
 	switch msg.Type {
 	case MsgDatabaseCreateRequest:
 		return cm.handleCreateRequest(msg)
@@ -593,9 +593,9 @@ func (cm *ClusterManager) initializeSystemDatabase() error {
 		zap.String("database", systemDBName),
 		zap.Int("replication_factor", cm.config.ReplicationFactor))
 
-	// Wait longer for nodes to discover each other
+	// Wait longer for nodes to discover each other and for system DB metadata to propagate
 	cm.logger.Info("Waiting for peer discovery before system database creation...")
-	time.Sleep(5 * time.Second)
+	time.Sleep(15 * time.Second)
 
 	// Check if system database already exists in metadata
 	existingDB := cm.metadataStore.GetDatabase(systemDBName)
@@ -611,14 +611,18 @@
 		maxRetries := 3
 		var lastErr error
 		for attempt := 1; attempt <= maxRetries; attempt++ {
-			// Check again if it was created by another node
-			if cm.metadataStore.GetDatabase(systemDBName) != nil {
-				cm.logger.Info("System database now exists (created by another node)")
+			// Check again if it was created by another node (metadata may have been received via pubsub)
+			existingDB = cm.metadataStore.GetDatabase(systemDBName)
+			if existingDB != nil {
+				cm.logger.Info("System database now exists (created by another node)",
+					zap.Int("attempt", attempt))
+				lastErr = nil
 				break
 			}
 
 			lastErr = cm.CreateDatabase(systemDBName, cm.config.ReplicationFactor)
 			if lastErr == nil {
+				cm.logger.Info("System database creation initiated successfully")
 				break
 			}
 
@@ -628,14 +632,17 @@
 				zap.Error(lastErr))
 
 			if attempt < maxRetries {
-				// Wait before retry to allow more nodes to join
+				// Wait before retry to allow more nodes to join and metadata to sync
+				cm.logger.Info("Waiting before retry",
+					zap.Duration("wait_time", 3*time.Second))
 				time.Sleep(3 * time.Second)
 			}
 		}
 
 		if lastErr != nil {
-			cm.logger.Warn("System database creation completed with errors (may be created by another node)",
-				zap.Error(lastErr))
+			cm.logger.Info("System database creation completed with errors, will wait for it to become active",
+				zap.Error(lastErr),
+				zap.String("note", "This node may not be selected for system database hosting"))
 		}
 	}
 
@@ -725,16 +732,31 @@ func (cm *ClusterManager) runMigrations(dbName string) error {
 			return fmt.Errorf("failed to read migration file %s: %w", file, err)
 		}
 
-		// Execute the migration using Execute (no automatic transaction wrapping)
-		// Migration files already contain BEGIN/COMMIT
-		_, err = conn.Execute(string(content))
-		if err != nil {
-			cm.logger.Error("Migration failed",
-				zap.String("file", filepath.Base(file)),
-				zap.Error(err))
-			// Continue with other migrations even if one fails
-			// (tables might already exist from previous runs)
-			continue
+		// Parse SQL content into individual statements
+		sqlContent := string(content)
+
+		// Split by semicolon but preserve multi-line statements
+		// Simple approach: execute the whole file as one batch
+		statements := []string{sqlContent}
+
+		// Execute using WriteParameterized to avoid auto-transaction wrapping
+		for _, stmt := range statements {
+			stmt = strings.TrimSpace(stmt)
+			if stmt == "" {
+				continue
+			}
+
+			_, err = conn.WriteOneParameterized(gorqlite.ParameterizedStatement{
+				Query: stmt,
+			})
+			if err != nil {
+				cm.logger.Error("Migration failed",
+					zap.String("file", filepath.Base(file)),
+					zap.Error(err))
+				// Continue with other migrations even if one fails
+				// (tables might already exist from previous runs)
+				break
+			}
 		}
 
 		cm.logger.Info("Migration completed",
diff --git a/pkg/rqlite/instance.go b/pkg/rqlite/instance.go
index e043d3e..9589c71 100644
--- a/pkg/rqlite/instance.go
+++ b/pkg/rqlite/instance.go
@@ -168,7 +168,7 @@ func (ri *RQLiteInstance) waitForReady(ctx context.Context) error {
 	url := fmt.Sprintf("http://localhost:%d/status", ri.HTTPPort)
 	client := &http.Client{Timeout: 2 * time.Second}
 
-	for i := 0; i < 30; i++ {
+	for i := 0; i < 60; i++ {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
@@ -198,7 +198,7 @@ func (ri *RQLiteInstance) waitForSQLAvailable(ctx context.Context) error {
 	ticker := time.NewTicker(1 * time.Second)
 	defer ticker.Stop()
 
-	for i := 0; i < 30; i++ {
+	for i := 0; i < 60; i++ {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()