Add memory leak prevention in pubsub handler and improve database recovery logic

- Implemented a cleanup goroutine in the pubsub handler to evict stale entries from the per-connection de-dup map, preventing a memory leak.
- Enhanced the database recovery process in the cluster manager by adding checks for quorum and ensuring proper handling of existing data during instance startup.
- Updated instance management to clear Raft state for leaders with existing cluster state and added background SQL readiness checks for followers.
- Improved logging for better visibility during recovery and instance startup processes.
anonpenguin23 2025-10-16 10:10:33 +03:00
parent 94248e40b0
commit dd4cb832dc
4 changed files with 640 additions and 44 deletions

View File

@ -62,6 +62,28 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request)
ctx := client.WithInternalAuth(r.Context())
// Subscribe to the topic; push data into msgs with simple per-connection de-dup
recent := make(map[string]time.Time)
// Start cleanup goroutine to prevent memory leak
cleanupTicker := time.NewTicker(30 * time.Second)
defer cleanupTicker.Stop()
go func() {
for {
select {
case <-cleanupTicker.C:
// Clean up old entries to prevent memory leak
now := time.Now()
for key, timestamp := range recent {
if now.Sub(timestamp) > 2*time.Minute {
delete(recent, key)
}
}
case <-ctx.Done():
return
}
}
}()
h := func(_ string, data []byte) error {
// Drop duplicates seen in the last 2 seconds
sum := sha256.Sum256(data)

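Note on the hunk above: the new cleanup goroutine ranges over and deletes from the same recent map that the subscription callback writes to, so if those paths run on separate goroutines the map needs synchronization. A minimal sketch of a mutex-guarded variant, with a hypothetical dedupCache type that is not part of this commit:

package pubsub

import (
	"sync"
	"time"
)

// dedupCache is an illustrative, mutex-guarded version of the per-connection
// "recent" map: both the cleanup sweep and the subscription callback may run
// concurrently, so access is serialized through one lock.
type dedupCache struct {
	mu     sync.Mutex
	recent map[string]time.Time
	ttl    time.Duration
}

func newDedupCache(ttl time.Duration) *dedupCache {
	return &dedupCache{recent: make(map[string]time.Time), ttl: ttl}
}

// Seen records key and reports whether it was already seen within the TTL.
func (c *dedupCache) Seen(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	now := time.Now()
	if t, ok := c.recent[key]; ok && now.Sub(t) < c.ttl {
		return true
	}
	c.recent[key] = now
	return false
}

// Sweep removes entries older than the TTL; call it from a ticker goroutine.
func (c *dedupCache) Sweep() {
	c.mu.Lock()
	defer c.mu.Unlock()
	now := time.Now()
	for k, t := range c.recent {
		if now.Sub(t) > c.ttl {
			delete(c.recent, k)
		}
	}
}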
View File

@ -125,21 +125,7 @@ func (cm *ClusterManager) handleCreateConfirm(msg *MetadataMessage) error {
zap.String("coordinator", confirm.CoordinatorNodeID),
zap.Int("nodes", len(confirm.SelectedNodes)))
// Check if database already exists or is being initialized (ignore duplicate confirmations)
// Check if this node was selected first (before any locking)
cm.mu.RLock()
_, alreadyActive := cm.activeClusters[confirm.DatabaseName]
_, alreadyInitializing := cm.initializingDBs[confirm.DatabaseName]
cm.mu.RUnlock()
if alreadyActive || alreadyInitializing {
cm.logger.Debug("Database already active or initializing on this node, ignoring confirmation",
zap.String("database", confirm.DatabaseName),
zap.Bool("active", alreadyActive),
zap.Bool("initializing", alreadyInitializing))
return nil
}
// Check if this node was selected
var myAssignment *NodeAssignment
for i, node := range confirm.SelectedNodes {
if node.NodeID == cm.nodeID {
@ -154,15 +140,29 @@ func (cm *ClusterManager) handleCreateConfirm(msg *MetadataMessage) error {
return nil
}
// Use atomic check-and-set to prevent race conditions
cm.mu.Lock()
defer cm.mu.Unlock()
// Check if database already exists or is being initialized (atomic check)
_, alreadyActive := cm.activeClusters[confirm.DatabaseName]
_, alreadyInitializing := cm.initializingDBs[confirm.DatabaseName]
if alreadyActive || alreadyInitializing {
cm.logger.Debug("Database already active or initializing on this node, ignoring confirmation",
zap.String("database", confirm.DatabaseName),
zap.Bool("active", alreadyActive),
zap.Bool("initializing", alreadyInitializing))
return nil
}
// Atomically mark database as initializing to prevent duplicate confirmations
cm.initializingDBs[confirm.DatabaseName] = true
cm.logger.Info("Selected to host database",
zap.String("database", confirm.DatabaseName),
zap.String("role", myAssignment.Role))
// Mark database as initializing to prevent duplicate confirmations
cm.mu.Lock()
cm.initializingDBs[confirm.DatabaseName] = true
cm.mu.Unlock()
// Create database metadata
portMappings := make(map[string]PortPair)
nodeIDs := make([]string, len(confirm.SelectedNodes))
@ -222,7 +222,7 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe
// Join to the leader
leaderNodeID := metadata.LeaderNodeID
if leaderPorts, exists := metadata.PortMappings[leaderNodeID]; exists {
joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), leaderPorts.HTTPPort)
joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), leaderPorts.RaftPort)
cm.logger.Info("Follower joining leader",
zap.String("database", metadata.DatabaseName),
zap.String("leader_node", leaderNodeID),
@ -235,13 +235,34 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe
}
}
// Start the instance with longer timeout for bootstrap
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
// For followers with existing data, ensure we have a join address
if !isLeader && instance.hasExistingData() {
if joinAddr == "" {
cm.logger.Error("Follower has existing data but no join address available",
zap.String("database", metadata.DatabaseName))
// Clear initializing flag
cm.mu.Lock()
delete(cm.initializingDBs, metadata.DatabaseName)
cm.mu.Unlock()
return
}
cm.logger.Info("Follower restarting with existing data, will rejoin cluster",
zap.String("database", metadata.DatabaseName),
zap.String("join_address", joinAddr))
}
// Start the instance with appropriate timeout
timeout := 60 * time.Second
if isLeader {
timeout = 90 * time.Second // Leaders need more time for bootstrap
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := instance.Start(ctx, isLeader, joinAddr); err != nil {
cm.logger.Error("Failed to start database instance",
zap.String("database", metadata.DatabaseName),
zap.Bool("is_leader", isLeader),
zap.Error(err))
// Clear initializing flag on failure
@ -254,6 +275,14 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe
return
}
// For followers, start background SQL readiness check
if !isLeader {
instance.StartBackgroundSQLReadinessCheck(cm.ctx, func() {
cm.logger.Info("Follower SQL became ready",
zap.String("database", metadata.DatabaseName))
})
}
// Store active instance and clear initializing flag
cm.mu.Lock()
cm.activeClusters[metadata.DatabaseName] = instance
@ -264,7 +293,23 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe
cm.broadcastStatusUpdate(metadata.DatabaseName, StatusActive)
cm.logger.Info("Database instance started and active",
zap.String("database", metadata.DatabaseName))
zap.String("database", metadata.DatabaseName),
zap.Bool("is_leader", isLeader))
// Broadcast metadata sync to all nodes
syncMsg := MetadataSync{Metadata: metadata}
syncData, err := MarshalMetadataMessage(MsgMetadataSync, cm.nodeID, syncMsg)
if err == nil {
topic := "/debros/metadata/v1"
if err := cm.pubsubAdapter.Publish(cm.ctx, topic, syncData); err != nil {
cm.logger.Warn("Failed to broadcast metadata sync",
zap.String("database", metadata.DatabaseName),
zap.Error(err))
} else {
cm.logger.Debug("Broadcasted metadata sync",
zap.String("database", metadata.DatabaseName))
}
}
}
// handleStatusUpdate processes database status updates
@ -345,13 +390,18 @@ func (cm *ClusterManager) handleMetadataSync(msg *MetadataMessage) error {
return nil
}
cm.logger.Debug("Received metadata sync",
zap.String("database", sync.Metadata.DatabaseName),
zap.String("from_node", msg.NodeID))
// Check if we need to update local metadata
existing := cm.metadataStore.GetDatabase(sync.Metadata.DatabaseName)
if existing == nil {
// New database we didn't know about
cm.metadataStore.SetDatabase(sync.Metadata)
cm.logger.Info("Learned about new database via sync",
zap.String("database", sync.Metadata.DatabaseName))
zap.String("database", sync.Metadata.DatabaseName),
zap.Strings("node_ids", sync.Metadata.NodeIDs))
return nil
}
@ -360,7 +410,8 @@ func (cm *ClusterManager) handleMetadataSync(msg *MetadataMessage) error {
if winner != existing {
cm.metadataStore.SetDatabase(winner)
cm.logger.Info("Updated database metadata via sync",
zap.String("database", sync.Metadata.DatabaseName))
zap.String("database", sync.Metadata.DatabaseName),
zap.Uint64("new_version", winner.Version))
}
return nil
@ -699,7 +750,7 @@ func (cm *ClusterManager) wakeupDatabase(dbName string, dbMeta *DatabaseMetadata
joinAddr := ""
if len(dbMeta.NodeIDs) > 0 && dbMeta.NodeIDs[0] != cm.nodeID {
firstNodePorts := dbMeta.PortMappings[dbMeta.NodeIDs[0]]
joinAddr = fmt.Sprintf("http://%s:%d", cm.getAdvertiseAddress(), firstNodePorts.RaftPort)
joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), firstNodePorts.RaftPort)
}
// Create and start instance
@ -853,7 +904,7 @@ func (cm *ClusterManager) handleNodeReplacementOffer(msg *MetadataMessage) error
continue // Skip failed nodes (would need proper tracking)
}
ports := dbMeta.PortMappings[nodeID]
joinAddr = fmt.Sprintf("http://%s:%d", cm.getAdvertiseAddress(), ports.RaftPort)
joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), ports.RaftPort)
break
}

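The reordering in handleCreateConfirm above amounts to a check-and-set performed under a single lock: the old code checked activeClusters and initializingDBs under RLock and only later marked the database as initializing under Lock, leaving a window for duplicate confirmations to slip through. A simplified sketch of the pattern, with a hypothetical manager type reduced to what the check needs:

package cluster

import "sync"

// manager is an illustrative reduction of ClusterManager for this pattern.
type manager struct {
	mu              sync.Mutex
	activeClusters  map[string]struct{}
	initializingDBs map[string]bool
}

// tryMarkInitializing returns false if the database is already active or
// already being initialized; otherwise it atomically claims it, so two
// concurrent confirmations for the same database cannot both proceed.
func (m *manager) tryMarkInitializing(db string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.activeClusters[db]; ok {
		return false
	}
	if m.initializingDBs[db] {
		return false
	}
	m.initializingDBs[db] = true
	return true
}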
View File

@ -330,6 +330,35 @@ func (cm *ClusterManager) CreateDatabase(dbName string, replicationFactor int) e
cm.logger.Info("Database creation confirmation broadcasted",
zap.String("database", dbName))
// Create and broadcast metadata immediately
metadata := &DatabaseMetadata{
DatabaseName: dbName,
NodeIDs: make([]string, len(assignments)),
PortMappings: make(map[string]PortPair),
Status: StatusInitializing,
CreatedAt: time.Now(),
LastAccessed: time.Now(),
LeaderNodeID: assignments[0].NodeID,
Version: 1,
VectorClock: NewVectorClock(),
}
for i, node := range assignments {
metadata.NodeIDs[i] = node.NodeID
metadata.PortMappings[node.NodeID] = PortPair{
HTTPPort: node.HTTPPort,
RaftPort: node.RaftPort,
}
}
// Store locally
cm.metadataStore.SetDatabase(metadata)
// Broadcast to all nodes
syncMsg := MetadataSync{Metadata: metadata}
syncData, _ := MarshalMetadataMessage(MsgMetadataSync, cm.nodeID, syncMsg)
cm.pubsubAdapter.Publish(cm.ctx, topic, syncData)
}
return nil
@ -443,15 +472,212 @@ func (cm *ClusterManager) monitorHealth() {
// checkDatabaseHealth checks if all active databases are healthy
func (cm *ClusterManager) checkDatabaseHealth() {
cm.mu.RLock()
defer cm.mu.RUnlock()
failedDatabases := make([]string, 0)
for dbName, instance := range cm.activeClusters {
if !instance.IsRunning() {
cm.logger.Warn("Database instance is not running",
failedDatabases = append(failedDatabases, dbName)
zap.String("database", dbName))
// TODO: Implement recovery logic
}
}
cm.mu.RUnlock()
// Attempt recovery for failed databases
for _, dbName := range failedDatabases {
cm.logger.Warn("Database instance is not running, attempting recovery",
zap.String("database", dbName))
// Get database metadata
metadata := cm.metadataStore.GetDatabase(dbName)
if metadata == nil {
cm.logger.Error("Cannot recover database: metadata not found",
zap.String("database", dbName))
continue
}
// Check if this node is still supposed to host this database
isMember := false
for _, nodeID := range metadata.NodeIDs {
if nodeID == cm.nodeID {
isMember = true
break
}
}
if !isMember {
cm.logger.Info("Node is no longer a member of database, removing from active clusters",
zap.String("database", dbName))
cm.mu.Lock()
delete(cm.activeClusters, dbName)
cm.mu.Unlock()
continue
}
// Attempt to restart the database instance
go cm.attemptDatabaseRecovery(dbName, metadata)
}
}
// attemptDatabaseRecovery attempts to recover a failed database instance
func (cm *ClusterManager) attemptDatabaseRecovery(dbName string, metadata *DatabaseMetadata) {
cm.logger.Info("Attempting database recovery",
zap.String("database", dbName))
// Check if we have quorum before attempting recovery
if !cm.hasQuorum(metadata) {
cm.logger.Warn("Cannot recover database: insufficient quorum",
zap.String("database", dbName),
zap.Int("required", (len(metadata.NodeIDs)/2)+1),
zap.Int("active", len(cm.getActiveMembers(metadata))))
return
}
// Determine if this node should be the leader
isLeader := metadata.LeaderNodeID == cm.nodeID
// Get ports for this database
ports, exists := metadata.PortMappings[cm.nodeID]
if !exists {
cm.logger.Error("Cannot recover database: port mapping not found",
zap.String("database", dbName),
zap.String("node_id", cm.nodeID))
return
}
// Create advertised addresses
advHTTPAddr := fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), ports.HTTPPort)
advRaftAddr := fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), ports.RaftPort)
// Create new instance
instance := NewRQLiteInstance(
dbName,
ports,
cm.dataDir,
advHTTPAddr,
advRaftAddr,
cm.logger.With(zap.String("database", dbName)),
)
// Determine join address if not leader
var joinAddr string
if !isLeader && len(metadata.NodeIDs) > 1 {
// Get list of active members
activeMembers := cm.getActiveMembers(metadata)
// Prefer the leader if healthy
for _, nodeID := range activeMembers {
if nodeID == metadata.LeaderNodeID && nodeID != cm.nodeID {
if leaderPorts, exists := metadata.PortMappings[nodeID]; exists {
joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), leaderPorts.RaftPort)
cm.logger.Info("Recovery: joining healthy leader",
zap.String("database", dbName),
zap.String("leader_node", nodeID),
zap.String("join_address", joinAddr))
break
}
}
}
// If leader not available, try any other healthy node
if joinAddr == "" {
for _, nodeID := range activeMembers {
if nodeID != cm.nodeID {
if nodePorts, exists := metadata.PortMappings[nodeID]; exists {
joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), nodePorts.RaftPort)
cm.logger.Info("Recovery: joining healthy follower",
zap.String("database", dbName),
zap.String("node", nodeID),
zap.String("join_address", joinAddr))
break
}
}
}
}
// If no healthy nodes are found, log an error and give up
if joinAddr == "" {
cm.logger.Error("Cannot recover: no healthy nodes available to join",
zap.String("database", dbName),
zap.Int("total_nodes", len(metadata.NodeIDs)),
zap.Int("active_nodes", len(activeMembers)))
return
}
}
// Check if instance has existing state
if instance.hasExistingData() {
wasInCluster := instance.wasInCluster()
cm.logger.Info("Recovery: found existing RQLite state",
zap.String("database", dbName),
zap.Bool("is_leader", isLeader),
zap.Bool("was_in_cluster", wasInCluster))
// For leaders with existing cluster state, peer config will be cleared
if isLeader && wasInCluster {
cm.logger.Info("Recovery: leader will clear peer configuration for clean restart",
zap.String("database", dbName))
}
// For followers, ensure join address is valid and will use join-as
if !isLeader {
if joinAddr == "" {
cm.logger.Error("Cannot recover follower without join address",
zap.String("database", dbName))
return
}
if wasInCluster {
cm.logger.Info("Recovery: follower will rejoin cluster as voter",
zap.String("database", dbName),
zap.String("join_address", joinAddr))
}
}
}
// Start the instance
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := instance.Start(ctx, isLeader, joinAddr); err != nil {
cm.logger.Error("Database recovery failed",
zap.String("database", dbName),
zap.Error(err))
return
}
// Update active clusters
cm.mu.Lock()
cm.activeClusters[dbName] = instance
cm.mu.Unlock()
// Update metadata status
metadata.Status = StatusActive
UpdateDatabaseMetadata(metadata, cm.nodeID)
cm.metadataStore.SetDatabase(metadata)
// Broadcast status update
statusUpdate := DatabaseStatusUpdate{
DatabaseName: dbName,
NodeID: cm.nodeID,
Status: StatusActive,
HTTPPort: ports.HTTPPort,
RaftPort: ports.RaftPort,
}
msgData, err := MarshalMetadataMessage(MsgDatabaseStatusUpdate, cm.nodeID, statusUpdate)
if err != nil {
cm.logger.Warn("Failed to marshal status update during recovery",
zap.String("database", dbName),
zap.Error(err))
} else {
topic := "/debros/metadata/v1"
if err := cm.pubsubAdapter.Publish(cm.ctx, topic, msgData); err != nil {
cm.logger.Warn("Failed to publish status update during recovery",
zap.String("database", dbName),
zap.Error(err))
}
}
cm.logger.Info("Database recovery completed successfully",
zap.String("database", dbName),
zap.Bool("is_leader", isLeader))
}
// monitorIdleDatabases monitors for idle databases to hibernate // monitorIdleDatabases monitors for idle databases to hibernate
@ -582,6 +808,50 @@ func (cm *ClusterManager) reconcileOrphanedData() {
zap.Int("orphans_found", orphanCount))
}
// isNodeHealthy checks if a node is healthy and recently active
func (cm *ClusterManager) isNodeHealthy(nodeID string) bool {
node := cm.metadataStore.GetNode(nodeID)
if node == nil {
return false
}
// Consider node stale if not heard from in 30 seconds
staleDuration := 30 * time.Second
if time.Since(node.LastHealthCheck) > staleDuration {
return false
}
return node.IsHealthy
}
// hasQuorum checks if there are enough healthy nodes for a database
func (cm *ClusterManager) hasQuorum(metadata *DatabaseMetadata) bool {
if metadata == nil {
return false
}
activeNodes := 0
for _, nodeID := range metadata.NodeIDs {
if cm.isNodeHealthy(nodeID) || nodeID == cm.nodeID {
activeNodes++
}
}
requiredQuorum := (len(metadata.NodeIDs) / 2) + 1
return activeNodes >= requiredQuorum
}
// getActiveMembers returns list of active member node IDs for a database
func (cm *ClusterManager) getActiveMembers(metadata *DatabaseMetadata) []string {
activeMembers := make([]string, 0)
for _, nodeID := range metadata.NodeIDs {
if cm.isNodeHealthy(nodeID) || nodeID == cm.nodeID {
activeMembers = append(activeMembers, nodeID)
}
}
return activeMembers
}
// initializeSystemDatabase creates and starts the system database on this node
func (cm *ClusterManager) initializeSystemDatabase() error {
systemDBName := cm.config.SystemDatabaseName
@ -600,6 +870,34 @@ func (cm *ClusterManager) initializeSystemDatabase() error {
// Check if system database already exists in metadata
existingDB := cm.metadataStore.GetDatabase(systemDBName)
if existingDB != nil {
cm.logger.Info("System database already exists in metadata, checking local instance",
zap.String("database", systemDBName),
zap.Int("member_count", len(existingDB.NodeIDs)),
zap.Strings("members", existingDB.NodeIDs))
// Check quorum status
hasQuorum := cm.hasQuorum(existingDB)
cm.logger.Info("System database quorum status",
zap.String("database", systemDBName),
zap.Bool("has_quorum", hasQuorum),
zap.Int("active_members", len(cm.getActiveMembers(existingDB))))
// Check if this node is a member
isMember := false
for _, nodeID := range existingDB.NodeIDs {
if nodeID == cm.nodeID {
isMember = true
break
}
}
if !isMember {
cm.logger.Info("This node is not a member of existing system database, skipping creation",
zap.String("database", systemDBName))
return nil
}
// Fall through to wait for activation
cm.logger.Info("System database already exists in metadata, waiting for it to become active",
zap.String("database", systemDBName))
} else {

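For reference, the quorum gate used by hasQuorum and attemptDatabaseRecovery above is plain majority arithmetic over the database's member list; a small sketch with illustrative helper names that are not part of the commit:

package cluster

// requiredQuorum mirrors the arithmetic in hasQuorum above: a strict majority
// of the member nodes, e.g. requiredQuorum(3) == 2, requiredQuorum(5) == 3.
func requiredQuorum(memberCount int) int {
	return memberCount/2 + 1
}

// canRecover sketches the recovery gate: recovery is attempted only when the
// healthy members (the local node counts itself) reach that majority.
func canRecover(healthyMembers, memberCount int) bool {
	return healthyMembers >= requiredQuorum(memberCount)
}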
View File

@ -8,6 +8,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"syscall"
"time"
@ -44,6 +45,67 @@ func NewRQLiteInstance(dbName string, ports PortPair, dataDir string, advHTTPAdd
}
}
// hasExistingData checks if the data directory contains existing RQLite state
func (ri *RQLiteInstance) hasExistingData() bool {
// Check for raft.db which indicates existing cluster state
raftDBPath := filepath.Join(ri.DataDir, "raft.db")
if _, err := os.Stat(raftDBPath); err == nil {
return true
}
return false
}
// wasInCluster checks if this node was previously part of a Raft cluster
func (ri *RQLiteInstance) wasInCluster() bool {
if !ri.hasExistingData() {
return false
}
// Check for peers.json which indicates cluster membership
peersFile := filepath.Join(ri.DataDir, "raft", "peers.json")
if _, err := os.Stat(peersFile); err == nil {
return true
}
// Alternative: check raft log size - if > 0, was in cluster
raftDBPath := filepath.Join(ri.DataDir, "raft.db")
if info, err := os.Stat(raftDBPath); err == nil && info.Size() > 0 {
return true
}
return false
}
// clearRaftState removes Raft log and snapshots to allow clean leader restart
// This is more aggressive than clearPeerConfiguration and resets the entire cluster state
func (ri *RQLiteInstance) clearRaftState() error {
// Remove Raft log and cluster config without touching SQLite data
paths := []string{
filepath.Join(ri.DataDir, "raft.db"),
filepath.Join(ri.DataDir, "raft"), // contains peers.json/info and other raft state
filepath.Join(ri.DataDir, "rsnapshots"), // raft snapshots
}
var firstErr error
for _, p := range paths {
if _, err := os.Stat(p); err == nil {
if err := os.RemoveAll(p); err != nil && firstErr == nil {
firstErr = err
ri.logger.Warn("Failed to remove Raft state path",
zap.String("database", ri.DatabaseName),
zap.String("path", p),
zap.Error(err))
} else {
ri.logger.Info("Cleared Raft state path",
zap.String("database", ri.DatabaseName),
zap.String("path", p))
}
}
}
return firstErr
}
// Start starts the rqlite subprocess
func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr string) error {
// Create data directory
@ -51,6 +113,35 @@ func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr str
return fmt.Errorf("failed to create data directory: %w", err)
}
// Check for existing data and clear peer config if needed BEFORE starting RQLite
hasExisting := ri.hasExistingData()
if hasExisting {
wasInCluster := ri.wasInCluster()
ri.logger.Info("Found existing RQLite data, reusing state",
zap.String("database", ri.DatabaseName),
zap.String("data_dir", ri.DataDir),
zap.Bool("is_leader", isLeader),
zap.Bool("was_in_cluster", wasInCluster),
zap.String("join_address", joinAddr))
// Clear Raft state for leaders with existing cluster state BEFORE starting RQLite
if isLeader && wasInCluster && joinAddr == "" {
ri.logger.Warn("Leader has existing cluster state - clearing Raft state for clean restart",
zap.String("database", ri.DatabaseName))
if err := ri.clearRaftState(); err != nil {
ri.logger.Warn("Failed to clear Raft state", zap.Error(err))
} else {
ri.logger.Info("Cleared Raft log and snapshots; node will bootstrap as single-node and accept joins",
zap.String("database", ri.DatabaseName))
}
}
} else {
ri.logger.Info("No existing RQLite data, starting fresh",
zap.String("database", ri.DatabaseName),
zap.String("data_dir", ri.DataDir),
zap.Bool("is_leader", isLeader))
}
// Build rqlited command
args := []string{
"-http-addr", fmt.Sprintf("0.0.0.0:%d", ri.HTTPPort),
@ -68,11 +159,34 @@ func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr str
// Add join address if this is a follower
if !isLeader && joinAddr != "" {
args = append(args, "-join", joinAddr)
// Force rejoin if we have existing cluster state
if ri.wasInCluster() {
args = append(args, "-join-as", "voter")
ri.logger.Info("Follower will rejoin cluster as voter",
zap.String("database", ri.DatabaseName))
}
}
// Add data directory as positional argument
args = append(args, ri.DataDir)
// Check for conflicting configuration: bootstrap + existing data
if isLeader && !strings.Contains(strings.Join(args, " "), "-join") {
// This is a bootstrap scenario (leader without join)
if ri.hasExistingData() {
ri.logger.Warn("Detected existing Raft state, will not bootstrap",
zap.String("database", ri.DatabaseName),
zap.String("data_dir", ri.DataDir))
// Remove any bootstrap-only flags if they exist
// RQLite will detect existing state and continue as member
}
}
// For followers with existing data, verify join address is set
if !isLeader && joinAddr == "" && ri.hasExistingData() {
return fmt.Errorf("follower has existing Raft state but no join address provided")
}
ri.logger.Info("Starting RQLite instance",
zap.String("database", ri.DatabaseName),
zap.Int("http_port", ri.HTTPPort),
@ -84,18 +198,24 @@ func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr str
// Start RQLite process
ri.Cmd = exec.Command("rqlited", args...)
// Optionally capture stdout/stderr for debugging
// ri.Cmd.Stdout = os.Stdout
// ri.Cmd.Stderr = os.Stderr
// Capture stdout/stderr for debugging
ri.Cmd.Stdout = os.Stdout
ri.Cmd.Stderr = os.Stderr
if err := ri.Cmd.Start(); err != nil {
return fmt.Errorf("failed to start rqlited: %w", err)
return fmt.Errorf("failed to start rqlited binary (check if installed): %w", err)
}
// Wait for RQLite to be ready
if err := ri.waitForReady(ctx); err != nil {
ri.logger.Error("RQLite failed to become ready",
zap.String("database", ri.DatabaseName),
zap.String("data_dir", ri.DataDir),
zap.Int("http_port", ri.HTTPPort),
zap.Int("raft_port", ri.RaftPort),
zap.Error(err))
ri.Stop()
return fmt.Errorf("rqlited failed to become ready: %w", err)
return fmt.Errorf("rqlited failed to become ready (check logs above): %w", err)
}
// Create connection
@ -106,17 +226,34 @@ func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr str
}
ri.Connection = conn
// Wait for SQL availability
if err := ri.waitForSQLAvailable(ctx); err != nil {
ri.Stop()
return fmt.Errorf("rqlited SQL not available: %w", err)
// For leaders, wait for SQL to be immediately available
// For followers, SQL will become available after cluster sync
if isLeader {
if err := ri.waitForSQLAvailable(ctx); err != nil {
ri.Stop()
return fmt.Errorf("leader SQL not available: %w", err)
}
ri.logger.Info("Leader SQL is ready",
zap.String("database", ri.DatabaseName))
} else {
// For followers, just verify the node joined successfully
if err := ri.waitForClusterJoin(ctx, 30*time.Second); err != nil {
ri.logger.Warn("Follower may not have joined cluster yet, but continuing",
zap.String("database", ri.DatabaseName),
zap.Error(err))
// Don't fail - SQL will become available eventually
} else {
ri.logger.Info("Follower successfully joined cluster",
zap.String("database", ri.DatabaseName))
}
}
ri.Status = StatusActive
ri.LastQuery = time.Now()
ri.logger.Info("RQLite instance started successfully",
zap.String("database", ri.DatabaseName))
zap.String("database", ri.DatabaseName),
zap.Bool("is_leader", isLeader))
return nil
}
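The follower join handling in Start above boils down to two flags: -join when a join address is supplied, plus -join-as voter when prior Raft state exists so the node rejoins with its old role. A reduced sketch of that decision; the helper name is illustrative and only the flags already shown in the diff are used:

package cluster

// joinArgs sketches the follower join logic from Start: callers pass the
// leader/follower role, the join address (possibly empty), and whether the
// node previously held Raft cluster state.
func joinArgs(isLeader bool, joinAddr string, wasInCluster bool) []string {
	args := []string{}
	if !isLeader && joinAddr != "" {
		args = append(args, "-join", joinAddr)
		if wasInCluster {
			// Rejoin explicitly as a voter rather than bootstrapping anew.
			args = append(args, "-join-as", "voter")
		}
	}
	return args
}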
@ -205,17 +342,105 @@ func (ri *RQLiteInstance) waitForSQLAvailable(ctx context.Context) error {
case <-ticker.C:
_, err := ri.Connection.QueryOne("SELECT 1")
if err == nil {
ri.logger.Info("SQL queries are now available",
zap.String("database", ri.DatabaseName),
zap.Int("attempts", i+1))
return nil
}
// Log every 5 seconds with more detail
if i%5 == 0 {
ri.logger.Debug("Waiting for RQLite SQL availability",
zap.String("database", ri.DatabaseName),
zap.Int("attempt", i+1),
zap.Int("max_attempts", 60),
zap.Error(err))
}
}
}
return fmt.Errorf("rqlited SQL not available within timeout")
return fmt.Errorf("rqlited SQL not available within timeout (60 seconds)")
}
// waitForClusterJoin waits for a follower node to successfully join the cluster
// This checks the /status endpoint for cluster membership info
func (ri *RQLiteInstance) waitForClusterJoin(ctx context.Context, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
client := &http.Client{Timeout: 2 * time.Second}
statusURL := fmt.Sprintf("http://localhost:%d/status", ri.HTTPPort)
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
attempts := 0
for time.Now().Before(deadline) {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
attempts++
resp, err := client.Get(statusURL)
if err != nil {
if attempts%5 == 0 {
ri.logger.Debug("Checking cluster join status",
zap.String("database", ri.DatabaseName),
zap.Int("attempt", attempts),
zap.Error(err))
}
continue
}
// Check if status code is OK
if resp.StatusCode == http.StatusOK {
resp.Body.Close()
// If we can query status, the node has likely joined
// Try a simple SQL query to confirm
if ri.Connection != nil {
_, err := ri.Connection.QueryOne("SELECT 1")
if err == nil {
return nil // SQL is ready!
}
}
// Even if SQL not ready, status endpoint being available is good enough
if attempts >= 5 {
// After a few attempts, accept status endpoint as sufficient
return nil
}
} else {
resp.Body.Close()
}
}
}
return fmt.Errorf("cluster join check timed out after %v", timeout)
}
// StartBackgroundSQLReadinessCheck starts a background check for SQL readiness
// This is used for followers that may take time to sync cluster state
func (ri *RQLiteInstance) StartBackgroundSQLReadinessCheck(ctx context.Context, onReady func()) {
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if ri.Connection != nil {
_, err := ri.Connection.QueryOne("SELECT 1")
if err == nil {
ri.logger.Info("Follower SQL is now ready",
zap.String("database", ri.DatabaseName))
if onReady != nil {
onReady()
}
return // SQL is ready, stop checking
}
}
}
}
}()
}
// UpdateLastQuery updates the last query timestamp // UpdateLastQuery updates the last query timestamp
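StartBackgroundSQLReadinessCheck above follows a generic poll-until-ready pattern: probe on a ticker, stop on success or when the manager's context is cancelled. A standalone sketch of the same pattern, with a hypothetical probe callback standing in for the SELECT 1 query:

package cluster

import (
	"context"
	"time"
)

// pollUntilReady polls probe on a ticker until it succeeds, then invokes
// onReady once and exits; cancellation of ctx (e.g. on node shutdown) also
// stops the goroutine so it does not leak.
func pollUntilReady(ctx context.Context, interval time.Duration, probe func() error, onReady func()) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if probe() == nil {
					if onReady != nil {
						onReady()
					}
					return
				}
			}
		}
	}()
}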