diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go
index 971a09c..ea674d6 100644
--- a/pkg/gateway/pubsub_handlers.go
+++ b/pkg/gateway/pubsub_handlers.go
@@ -62,6 +62,28 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request)
 	ctx := client.WithInternalAuth(r.Context())
 	// Subscribe to the topic; push data into msgs with simple per-connection de-dup
 	recent := make(map[string]time.Time)
+
+	// Start cleanup goroutine to prevent memory leak
+	cleanupTicker := time.NewTicker(30 * time.Second)
+	defer cleanupTicker.Stop()
+
+	go func() {
+		for {
+			select {
+			case <-cleanupTicker.C:
+				// Clean up old entries to prevent memory leak
+				now := time.Now()
+				for key, timestamp := range recent {
+					if now.Sub(timestamp) > 2*time.Minute {
+						delete(recent, key)
+					}
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
 	h := func(_ string, data []byte) error {
 		// Drop duplicates seen in the last 2 seconds
 		sum := sha256.Sum256(data)
diff --git a/pkg/rqlite/cluster_handlers.go b/pkg/rqlite/cluster_handlers.go
index 77cd07c..cf429b9 100644
--- a/pkg/rqlite/cluster_handlers.go
+++ b/pkg/rqlite/cluster_handlers.go
@@ -125,21 +125,7 @@ func (cm *ClusterManager) handleCreateConfirm(msg *MetadataMessage) error {
 		zap.String("coordinator", confirm.CoordinatorNodeID),
 		zap.Int("nodes", len(confirm.SelectedNodes)))
 
-	// Check if database already exists or is being initialized (ignore duplicate confirmations)
-	cm.mu.RLock()
-	_, alreadyActive := cm.activeClusters[confirm.DatabaseName]
-	_, alreadyInitializing := cm.initializingDBs[confirm.DatabaseName]
-	cm.mu.RUnlock()
-
-	if alreadyActive || alreadyInitializing {
-		cm.logger.Debug("Database already active or initializing on this node, ignoring confirmation",
-			zap.String("database", confirm.DatabaseName),
-			zap.Bool("active", alreadyActive),
-			zap.Bool("initializing", alreadyInitializing))
-		return nil
-	}
-
-	// Check if this node was selected
+	// Check if this node was selected first (before any locking)
 	var myAssignment *NodeAssignment
 	for i, node := range confirm.SelectedNodes {
 		if node.NodeID == cm.nodeID {
@@ -154,15 +140,29 @@ func (cm *ClusterManager) handleCreateConfirm(msg *MetadataMessage) error {
 		return nil
 	}
 
+	// Use atomic check-and-set to prevent race conditions
+	cm.mu.Lock()
+	defer cm.mu.Unlock()
+
+	// Check if database already exists or is being initialized (atomic check)
+	_, alreadyActive := cm.activeClusters[confirm.DatabaseName]
+	_, alreadyInitializing := cm.initializingDBs[confirm.DatabaseName]
+
+	if alreadyActive || alreadyInitializing {
+		cm.logger.Debug("Database already active or initializing on this node, ignoring confirmation",
+			zap.String("database", confirm.DatabaseName),
+			zap.Bool("active", alreadyActive),
+			zap.Bool("initializing", alreadyInitializing))
+		return nil
+	}
+
+	// Atomically mark database as initializing to prevent duplicate confirmations
+	cm.initializingDBs[confirm.DatabaseName] = true
+
 	cm.logger.Info("Selected to host database",
 		zap.String("database", confirm.DatabaseName),
 		zap.String("role", myAssignment.Role))
 
-	// Mark database as initializing to prevent duplicate confirmations
-	cm.mu.Lock()
-	cm.initializingDBs[confirm.DatabaseName] = true
-	cm.mu.Unlock()
-
 	// Create database metadata
 	portMappings := make(map[string]PortPair)
 	nodeIDs := make([]string, len(confirm.SelectedNodes))
@@ -222,7 +222,7 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe
 		// Join to the leader
 		leaderNodeID := metadata.LeaderNodeID
 		if leaderPorts, exists := metadata.PortMappings[leaderNodeID]; exists {
-			joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), leaderPorts.HTTPPort)
+			joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), leaderPorts.RaftPort)
 			cm.logger.Info("Follower joining leader",
 				zap.String("database", metadata.DatabaseName),
 				zap.String("leader_node", leaderNodeID),
@@ -235,13 +235,34 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe
 		}
 	}
 
-	// Start the instance with longer timeout for bootstrap
-	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	// For followers with existing data, ensure we have a join address
+	if !isLeader && instance.hasExistingData() {
+		if joinAddr == "" {
+			cm.logger.Error("Follower has existing data but no join address available",
+				zap.String("database", metadata.DatabaseName))
+			// Clear initializing flag
+			cm.mu.Lock()
+			delete(cm.initializingDBs, metadata.DatabaseName)
+			cm.mu.Unlock()
+			return
+		}
+		cm.logger.Info("Follower restarting with existing data, will rejoin cluster",
+			zap.String("database", metadata.DatabaseName),
+			zap.String("join_address", joinAddr))
+	}
+
+	// Start the instance with appropriate timeout
+	timeout := 60 * time.Second
+	if isLeader {
+		timeout = 90 * time.Second // Leaders need more time for bootstrap
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 
 	if err := instance.Start(ctx, isLeader, joinAddr); err != nil {
 		cm.logger.Error("Failed to start database instance",
 			zap.String("database", metadata.DatabaseName),
+			zap.Bool("is_leader", isLeader),
 			zap.Error(err))
 
 		// Clear initializing flag on failure
@@ -254,6 +275,14 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe
 		return
 	}
 
+	// For followers, start background SQL readiness check
+	if !isLeader {
+		instance.StartBackgroundSQLReadinessCheck(cm.ctx, func() {
+			cm.logger.Info("Follower SQL became ready",
+				zap.String("database", metadata.DatabaseName))
+		})
+	}
+
 	// Store active instance and clear initializing flag
 	cm.mu.Lock()
 	cm.activeClusters[metadata.DatabaseName] = instance
@@ -264,7 +293,23 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe
 	cm.broadcastStatusUpdate(metadata.DatabaseName, StatusActive)
 
 	cm.logger.Info("Database instance started and active",
-		zap.String("database", metadata.DatabaseName))
+		zap.String("database", metadata.DatabaseName),
+		zap.Bool("is_leader", isLeader))
+
+	// Broadcast metadata sync to all nodes
+	syncMsg := MetadataSync{Metadata: metadata}
+	syncData, err := MarshalMetadataMessage(MsgMetadataSync, cm.nodeID, syncMsg)
+	if err == nil {
+		topic := "/debros/metadata/v1"
+		if err := cm.pubsubAdapter.Publish(cm.ctx, topic, syncData); err != nil {
+			cm.logger.Warn("Failed to broadcast metadata sync",
+				zap.String("database", metadata.DatabaseName),
+				zap.Error(err))
+		} else {
+			cm.logger.Debug("Broadcasted metadata sync",
+				zap.String("database", metadata.DatabaseName))
+		}
+	}
 }
 
 // handleStatusUpdate processes database status updates
@@ -345,13 +390,18 @@ func (cm *ClusterManager) handleMetadataSync(msg *MetadataMessage) error {
 		return nil
 	}
 
+	cm.logger.Debug("Received metadata sync",
+		zap.String("database", sync.Metadata.DatabaseName),
+		zap.String("from_node", msg.NodeID))
+
 	// Check if we need to update local metadata
 	existing := cm.metadataStore.GetDatabase(sync.Metadata.DatabaseName)
 	if existing == nil {
 		// New database we didn't know about
 		cm.metadataStore.SetDatabase(sync.Metadata)
cm.logger.Info("Learned about new database via sync", - zap.String("database", sync.Metadata.DatabaseName)) + zap.String("database", sync.Metadata.DatabaseName), + zap.Strings("node_ids", sync.Metadata.NodeIDs)) return nil } @@ -360,7 +410,8 @@ func (cm *ClusterManager) handleMetadataSync(msg *MetadataMessage) error { if winner != existing { cm.metadataStore.SetDatabase(winner) cm.logger.Info("Updated database metadata via sync", - zap.String("database", sync.Metadata.DatabaseName)) + zap.String("database", sync.Metadata.DatabaseName), + zap.Uint64("new_version", winner.Version)) } return nil @@ -699,7 +750,7 @@ func (cm *ClusterManager) wakeupDatabase(dbName string, dbMeta *DatabaseMetadata joinAddr := "" if len(dbMeta.NodeIDs) > 0 && dbMeta.NodeIDs[0] != cm.nodeID { firstNodePorts := dbMeta.PortMappings[dbMeta.NodeIDs[0]] - joinAddr = fmt.Sprintf("http://%s:%d", cm.getAdvertiseAddress(), firstNodePorts.RaftPort) + joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), firstNodePorts.RaftPort) } // Create and start instance @@ -853,7 +904,7 @@ func (cm *ClusterManager) handleNodeReplacementOffer(msg *MetadataMessage) error continue // Skip failed nodes (would need proper tracking) } ports := dbMeta.PortMappings[nodeID] - joinAddr = fmt.Sprintf("http://%s:%d", cm.getAdvertiseAddress(), ports.RaftPort) + joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), ports.RaftPort) break } diff --git a/pkg/rqlite/cluster_manager.go b/pkg/rqlite/cluster_manager.go index d305cf3..aa92fea 100644 --- a/pkg/rqlite/cluster_manager.go +++ b/pkg/rqlite/cluster_manager.go @@ -330,6 +330,35 @@ func (cm *ClusterManager) CreateDatabase(dbName string, replicationFactor int) e cm.logger.Info("Database creation confirmation broadcasted", zap.String("database", dbName)) + + // Create and broadcast metadata immediately + metadata := &DatabaseMetadata{ + DatabaseName: dbName, + NodeIDs: make([]string, len(assignments)), + PortMappings: make(map[string]PortPair), + Status: StatusInitializing, + CreatedAt: time.Now(), + LastAccessed: time.Now(), + LeaderNodeID: assignments[0].NodeID, + Version: 1, + VectorClock: NewVectorClock(), + } + + for i, node := range assignments { + metadata.NodeIDs[i] = node.NodeID + metadata.PortMappings[node.NodeID] = PortPair{ + HTTPPort: node.HTTPPort, + RaftPort: node.RaftPort, + } + } + + // Store locally + cm.metadataStore.SetDatabase(metadata) + + // Broadcast to all nodes + syncMsg := MetadataSync{Metadata: metadata} + syncData, _ := MarshalMetadataMessage(MsgMetadataSync, cm.nodeID, syncMsg) + cm.pubsubAdapter.Publish(cm.ctx, topic, syncData) } return nil @@ -443,15 +472,212 @@ func (cm *ClusterManager) monitorHealth() { // checkDatabaseHealth checks if all active databases are healthy func (cm *ClusterManager) checkDatabaseHealth() { cm.mu.RLock() - defer cm.mu.RUnlock() - + failedDatabases := make([]string, 0) for dbName, instance := range cm.activeClusters { if !instance.IsRunning() { - cm.logger.Warn("Database instance is not running", - zap.String("database", dbName)) - // TODO: Implement recovery logic + failedDatabases = append(failedDatabases, dbName) } } + cm.mu.RUnlock() + + // Attempt recovery for failed databases + for _, dbName := range failedDatabases { + cm.logger.Warn("Database instance is not running, attempting recovery", + zap.String("database", dbName)) + + // Get database metadata + metadata := cm.metadataStore.GetDatabase(dbName) + if metadata == nil { + cm.logger.Error("Cannot recover database: metadata not found", + zap.String("database", dbName)) + 
+			continue
+		}
+
+		// Check if this node is still supposed to host this database
+		isMember := false
+		for _, nodeID := range metadata.NodeIDs {
+			if nodeID == cm.nodeID {
+				isMember = true
+				break
+			}
+		}
+
+		if !isMember {
+			cm.logger.Info("Node is no longer a member of database, removing from active clusters",
+				zap.String("database", dbName))
+			cm.mu.Lock()
+			delete(cm.activeClusters, dbName)
+			cm.mu.Unlock()
+			continue
+		}
+
+		// Attempt to restart the database instance
+		go cm.attemptDatabaseRecovery(dbName, metadata)
+	}
+}
+
+// attemptDatabaseRecovery attempts to recover a failed database instance
+func (cm *ClusterManager) attemptDatabaseRecovery(dbName string, metadata *DatabaseMetadata) {
+	cm.logger.Info("Attempting database recovery",
+		zap.String("database", dbName))
+
+	// Check if we have quorum before attempting recovery
+	if !cm.hasQuorum(metadata) {
+		cm.logger.Warn("Cannot recover database: insufficient quorum",
+			zap.String("database", dbName),
+			zap.Int("required", (len(metadata.NodeIDs)/2)+1),
+			zap.Int("active", len(cm.getActiveMembers(metadata))))
+		return
+	}
+
+	// Determine if this node should be the leader
+	isLeader := metadata.LeaderNodeID == cm.nodeID
+
+	// Get ports for this database
+	ports, exists := metadata.PortMappings[cm.nodeID]
+	if !exists {
+		cm.logger.Error("Cannot recover database: port mapping not found",
+			zap.String("database", dbName),
+			zap.String("node_id", cm.nodeID))
+		return
+	}
+
+	// Create advertised addresses
+	advHTTPAddr := fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), ports.HTTPPort)
+	advRaftAddr := fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), ports.RaftPort)
+
+	// Create new instance
+	instance := NewRQLiteInstance(
+		dbName,
+		ports,
+		cm.dataDir,
+		advHTTPAddr,
+		advRaftAddr,
+		cm.logger.With(zap.String("database", dbName)),
+	)
+
+	// Determine join address if not leader
+	var joinAddr string
+	if !isLeader && len(metadata.NodeIDs) > 1 {
+		// Get list of active members
+		activeMembers := cm.getActiveMembers(metadata)
+
+		// Prefer the leader if healthy
+		for _, nodeID := range activeMembers {
+			if nodeID == metadata.LeaderNodeID && nodeID != cm.nodeID {
+				if leaderPorts, exists := metadata.PortMappings[nodeID]; exists {
+					joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), leaderPorts.RaftPort)
+					cm.logger.Info("Recovery: joining healthy leader",
+						zap.String("database", dbName),
+						zap.String("leader_node", nodeID),
+						zap.String("join_address", joinAddr))
+					break
+				}
+			}
+		}
+
+		// If leader not available, try any other healthy node
+		if joinAddr == "" {
+			for _, nodeID := range activeMembers {
+				if nodeID != cm.nodeID {
+					if nodePorts, exists := metadata.PortMappings[nodeID]; exists {
+						joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), nodePorts.RaftPort)
+						cm.logger.Info("Recovery: joining healthy follower",
+							zap.String("database", dbName),
+							zap.String("node", nodeID),
+							zap.String("join_address", joinAddr))
+						break
+					}
+				}
+			}
+		}
+
+		// If no healthy nodes found, warn and fail
+		if joinAddr == "" {
+			cm.logger.Error("Cannot recover: no healthy nodes available to join",
+				zap.String("database", dbName),
+				zap.Int("total_nodes", len(metadata.NodeIDs)),
+				zap.Int("active_nodes", len(activeMembers)))
+			return
+		}
+	}
+
+	// Check if instance has existing state
+	if instance.hasExistingData() {
+		wasInCluster := instance.wasInCluster()
+		cm.logger.Info("Recovery: found existing RQLite state",
+			zap.String("database", dbName),
+			zap.Bool("is_leader", isLeader),
+			zap.Bool("was_in_cluster", wasInCluster))
+
+		// For leaders with existing cluster state, peer config will be cleared
+		if isLeader && wasInCluster {
+			cm.logger.Info("Recovery: leader will clear peer configuration for clean restart",
+				zap.String("database", dbName))
+		}
+
+		// For followers, ensure join address is valid and will use join-as
+		if !isLeader {
+			if joinAddr == "" {
+				cm.logger.Error("Cannot recover follower without join address",
+					zap.String("database", dbName))
+				return
+			}
+			if wasInCluster {
+				cm.logger.Info("Recovery: follower will rejoin cluster as voter",
+					zap.String("database", dbName),
+					zap.String("join_address", joinAddr))
+			}
+		}
+	}
+
+	// Start the instance
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	if err := instance.Start(ctx, isLeader, joinAddr); err != nil {
+		cm.logger.Error("Database recovery failed",
+			zap.String("database", dbName),
+			zap.Error(err))
+		return
+	}
+
+	// Update active clusters
+	cm.mu.Lock()
+	cm.activeClusters[dbName] = instance
+	cm.mu.Unlock()
+
+	// Update metadata status
+	metadata.Status = StatusActive
+	UpdateDatabaseMetadata(metadata, cm.nodeID)
+	cm.metadataStore.SetDatabase(metadata)
+
+	// Broadcast status update
+	statusUpdate := DatabaseStatusUpdate{
+		DatabaseName: dbName,
+		NodeID:       cm.nodeID,
+		Status:       StatusActive,
+		HTTPPort:     ports.HTTPPort,
+		RaftPort:     ports.RaftPort,
+	}
+
+	msgData, err := MarshalMetadataMessage(MsgDatabaseStatusUpdate, cm.nodeID, statusUpdate)
+	if err != nil {
+		cm.logger.Warn("Failed to marshal status update during recovery",
+			zap.String("database", dbName),
+			zap.Error(err))
+	} else {
+		topic := "/debros/metadata/v1"
+		if err := cm.pubsubAdapter.Publish(cm.ctx, topic, msgData); err != nil {
+			cm.logger.Warn("Failed to publish status update during recovery",
+				zap.String("database", dbName),
+				zap.Error(err))
+		}
+	}
+
+	cm.logger.Info("Database recovery completed successfully",
+		zap.String("database", dbName),
+		zap.Bool("is_leader", isLeader))
 }
 
 // monitorIdleDatabases monitors for idle databases to hibernate
@@ -582,6 +808,50 @@ func (cm *ClusterManager) reconcileOrphanedData() {
 		zap.Int("orphans_found", orphanCount))
 }
 
+// isNodeHealthy checks if a node is healthy and recently active
+func (cm *ClusterManager) isNodeHealthy(nodeID string) bool {
+	node := cm.metadataStore.GetNode(nodeID)
+	if node == nil {
+		return false
+	}
+
+	// Consider node stale if not heard from in 30 seconds
+	staleDuration := 30 * time.Second
+	if time.Since(node.LastHealthCheck) > staleDuration {
+		return false
+	}
+
+	return node.IsHealthy
+}
+
+// hasQuorum checks if there are enough healthy nodes for a database
+func (cm *ClusterManager) hasQuorum(metadata *DatabaseMetadata) bool {
+	if metadata == nil {
+		return false
+	}
+
+	activeNodes := 0
+	for _, nodeID := range metadata.NodeIDs {
+		if cm.isNodeHealthy(nodeID) || nodeID == cm.nodeID {
+			activeNodes++
+		}
+	}
+
+	requiredQuorum := (len(metadata.NodeIDs) / 2) + 1
+	return activeNodes >= requiredQuorum
+}
+
+// getActiveMembers returns list of active member node IDs for a database
+func (cm *ClusterManager) getActiveMembers(metadata *DatabaseMetadata) []string {
+	activeMembers := make([]string, 0)
+	for _, nodeID := range metadata.NodeIDs {
+		if cm.isNodeHealthy(nodeID) || nodeID == cm.nodeID {
+			activeMembers = append(activeMembers, nodeID)
+		}
+	}
+	return activeMembers
+}
+
 // initializeSystemDatabase creates and starts the system database on this node
 func (cm *ClusterManager) initializeSystemDatabase() error {
 	systemDBName := cm.config.SystemDatabaseName
@@ -600,6 +870,34 @@ func (cm *ClusterManager) initializeSystemDatabase() error {
 	// Check if system database already exists in metadata
 	existingDB := cm.metadataStore.GetDatabase(systemDBName)
 	if existingDB != nil {
+		cm.logger.Info("System database already exists in metadata, checking local instance",
+			zap.String("database", systemDBName),
+			zap.Int("member_count", len(existingDB.NodeIDs)),
+			zap.Strings("members", existingDB.NodeIDs))
+
+		// Check quorum status
+		hasQuorum := cm.hasQuorum(existingDB)
+		cm.logger.Info("System database quorum status",
+			zap.String("database", systemDBName),
+			zap.Bool("has_quorum", hasQuorum),
+			zap.Int("active_members", len(cm.getActiveMembers(existingDB))))
+
+		// Check if this node is a member
+		isMember := false
+		for _, nodeID := range existingDB.NodeIDs {
+			if nodeID == cm.nodeID {
+				isMember = true
+				break
+			}
+		}
+
+		if !isMember {
+			cm.logger.Info("This node is not a member of existing system database, skipping creation",
+				zap.String("database", systemDBName))
+			return nil
+		}
+
+		// Fall through to wait for activation
 		cm.logger.Info("System database already exists in metadata, waiting for it to become active",
 			zap.String("database", systemDBName))
 	} else {
diff --git a/pkg/rqlite/instance.go b/pkg/rqlite/instance.go
index 9589c71..e7267ad 100644
--- a/pkg/rqlite/instance.go
+++ b/pkg/rqlite/instance.go
@@ -8,6 +8,7 @@ import (
 	"os"
 	"os/exec"
 	"path/filepath"
+	"strings"
 	"syscall"
 	"time"
 
@@ -44,6 +45,67 @@ func NewRQLiteInstance(dbName string, ports PortPair, dataDir string, advHTTPAdd
 	}
 }
 
+// hasExistingData checks if the data directory contains existing RQLite state
+func (ri *RQLiteInstance) hasExistingData() bool {
+	// Check for raft.db which indicates existing cluster state
+	raftDBPath := filepath.Join(ri.DataDir, "raft.db")
+	if _, err := os.Stat(raftDBPath); err == nil {
+		return true
+	}
+	return false
+}
+
+// wasInCluster checks if this node was previously part of a Raft cluster
+func (ri *RQLiteInstance) wasInCluster() bool {
+	if !ri.hasExistingData() {
+		return false
+	}
+
+	// Check for peers.json which indicates cluster membership
+	peersFile := filepath.Join(ri.DataDir, "raft", "peers.json")
+	if _, err := os.Stat(peersFile); err == nil {
+		return true
+	}
+
+	// Alternative: check raft log size - if > 0, was in cluster
+	raftDBPath := filepath.Join(ri.DataDir, "raft.db")
+	if info, err := os.Stat(raftDBPath); err == nil && info.Size() > 0 {
+		return true
+	}
+
+	return false
+}
+
+// clearRaftState removes Raft log and snapshots to allow clean leader restart
+// This is more aggressive than clearPeerConfiguration and resets the entire cluster state
+func (ri *RQLiteInstance) clearRaftState() error {
+	// Remove Raft log and cluster config without touching SQLite data
+	paths := []string{
+		filepath.Join(ri.DataDir, "raft.db"),
+		filepath.Join(ri.DataDir, "raft"),       // contains peers.json/info and other raft state
+		filepath.Join(ri.DataDir, "rsnapshots"), // raft snapshots
+	}
+
+	var firstErr error
+	for _, p := range paths {
+		if _, err := os.Stat(p); err == nil {
+			if err := os.RemoveAll(p); err != nil {
+				// Record only the first failure, but log every one
+				if firstErr == nil {
+					firstErr = err
+				}
+				ri.logger.Warn("Failed to remove Raft state path",
+					zap.String("database", ri.DatabaseName),
+					zap.String("path", p),
+					zap.Error(err))
+			} else {
+				ri.logger.Info("Cleared Raft state path",
+					zap.String("database", ri.DatabaseName),
+					zap.String("path", p))
+			}
+		}
+	}
+
+	return firstErr
+}
+
 // Start starts the rqlite subprocess
 func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr string) error {
 	// Create data directory
@@ -51,6 +113,35 @@ func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr str
 		return fmt.Errorf("failed to create data directory: %w", err)
 	}
 
+	// Check for existing data and clear peer config if needed BEFORE starting RQLite
+	hasExisting := ri.hasExistingData()
+	if hasExisting {
+		wasInCluster := ri.wasInCluster()
+		ri.logger.Info("Found existing RQLite data, reusing state",
+			zap.String("database", ri.DatabaseName),
+			zap.String("data_dir", ri.DataDir),
+			zap.Bool("is_leader", isLeader),
+			zap.Bool("was_in_cluster", wasInCluster),
+			zap.String("join_address", joinAddr))
+
+		// Clear Raft state for leaders with existing cluster state BEFORE starting RQLite
+		if isLeader && wasInCluster && joinAddr == "" {
+			ri.logger.Warn("Leader has existing cluster state - clearing Raft state for clean restart",
+				zap.String("database", ri.DatabaseName))
+			if err := ri.clearRaftState(); err != nil {
+				ri.logger.Warn("Failed to clear Raft state", zap.Error(err))
+			} else {
+				ri.logger.Info("Cleared Raft log and snapshots; node will bootstrap as single-node and accept joins",
+					zap.String("database", ri.DatabaseName))
+			}
+		}
+	} else {
+		ri.logger.Info("No existing RQLite data, starting fresh",
+			zap.String("database", ri.DatabaseName),
+			zap.String("data_dir", ri.DataDir),
+			zap.Bool("is_leader", isLeader))
+	}
+
 	// Build rqlited command
 	args := []string{
 		"-http-addr", fmt.Sprintf("0.0.0.0:%d", ri.HTTPPort),
@@ -68,11 +159,34 @@ func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr str
 	// Add join address if this is a follower
 	if !isLeader && joinAddr != "" {
 		args = append(args, "-join", joinAddr)
+		// Force rejoin if we have existing cluster state
+		if ri.wasInCluster() {
+			args = append(args, "-join-as", "voter")
+			ri.logger.Info("Follower will rejoin cluster as voter",
+				zap.String("database", ri.DatabaseName))
+		}
 	}
 
 	// Add data directory as positional argument
 	args = append(args, ri.DataDir)
 
+	// Check for conflicting configuration: bootstrap + existing data
+	if isLeader && !strings.Contains(strings.Join(args, " "), "-join") {
+		// This is a bootstrap scenario (leader without join)
+		if ri.hasExistingData() {
+			ri.logger.Warn("Detected existing Raft state, will not bootstrap",
+				zap.String("database", ri.DatabaseName),
+				zap.String("data_dir", ri.DataDir))
+			// Remove any bootstrap-only flags if they exist
+			// RQLite will detect existing state and continue as member
+		}
+	}
+
+	// For followers with existing data, verify join address is set
+	if !isLeader && joinAddr == "" && ri.hasExistingData() {
+		return fmt.Errorf("follower has existing Raft state but no join address provided")
+	}
+
 	ri.logger.Info("Starting RQLite instance",
 		zap.String("database", ri.DatabaseName),
 		zap.Int("http_port", ri.HTTPPort),
@@ -84,18 +198,24 @@ func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr str
 	// Start RQLite process
 	ri.Cmd = exec.Command("rqlited", args...)
 
-	// Optionally capture stdout/stderr for debugging
-	// ri.Cmd.Stdout = os.Stdout
-	// ri.Cmd.Stderr = os.Stderr
+	// Capture stdout/stderr for debugging
+	ri.Cmd.Stdout = os.Stdout
+	ri.Cmd.Stderr = os.Stderr
 
 	if err := ri.Cmd.Start(); err != nil {
-		return fmt.Errorf("failed to start rqlited: %w", err)
+		return fmt.Errorf("failed to start rqlited binary (check if installed): %w", err)
 	}
 
 	// Wait for RQLite to be ready
 	if err := ri.waitForReady(ctx); err != nil {
+		ri.logger.Error("RQLite failed to become ready",
+			zap.String("database", ri.DatabaseName),
+			zap.String("data_dir", ri.DataDir),
+			zap.Int("http_port", ri.HTTPPort),
+			zap.Int("raft_port", ri.RaftPort),
+			zap.Error(err))
 		ri.Stop()
-		return fmt.Errorf("rqlited failed to become ready: %w", err)
+		return fmt.Errorf("rqlited failed to become ready (check logs above): %w", err)
 	}
 
 	// Create connection
@@ -106,17 +226,34 @@ func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr str
 	}
 	ri.Connection = conn
 
-	// Wait for SQL availability
-	if err := ri.waitForSQLAvailable(ctx); err != nil {
-		ri.Stop()
-		return fmt.Errorf("rqlited SQL not available: %w", err)
+	// For leaders, wait for SQL to be immediately available
+	// For followers, SQL will become available after cluster sync
+	if isLeader {
+		if err := ri.waitForSQLAvailable(ctx); err != nil {
+			ri.Stop()
+			return fmt.Errorf("leader SQL not available: %w", err)
+		}
+		ri.logger.Info("Leader SQL is ready",
+			zap.String("database", ri.DatabaseName))
+	} else {
+		// For followers, just verify the node joined successfully
+		if err := ri.waitForClusterJoin(ctx, 30*time.Second); err != nil {
+			ri.logger.Warn("Follower may not have joined cluster yet, but continuing",
+				zap.String("database", ri.DatabaseName),
+				zap.Error(err))
+			// Don't fail - SQL will become available eventually
+		} else {
+			ri.logger.Info("Follower successfully joined cluster",
+				zap.String("database", ri.DatabaseName))
+		}
 	}
 
 	ri.Status = StatusActive
 	ri.LastQuery = time.Now()
 
 	ri.logger.Info("RQLite instance started successfully",
-		zap.String("database", ri.DatabaseName))
+		zap.String("database", ri.DatabaseName),
+		zap.Bool("is_leader", isLeader))
 
 	return nil
 }
@@ -205,17 +342,105 @@ func (ri *RQLiteInstance) waitForSQLAvailable(ctx context.Context) error {
 		case <-ticker.C:
 			_, err := ri.Connection.QueryOne("SELECT 1")
 			if err == nil {
+				ri.logger.Info("SQL queries are now available",
+					zap.String("database", ri.DatabaseName),
+					zap.Int("attempts", i+1))
 				return nil
 			}
+			// Log every 5 seconds with more detail
 			if i%5 == 0 {
 				ri.logger.Debug("Waiting for RQLite SQL availability",
 					zap.String("database", ri.DatabaseName),
+					zap.Int("attempt", i+1),
+					zap.Int("max_attempts", 60),
 					zap.Error(err))
 			}
 		}
 	}
 
-	return fmt.Errorf("rqlited SQL not available within timeout")
+	return fmt.Errorf("rqlited SQL not available within timeout (60 seconds)")
+}
+
+// waitForClusterJoin waits for a follower node to successfully join the cluster
+// This checks the /status endpoint for cluster membership info
+func (ri *RQLiteInstance) waitForClusterJoin(ctx context.Context, timeout time.Duration) error {
+	deadline := time.Now().Add(timeout)
+	client := &http.Client{Timeout: 2 * time.Second}
+	statusURL := fmt.Sprintf("http://localhost:%d/status", ri.HTTPPort)
+
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+
+	attempts := 0
+	for time.Now().Before(deadline) {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+			attempts++
+
+			resp, err := client.Get(statusURL)
+			if err != nil {
+				if attempts%5 == 0 {
+					ri.logger.Debug("Checking cluster join status",
+						zap.String("database", ri.DatabaseName),
+						zap.Int("attempt", attempts),
+						zap.Error(err))
+				}
+				continue
+			}
+
+			// Check if status code is OK
+			if resp.StatusCode == http.StatusOK {
+				resp.Body.Close()
+				// If we can query status, the node has likely joined
+				// Try a simple SQL query to confirm
+				if ri.Connection != nil {
+					_, err := ri.Connection.QueryOne("SELECT 1")
+					if err == nil {
+						return nil // SQL is ready!
+					}
+				}
+				// Even if SQL not ready, status endpoint being available is good enough
+				if attempts >= 5 {
+					// After a few attempts, accept status endpoint as sufficient
+					return nil
+				}
+			} else {
+				resp.Body.Close()
+			}
+		}
+	}
+
+	return fmt.Errorf("cluster join check timed out after %v", timeout)
+}
+
+// StartBackgroundSQLReadinessCheck starts a background check for SQL readiness
+// This is used for followers that may take time to sync cluster state
+func (ri *RQLiteInstance) StartBackgroundSQLReadinessCheck(ctx context.Context, onReady func()) {
+	go func() {
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				if ri.Connection != nil {
+					_, err := ri.Connection.QueryOne("SELECT 1")
+					if err == nil {
+						ri.logger.Info("Follower SQL is now ready",
+							zap.String("database", ri.DatabaseName))
+						if onReady != nil {
+							onReady()
+						}
+						return // SQL is ready, stop checking
+					}
+				}
+			}
+		}
+	}()
 }
 
 // UpdateLastQuery updates the last query timestamp
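
Reviewer note (not part of the patch): recovery only proceeds when hasQuorum passes, i.e. when at least a majority of a database's members (counting this node itself) look healthy. A minimal, self-contained sketch of that majority rule follows; the names quorumMet, members, and healthy are hypothetical and exist only for illustration.

package main

import "fmt"

// quorumMet mirrors the patch's requiredQuorum := (len(metadata.NodeIDs) / 2) + 1 check.
func quorumMet(members, healthy int) bool {
	return healthy >= (members/2)+1
}

func main() {
	fmt.Println(quorumMet(3, 2)) // true: a 3-node database can be recovered with 2 healthy members
	fmt.Println(quorumMet(3, 1)) // false: recovery is skipped until a second member reports healthy
	fmt.Println(quorumMet(5, 3)) // true: majority of 5 is 3
}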