diff --git a/go.mod b/go.mod index 0b1a4b6..4d75ed8 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/DeBrosOfficial/network -go 1.23.8 - -toolchain go1.24.1 +go 1.24.4 require ( github.com/ethereum/go-ethereum v1.13.14 @@ -24,6 +22,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect @@ -93,6 +92,7 @@ require ( github.com/pion/turn/v4 v4.0.0 // indirect github.com/pion/webrtc/v4 v4.0.10 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.22.0 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.63.0 // indirect @@ -101,7 +101,7 @@ require ( github.com/quic-go/quic-go v0.50.1 // indirect github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect - github.com/rogpeppe/go-internal v1.13.1 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/wlynxg/anet v0.0.5 // indirect go.uber.org/dig v1.18.0 // indirect @@ -114,6 +114,7 @@ require ( golang.org/x/sync v0.16.0 // indirect golang.org/x/sys v0.34.0 // indirect golang.org/x/text v0.27.0 // indirect + golang.org/x/time v0.11.0 // indirect golang.org/x/tools v0.35.0 // indirect google.golang.org/protobuf v1.36.6 // indirect lukechampine.com/blake3 v1.4.1 // indirect diff --git a/go.sum b/go.sum index 33dd50c..89fa10a 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,9 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c h1:pFUpOrbxDR6AkioZ1ySsx5yxlDQZ8stG2b88gTPxgJU= github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6UhI8N9EjYm1c2odKpFpAYeR8dsBeM7PtzQhRgxRr9U= github.com/decred/dcrd/crypto/blake256 v1.1.0 h1:zPMNGQCm0g4QTY27fOCorQW7EryeQ/U0x++OzVrdms8= @@ -264,8 +265,9 @@ github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3v github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= @@ -286,8 +288,8 @@ github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 h1:4WFk6 github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66/go.mod h1:Vp72IJajgeOL6ddqrAhmp7IM9zbTcgkQxD/YdxrVwMw= github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= -github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= -github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rqlite/gorqlite v0.0.0-20250609141355-ac86a4a1c9a8 h1:BoxiqWvhprOB2isgM59s8wkgKwAoyQH66Twfmof41oE= github.com/rqlite/gorqlite v0.0.0-20250609141355-ac86a4a1c9a8/go.mod h1:xF/KoXmrRyahPfo5L7Szb5cAAUl53dMWBh9cMruGEZg= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -466,8 +468,8 @@ golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= -golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= +golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030000716-a0a13e073c7b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index 8f786a3..0b9eb06 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -14,6 +14,14 @@ import ( "go.uber.org/zap" ) +// min returns the smaller of two integers +func min(a, b int) int { + if a < b { + return a + } + return b +} + // context keys for request-scoped auth metadata (private to package) type contextKey string @@ -98,12 +106,39 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler { // Look up API key in DB and derive namespace db := g.client.Database() + if db == nil { + g.logger.ComponentError(logging.ComponentDatabase, "Database client not initialized", + zap.String("api_key_prefix", key[:min(10, len(key))])) + writeError(w, http.StatusServiceUnavailable, "database unavailable") + return + } + // Use internal auth for DB 
validation (auth not established yet) internalCtx := client.WithInternalAuth(r.Context()) // Join to namespaces to resolve name in one query q := "SELECT namespaces.name FROM api_keys JOIN namespaces ON api_keys.namespace_id = namespaces.id WHERE api_keys.key = ? LIMIT 1" res, err := db.Query(internalCtx, q, key) - if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { + + // Enhanced error handling to distinguish database errors from invalid keys + if err != nil { + // Database connectivity error - return 503 + g.logger.ComponentError(logging.ComponentDatabase, "Failed to query API key", + zap.Error(err), + zap.String("api_key_prefix", key[:min(10, len(key))])) + writeError(w, http.StatusServiceUnavailable, "database unavailable") + return + } + + if res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { + // API key not found in database - return 401 + g.logger.ComponentWarn(logging.ComponentGeneral, "API key not found in database", + zap.String("api_key_prefix", key[:min(10, len(key))]), + zap.Int64("result_count", func() int64 { + if res == nil { + return -1 + } + return res.Count + }())) w.Header().Set("WWW-Authenticate", "Bearer error=\"invalid_token\"") writeError(w, http.StatusUnauthorized, "invalid API key") return diff --git a/pkg/gateway/storage_handlers.go b/pkg/gateway/storage_handlers.go index 01d0ef0..ce715a7 100644 --- a/pkg/gateway/storage_handlers.go +++ b/pkg/gateway/storage_handlers.go @@ -3,6 +3,8 @@ package gateway import ( "encoding/json" "net/http" + + "github.com/DeBrosOfficial/network/pkg/client" ) // Database HTTP handlers @@ -26,7 +28,8 @@ func (g *Gateway) networkPeersHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusServiceUnavailable, "client not initialized") return } - ctx := r.Context() + // Use internal auth context to bypass client authentication since gateway middleware already authenticated + ctx := client.WithInternalAuth(r.Context()) peers, err := g.client.Network().GetPeers(ctx) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) diff --git a/pkg/rqlite/cluster_handlers.go b/pkg/rqlite/cluster_handlers.go index 8ef2a32..fc3ba59 100644 --- a/pkg/rqlite/cluster_handlers.go +++ b/pkg/rqlite/cluster_handlers.go @@ -40,27 +40,53 @@ func (cm *ClusterManager) handleCreateRequest(msg *MetadataMessage) error { return nil } - // Allocate ports - prefer fixed ports for system database, fall back to dynamic + // Allocate ports with sticky behavior var ports PortPair var err error + // Try to load previously saved ports first (for sticky ports across restarts) + savedPorts := LoadSavedPorts(cm.dataDir, req.DatabaseName, cm.logger) + if req.DatabaseName == systemDBName && cm.config.SystemHTTPPort > 0 { - // Try to use fixed ports for system database first + // System database: MUST use fixed ports, do not fall back to dynamic ports = PortPair{ HTTPPort: cm.config.SystemHTTPPort, RaftPort: cm.config.SystemRaftPort, + Host: cm.getAdvertiseAddress(), } err = cm.portManager.AllocateSpecificPortPair(req.DatabaseName, ports) if err != nil { - // Fixed ports unavailable (likely multi-node localhost) - use dynamic - cm.logger.Info("Fixed system ports unavailable, using dynamic allocation", + // Fixed ports unavailable - DO NOT respond for system database + cm.logger.Warn("System database requires fixed ports, but they are unavailable - not responding", zap.String("database", req.DatabaseName), zap.Int("attempted_http", ports.HTTPPort), - 
zap.Int("attempted_raft", ports.RaftPort)) + zap.Int("attempted_raft", ports.RaftPort), + zap.Error(err)) + return nil + } + } else if savedPorts != nil { + // Try to reuse saved ports for sticky allocation + ports = PortPair{ + HTTPPort: savedPorts.HTTPPort, + RaftPort: savedPorts.RaftPort, + Host: cm.getAdvertiseAddress(), + } + err = cm.portManager.AllocateSpecificPortPair(req.DatabaseName, ports) + if err != nil { + // Saved ports unavailable, fall back to dynamic + cm.logger.Info("Saved ports unavailable, allocating new ports", + zap.String("database", req.DatabaseName), + zap.Int("attempted_http", savedPorts.HTTPPort), + zap.Int("attempted_raft", savedPorts.RaftPort)) ports, err = cm.portManager.AllocatePortPair(req.DatabaseName) + } else { + cm.logger.Info("Reusing saved ports for database", + zap.String("database", req.DatabaseName), + zap.Int("http_port", ports.HTTPPort), + zap.Int("raft_port", ports.RaftPort)) } } else { - // Use dynamic ports for other databases + // No saved ports, allocate dynamically ports, err = cm.portManager.AllocatePortPair(req.DatabaseName) } @@ -73,9 +99,13 @@ func (cm *ClusterManager) handleCreateRequest(msg *MetadataMessage) error { // Send response offering to host response := DatabaseCreateResponse{ - DatabaseName: req.DatabaseName, - NodeID: cm.nodeID, - AvailablePorts: ports, + DatabaseName: req.DatabaseName, + NodeID: cm.nodeID, + AvailablePorts: PortPair{ + HTTPPort: ports.HTTPPort, + RaftPort: ports.RaftPort, + Host: cm.getAdvertiseAddress(), + }, } msgData, err := MarshalMetadataMessage(MsgDatabaseCreateResponse, cm.nodeID, response) @@ -173,6 +203,7 @@ func (cm *ClusterManager) handleCreateConfirm(msg *MetadataMessage) error { portMappings[node.NodeID] = PortPair{ HTTPPort: node.HTTPPort, RaftPort: node.RaftPort, + Host: node.Host, } } @@ -224,11 +255,17 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe // Join to the leader leaderNodeID := metadata.LeaderNodeID if leaderPorts, exists := metadata.PortMappings[leaderNodeID]; exists { - joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), leaderPorts.RaftPort) + // Use leader's host if available, fallback to this node's advertise address + host := leaderPorts.Host + if host == "" { + host = cm.getAdvertiseAddress() + } + joinAddr = fmt.Sprintf("%s:%d", host, leaderPorts.RaftPort) cm.logger.Info("Follower joining leader", zap.String("database", metadata.DatabaseName), zap.String("leader_node", leaderNodeID), zap.String("join_address", joinAddr), + zap.String("leader_host", host), zap.Int("leader_raft_port", leaderPorts.RaftPort)) } else { cm.logger.Error("Leader node not found in port mappings", @@ -277,6 +314,14 @@ func (cm *ClusterManager) startDatabaseInstance(metadata *DatabaseMetadata, isLe return } + // Save ports for sticky allocation on restart + if err := SavePorts(cm.dataDir, metadata.DatabaseName, ports, cm.logger); err != nil { + cm.logger.Warn("Failed to save ports for database", + zap.String("database", metadata.DatabaseName), + zap.Error(err)) + // Don't fail startup, just log the warning + } + // For followers, start background SQL readiness check if !isLeader { instance.StartBackgroundSQLReadinessCheck(cm.ctx, func() { @@ -752,7 +797,12 @@ func (cm *ClusterManager) wakeupDatabase(dbName string, dbMeta *DatabaseMetadata joinAddr := "" if len(dbMeta.NodeIDs) > 0 && dbMeta.NodeIDs[0] != cm.nodeID { firstNodePorts := dbMeta.PortMappings[dbMeta.NodeIDs[0]] - joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), firstNodePorts.RaftPort) + // 
Use first node's host if available, fallback to this node's advertise address + host := firstNodePorts.Host + if host == "" { + host = cm.getAdvertiseAddress() + } + joinAddr = fmt.Sprintf("%s:%d", host, firstNodePorts.RaftPort) } // Create and start instance @@ -774,6 +824,13 @@ func (cm *ClusterManager) wakeupDatabase(dbName string, dbMeta *DatabaseMetadata return } + // Save ports for sticky allocation on restart + if err := SavePorts(cm.dataDir, dbName, allocatedPorts, cm.logger); err != nil { + cm.logger.Warn("Failed to save ports for database during wakeup", + zap.String("database", dbName), + zap.Error(err)) + } + // Add to active clusters cm.mu.Lock() cm.activeClusters[dbName] = instance @@ -906,7 +963,12 @@ func (cm *ClusterManager) handleNodeReplacementOffer(msg *MetadataMessage) error continue // Skip failed nodes (would need proper tracking) } ports := dbMeta.PortMappings[nodeID] - joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), ports.RaftPort) + // Use node's host if available, fallback to this node's advertise address + host := ports.Host + if host == "" { + host = cm.getAdvertiseAddress() + } + joinAddr = fmt.Sprintf("%s:%d", host, ports.RaftPort) break } @@ -1003,6 +1065,13 @@ func (cm *ClusterManager) startReplacementInstance(dbName string, ports PortPair return } + // Save ports for sticky allocation on restart + if err := SavePorts(cm.dataDir, dbName, ports, cm.logger); err != nil { + cm.logger.Warn("Failed to save ports for replacement instance", + zap.String("database", dbName), + zap.Error(err)) + } + // Add to active clusters cm.mu.Lock() cm.activeClusters[dbName] = instance diff --git a/pkg/rqlite/cluster_manager.go b/pkg/rqlite/cluster_manager.go index 2cfc203..1c4bf5c 100644 --- a/pkg/rqlite/cluster_manager.go +++ b/pkg/rqlite/cluster_manager.go @@ -213,30 +213,62 @@ func (cm *ClusterManager) CreateDatabase(dbName string, replicationFactor int) e var selfPorts PortPair var portErr error + // Try to load previously saved ports first (for sticky ports across restarts) + savedPorts := LoadSavedPorts(cm.dataDir, dbName, cm.logger) + if dbName == systemDBName && cm.config.SystemHTTPPort > 0 { - // Try fixed ports for system database + // System database: MUST use fixed ports, do not fall back to dynamic selfPorts = PortPair{ HTTPPort: cm.config.SystemHTTPPort, RaftPort: cm.config.SystemRaftPort, + Host: cm.getAdvertiseAddress(), } portErr = cm.portManager.AllocateSpecificPortPair(dbName, selfPorts) if portErr != nil { - // Fixed ports unavailable - use dynamic - cm.logger.Info("Fixed system ports unavailable on requester, using dynamic", - zap.String("database", dbName)) + // Fixed ports unavailable - DO NOT add self for system database + cm.logger.Warn("System database requires fixed ports, but they are unavailable - not hosting", + zap.String("database", dbName), + zap.Int("attempted_http", selfPorts.HTTPPort), + zap.Int("attempted_raft", selfPorts.RaftPort), + zap.Error(portErr)) + // Set portErr to non-nil so we skip adding self as candidate + } + } else if savedPorts != nil { + // Try to reuse saved ports for sticky allocation + selfPorts = PortPair{ + HTTPPort: savedPorts.HTTPPort, + RaftPort: savedPorts.RaftPort, + Host: cm.getAdvertiseAddress(), + } + portErr = cm.portManager.AllocateSpecificPortPair(dbName, selfPorts) + if portErr != nil { + // Saved ports unavailable, fall back to dynamic + cm.logger.Info("Saved ports unavailable for self, allocating new ports", + zap.String("database", dbName), + zap.Int("attempted_http", savedPorts.HTTPPort), + 
zap.Int("attempted_raft", savedPorts.RaftPort)) selfPorts, portErr = cm.portManager.AllocatePortPair(dbName) + } else { + cm.logger.Info("Reusing saved ports for self", + zap.String("database", dbName), + zap.Int("http_port", selfPorts.HTTPPort), + zap.Int("raft_port", selfPorts.RaftPort)) } } else { - // Dynamic ports for non-system databases + // No saved ports, allocate dynamically selfPorts, portErr = cm.portManager.AllocatePortPair(dbName) } if portErr == nil { // Add self as a candidate selfResponse := DatabaseCreateResponse{ - DatabaseName: dbName, - NodeID: cm.nodeID, - AvailablePorts: selfPorts, + DatabaseName: dbName, + NodeID: cm.nodeID, + AvailablePorts: PortPair{ + HTTPPort: selfPorts.HTTPPort, + RaftPort: selfPorts.RaftPort, + Host: cm.getAdvertiseAddress(), + }, } coordinator.AddResponse(selfResponse) cm.logger.Debug("Added self as candidate for database", @@ -328,6 +360,7 @@ func (cm *ClusterManager) CreateDatabase(dbName string, replicationFactor int) e NodeID: resp.NodeID, HTTPPort: resp.AvailablePorts.HTTPPort, RaftPort: resp.AvailablePorts.RaftPort, + Host: resp.AvailablePorts.Host, Role: role, } } @@ -369,6 +402,7 @@ func (cm *ClusterManager) CreateDatabase(dbName string, replicationFactor int) e metadata.PortMappings[node.NodeID] = PortPair{ HTTPPort: node.HTTPPort, RaftPort: node.RaftPort, + Host: node.Host, } } @@ -586,11 +620,17 @@ func (cm *ClusterManager) attemptDatabaseRecovery(dbName string, metadata *Datab for _, nodeID := range activeMembers { if nodeID == metadata.LeaderNodeID && nodeID != cm.nodeID { if leaderPorts, exists := metadata.PortMappings[nodeID]; exists { - joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), leaderPorts.RaftPort) + // Use leader's host if available, fallback to this node's advertise address + host := leaderPorts.Host + if host == "" { + host = cm.getAdvertiseAddress() + } + joinAddr = fmt.Sprintf("%s:%d", host, leaderPorts.RaftPort) cm.logger.Info("Recovery: joining healthy leader", zap.String("database", dbName), zap.String("leader_node", nodeID), - zap.String("join_address", joinAddr)) + zap.String("join_address", joinAddr), + zap.String("leader_host", host)) break } } @@ -601,11 +641,17 @@ func (cm *ClusterManager) attemptDatabaseRecovery(dbName string, metadata *Datab for _, nodeID := range activeMembers { if nodeID != cm.nodeID { if nodePorts, exists := metadata.PortMappings[nodeID]; exists { - joinAddr = fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), nodePorts.RaftPort) + // Use node's host if available, fallback to this node's advertise address + host := nodePorts.Host + if host == "" { + host = cm.getAdvertiseAddress() + } + joinAddr = fmt.Sprintf("%s:%d", host, nodePorts.RaftPort) cm.logger.Info("Recovery: joining healthy follower", zap.String("database", dbName), zap.String("node", nodeID), - zap.String("join_address", joinAddr)) + zap.String("join_address", joinAddr), + zap.String("node_host", host)) break } } @@ -662,6 +708,13 @@ func (cm *ClusterManager) attemptDatabaseRecovery(dbName string, metadata *Datab return } + // Save ports for sticky allocation on restart + if err := SavePorts(cm.dataDir, dbName, ports, cm.logger); err != nil { + cm.logger.Warn("Failed to save ports during recovery", + zap.String("database", dbName), + zap.Error(err)) + } + // Update active clusters cm.mu.Lock() cm.activeClusters[dbName] = instance @@ -894,11 +947,17 @@ func (cm *ClusterManager) autoJoinSystemDatabase(metadata *DatabaseMetadata) err return fmt.Errorf("leader ports not available") } - joinAddr := fmt.Sprintf("%s:%d", 
cm.getAdvertiseAddress(), leaderPorts.RaftPort)
+	// Use leader's host if available, fallback to this node's advertise address
+	host := leaderPorts.Host
+	if host == "" {
+		host = cm.getAdvertiseAddress()
+	}
+	joinAddr := fmt.Sprintf("%s:%d", host, leaderPorts.RaftPort)
 	cm.logger.Info("Auto-joining system database as follower",
 		zap.String("database", systemDBName),
 		zap.String("leader", leaderNodeID),
-		zap.String("join_address", joinAddr))
+		zap.String("join_address", joinAddr),
+		zap.String("leader_host", host))
 
 	// Allocate ports for this node
 	var ports PortPair
@@ -943,6 +1002,13 @@ func (cm *ClusterManager) autoJoinSystemDatabase(metadata *DatabaseMetadata) err
 		return err
 	}
 
+	// Save ports for sticky allocation on restart
+	if err := SavePorts(cm.dataDir, systemDBName, ports, cm.logger); err != nil {
+		cm.logger.Warn("Failed to save ports for system database",
+			zap.String("database", systemDBName),
+			zap.Error(err))
+	}
+
 	// Store the instance
 	cm.mu.Lock()
 	cm.activeClusters[systemDBName] = instance
@@ -1018,6 +1084,7 @@ func (cm *ClusterManager) initializeSystemDatabase() error {
 		existingDB.PortMappings[cm.nodeID] = PortPair{
 			HTTPPort: instance.HTTPPort,
 			RaftPort: instance.RaftPort,
+			Host:     cm.getAdvertiseAddress(),
 		}
 	}
 
@@ -1102,12 +1169,22 @@
 	cm.logger.Info("System database is active",
 		zap.String("database", systemDBName))
 
-	// Run migrations if configured
+	// Run migrations only on the elected leader
 	if cm.config.MigrationsPath != "" {
-		if err := cm.runMigrations(systemDBName); err != nil {
-			cm.logger.Error("Failed to run migrations on system database",
-				zap.Error(err))
-			// Don't fail startup, just log the error
+		// Get current metadata to check if this node is the leader
+		metadata := cm.metadataStore.GetDatabase(systemDBName)
+		if metadata != nil && metadata.LeaderNodeID == cm.nodeID {
+			cm.logger.Info("This node is the leader, running migrations",
+				zap.String("database", systemDBName))
+			if err := cm.runMigrations(systemDBName); err != nil {
+				cm.logger.Error("Failed to run migrations on system database",
+					zap.Error(err))
+				// Don't fail startup, just log the error
+			}
+		} else {
+			cm.logger.Info("This node is not the leader, skipping migrations",
+				zap.String("database", systemDBName),
+				zap.Bool("metadata_available", metadata != nil))
 		}
 	}
 
@@ -1118,6 +1195,86 @@
 	}
 }
 
+// parseSQLStatements splits SQL content into individual statements using simple string parsing
+func parseSQLStatements(content string) ([]string, error) {
+	var statements []string
+
+	// Remove comments and normalize whitespace
+	lines := strings.Split(content, "\n")
+	var cleanLines []string
+
+	for _, line := range lines {
+		// Remove single-line comments (-- comments)
+		if idx := strings.Index(line, "--"); idx != -1 {
+			line = line[:idx]
+		}
+		// Remove multi-line comments (/* */ comments) - simple approach
+		line = strings.ReplaceAll(line, "/*", "")
+		line = strings.ReplaceAll(line, "*/", "")
+
+		// Keep non-empty lines
+		trimmed := strings.TrimSpace(line)
+		if trimmed != "" {
+			cleanLines = append(cleanLines, trimmed)
+		}
+	}
+
+	// Join lines and split by semicolons
+	cleanContent := strings.Join(cleanLines, " ")
+
+	// Split by semicolons, but be careful about semicolons in strings
+	var currentStmt strings.Builder
+	inString := false
+	escapeNext := false
+
+	for _, char := range cleanContent {
+		if escapeNext {
+			currentStmt.WriteRune(char)
+			escapeNext = false
+			continue
+		
} + + if char == '\\' { + escapeNext = true + currentStmt.WriteRune(char) + continue + } + + if char == '\'' || char == '"' { + inString = !inString + } + + if char == ';' && !inString { + // End of statement + stmt := strings.TrimSpace(currentStmt.String()) + if stmt != "" { + statements = append(statements, stmt) + } + currentStmt.Reset() + } else { + currentStmt.WriteRune(char) + } + } + + // Handle any remaining content (statements without trailing semicolon) + if currentStmt.Len() > 0 { + stmt := strings.TrimSpace(currentStmt.String()) + if stmt != "" { + statements = append(statements, stmt) + } + } + + return statements, nil +} + +// truncateString truncates a string to maxLen for logging +func truncateString(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] + "..." +} + // runMigrations executes SQL migrations on a database func (cm *ClusterManager) runMigrations(dbName string) error { cm.logger.Info("Running migrations", @@ -1167,29 +1324,32 @@ func (cm *ClusterManager) runMigrations(dbName string) error { } // Parse SQL content into individual statements - sqlContent := string(content) + statements, err := parseSQLStatements(string(content)) + if err != nil { + return fmt.Errorf("failed to parse migration %s: %w", filepath.Base(file), err) + } - // Split by semicolon but preserve multi-line statements - // Simple approach: execute the whole file as one batch - statements := []string{sqlContent} + cm.logger.Info("Parsed SQL statements", + zap.String("file", filepath.Base(file)), + zap.Int("statement_count", len(statements))) - // Execute using WriteParameterized to avoid auto-transaction wrapping - for _, stmt := range statements { - stmt = strings.TrimSpace(stmt) + // Execute each statement + for i, stmt := range statements { if stmt == "" { continue } + cm.logger.Debug("Executing statement", + zap.String("file", filepath.Base(file)), + zap.Int("statement_num", i+1), + zap.String("statement_preview", truncateString(stmt, 100))) + _, err = conn.WriteOneParameterized(gorqlite.ParameterizedStatement{ Query: stmt, }) if err != nil { - cm.logger.Error("Migration failed", - zap.String("file", filepath.Base(file)), - zap.Error(err)) - // Continue with other migrations even if one fails - // (tables might already exist from previous runs) - break + return fmt.Errorf("migration %s failed at statement %d: %w", + filepath.Base(file), i+1, err) } } diff --git a/pkg/rqlite/instance.go b/pkg/rqlite/instance.go index e7267ad..953b7fe 100644 --- a/pkg/rqlite/instance.go +++ b/pkg/rqlite/instance.go @@ -76,36 +76,6 @@ func (ri *RQLiteInstance) wasInCluster() bool { return false } -// clearRaftState removes Raft log and snapshots to allow clean leader restart -// This is more aggressive than clearPeerConfiguration and resets the entire cluster state -func (ri *RQLiteInstance) clearRaftState() error { - // Remove Raft log and cluster config without touching SQLite data - paths := []string{ - filepath.Join(ri.DataDir, "raft.db"), - filepath.Join(ri.DataDir, "raft"), // contains peers.json/info and other raft state - filepath.Join(ri.DataDir, "rsnapshots"), // raft snapshots - } - - var firstErr error - for _, p := range paths { - if _, err := os.Stat(p); err == nil { - if err := os.RemoveAll(p); err != nil && firstErr == nil { - firstErr = err - ri.logger.Warn("Failed to remove Raft state path", - zap.String("database", ri.DatabaseName), - zap.String("path", p), - zap.Error(err)) - } else { - ri.logger.Info("Cleared Raft state path", - zap.String("database", 
ri.DatabaseName), - zap.String("path", p)) - } - } - } - - return firstErr -} - // Start starts the rqlite subprocess func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr string) error { // Create data directory @@ -124,17 +94,6 @@ func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr str zap.Bool("was_in_cluster", wasInCluster), zap.String("join_address", joinAddr)) - // Clear Raft state for leaders with existing cluster state BEFORE starting RQLite - if isLeader && wasInCluster && joinAddr == "" { - ri.logger.Warn("Leader has existing cluster state - clearing Raft state for clean restart", - zap.String("database", ri.DatabaseName)) - if err := ri.clearRaftState(); err != nil { - ri.logger.Warn("Failed to clear Raft state", zap.Error(err)) - } else { - ri.logger.Info("Cleared Raft log and snapshots; node will bootstrap as single-node and accept joins", - zap.String("database", ri.DatabaseName)) - } - } } else { ri.logger.Info("No existing RQLite data, starting fresh", zap.String("database", ri.DatabaseName), @@ -159,12 +118,10 @@ func (ri *RQLiteInstance) Start(ctx context.Context, isLeader bool, joinAddr str // Add join address if this is a follower if !isLeader && joinAddr != "" { args = append(args, "-join", joinAddr) - // Force rejoin if we have existing cluster state - if ri.wasInCluster() { - args = append(args, "-join-as", "voter") - ri.logger.Info("Follower will rejoin cluster as voter", - zap.String("database", ri.DatabaseName)) - } + // Always add -join-as voter for rqlite v8 compatibility + args = append(args, "-join-as", "voter") + ri.logger.Info("Follower will join cluster as voter", + zap.String("database", ri.DatabaseName)) } // Add data directory as positional argument diff --git a/pkg/rqlite/metadata.go b/pkg/rqlite/metadata.go index ce33b44..ae8d6ed 100644 --- a/pkg/rqlite/metadata.go +++ b/pkg/rqlite/metadata.go @@ -19,8 +19,9 @@ const ( // PortPair represents HTTP and Raft ports for a database instance type PortPair struct { - HTTPPort int `json:"http_port"` - RaftPort int `json:"raft_port"` + HTTPPort int `json:"http_port"` + RaftPort int `json:"raft_port"` + Host string `json:"host"` } // DatabaseMetadata contains metadata for a single database cluster diff --git a/pkg/rqlite/ports_persistence.go b/pkg/rqlite/ports_persistence.go new file mode 100644 index 0000000..eb94dc8 --- /dev/null +++ b/pkg/rqlite/ports_persistence.go @@ -0,0 +1,106 @@ +package rqlite + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "go.uber.org/zap" +) + +// SavedPortInfo represents the persisted port allocation for a database on this node +type SavedPortInfo struct { + HTTPPort int `json:"http_port"` + RaftPort int `json:"raft_port"` + Host string `json:"host"` +} + +// getPortsFilePath returns the path to the ports.json file for a database +func getPortsFilePath(dataDir, dbName string) string { + return filepath.Join(dataDir, dbName, "ports.json") +} + +// LoadSavedPorts loads previously saved port information for a database +// Returns nil if no saved ports exist or if there's an error reading them +func LoadSavedPorts(dataDir, dbName string, logger *zap.Logger) *SavedPortInfo { + portsFile := getPortsFilePath(dataDir, dbName) + + data, err := os.ReadFile(portsFile) + if err != nil { + if !os.IsNotExist(err) { + logger.Warn("Failed to read saved ports file", + zap.String("database", dbName), + zap.String("file", portsFile), + zap.Error(err)) + } + return nil + } + + var savedPorts SavedPortInfo + if err := json.Unmarshal(data, 
&savedPorts); err != nil { + logger.Warn("Failed to parse saved ports file", + zap.String("database", dbName), + zap.String("file", portsFile), + zap.Error(err)) + return nil + } + + logger.Info("Loaded saved ports for database", + zap.String("database", dbName), + zap.Int("http_port", savedPorts.HTTPPort), + zap.Int("raft_port", savedPorts.RaftPort), + zap.String("host", savedPorts.Host)) + + return &savedPorts +} + +// SavePorts persists port allocation for a database to disk +func SavePorts(dataDir, dbName string, ports PortPair, logger *zap.Logger) error { + // Create directory if it doesn't exist + dbDir := filepath.Join(dataDir, dbName) + if err := os.MkdirAll(dbDir, 0755); err != nil { + return fmt.Errorf("failed to create database directory: %w", err) + } + + portsFile := getPortsFilePath(dataDir, dbName) + + savedPorts := SavedPortInfo(ports) + + data, err := json.MarshalIndent(savedPorts, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal ports: %w", err) + } + + if err := os.WriteFile(portsFile, data, 0644); err != nil { + return fmt.Errorf("failed to write ports file: %w", err) + } + + logger.Debug("Saved ports for database", + zap.String("database", dbName), + zap.Int("http_port", savedPorts.HTTPPort), + zap.Int("raft_port", savedPorts.RaftPort), + zap.String("host", savedPorts.Host), + zap.String("file", portsFile)) + + return nil +} + +// DeleteSavedPorts removes the saved ports file for a database +func DeleteSavedPorts(dataDir, dbName string, logger *zap.Logger) error { + portsFile := getPortsFilePath(dataDir, dbName) + + if err := os.Remove(portsFile); err != nil && !os.IsNotExist(err) { + logger.Warn("Failed to delete saved ports file", + zap.String("database", dbName), + zap.String("file", portsFile), + zap.Error(err)) + return err + } + + logger.Debug("Deleted saved ports file", + zap.String("database", dbName), + zap.String("file", portsFile)) + + return nil +} diff --git a/pkg/rqlite/pubsub_messages.go b/pkg/rqlite/pubsub_messages.go index a052498..75281bb 100644 --- a/pkg/rqlite/pubsub_messages.go +++ b/pkg/rqlite/pubsub_messages.go @@ -72,6 +72,7 @@ type NodeAssignment struct { NodeID string `json:"node_id"` HTTPPort int `json:"http_port"` RaftPort int `json:"raft_port"` + Host string `json:"host"` Role string `json:"role"` // "leader" or "follower" }
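
Reviewer note (not part of the patch): the new parseSQLStatements helper added to pkg/rqlite/cluster_manager.go strips SQL comments, joins the remaining lines, and splits on semicolons only when outside a quoted string. Below is a minimal illustrative test of that behavior, assuming it sits in the same rqlite package; the file name and the SQL text are made up for the sketch.

// migrations_split_sketch_test.go (hypothetical), package rqlite
package rqlite

import "testing"

// TestParseSQLStatements verifies that statements are split on semicolons,
// that "--" comments are dropped, and that a semicolon inside a string
// literal does not end a statement.
func TestParseSQLStatements(t *testing.T) {
	sql := "CREATE TABLE t (id INTEGER); -- trailing comment\nINSERT INTO t VALUES ('a;b');"
	stmts, err := parseSQLStatements(sql)
	if err != nil {
		t.Fatalf("parseSQLStatements: %v", err)
	}
	if len(stmts) != 2 {
		t.Fatalf("expected 2 statements, got %d: %#v", len(stmts), stmts)
	}
	if stmts[0] != "CREATE TABLE t (id INTEGER)" {
		t.Fatalf("unexpected first statement: %q", stmts[0])
	}
	if stmts[1] != "INSERT INTO t VALUES ('a;b')" {
		t.Fatalf("semicolon inside string literal was split: %q", stmts[1])
	}
}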
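
Reviewer note (not part of the patch): pkg/rqlite/ports_persistence.go persists each database's allocated ports to <dataDir>/<dbName>/ports.json so a restart can reuse them ("sticky" ports). Below is a minimal round-trip sketch using only SavePorts, LoadSavedPorts, and DeleteSavedPorts as declared above; the file name and port values are hypothetical.

// ports_persistence_sketch_test.go (hypothetical), package rqlite
package rqlite

import (
	"testing"

	"go.uber.org/zap"
)

// TestSavedPortsRoundTrip checks that SavePorts writes <dataDir>/<dbName>/ports.json,
// that LoadSavedPorts reads the same values back, and that a nil result after
// DeleteSavedPorts is the signal callers use to fall back to dynamic allocation.
func TestSavedPortsRoundTrip(t *testing.T) {
	dir := t.TempDir()
	logger := zap.NewNop()

	want := PortPair{HTTPPort: 5001, RaftPort: 7001, Host: "10.0.0.5"} // illustrative values
	if err := SavePorts(dir, "appdb", want, logger); err != nil {
		t.Fatalf("SavePorts: %v", err)
	}

	got := LoadSavedPorts(dir, "appdb", logger)
	if got == nil || got.HTTPPort != want.HTTPPort || got.RaftPort != want.RaftPort || got.Host != want.Host {
		t.Fatalf("round-trip mismatch: got %+v, want %+v", got, want)
	}

	if err := DeleteSavedPorts(dir, "appdb", logger); err != nil {
		t.Fatalf("DeleteSavedPorts: %v", err)
	}
	if LoadSavedPorts(dir, "appdb", logger) != nil {
		t.Fatal("expected nil after DeleteSavedPorts")
	}
}

A nil return from LoadSavedPorts is exactly the condition CreateDatabase and handleCreateRequest treat as "no saved ports", at which point they fall back to AllocatePortPair for dynamic allocation.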