diff --git a/cmd/rqlite-mcp/main.go b/cmd/rqlite-mcp/main.go index acf5348..5a8690e 100644 --- a/cmd/rqlite-mcp/main.go +++ b/cmd/rqlite-mcp/main.go @@ -60,6 +60,12 @@ type MCPServer struct { } func NewMCPServer(rqliteURL string) (*MCPServer, error) { + // Disable gorqlite cluster discovery to avoid /nodes timeouts from unreachable peers + if strings.Contains(rqliteURL, "?") { + rqliteURL += "&disableClusterDiscovery=true" + } else { + rqliteURL += "?disableClusterDiscovery=true" + } conn, err := gorqlite.Open(rqliteURL) if err != nil { return nil, err diff --git a/pkg/client/database_client.go b/pkg/client/database_client.go index d60417a..e1104e7 100644 --- a/pkg/client/database_client.go +++ b/pkg/client/database_client.go @@ -224,7 +224,14 @@ func (d *DatabaseClientImpl) connectToAvailableNode() (*gorqlite.Connection, err var conn *gorqlite.Connection var err error - conn, err = gorqlite.Open(rqliteURL) + // Disable gorqlite cluster discovery to avoid /nodes timeouts from unreachable peers + openURL := rqliteURL + if strings.Contains(openURL, "?") { + openURL += "&disableClusterDiscovery=true" + } else { + openURL += "?disableClusterDiscovery=true" + } + conn, err = gorqlite.Open(openURL) if err != nil { lastErr = err continue diff --git a/pkg/environments/production/installers/ipfs.go b/pkg/environments/production/installers/ipfs.go index 9138c4e..d8fa906 100644 --- a/pkg/environments/production/installers/ipfs.go +++ b/pkg/environments/production/installers/ipfs.go @@ -256,6 +256,9 @@ func (ii *IPFSInstaller) configureAddresses(ipfsRepoPath string, apiPort, gatewa addresses["Swarm"] = []string{ fmt.Sprintf("/ip4/%s/tcp/%d", bindIP, swarmPort), } + // Clear NoAnnounce — the server profile blocks private IPs (10.0.0.0/8, etc.) + // which prevents nodes from advertising their WireGuard swarm addresses via DHT + addresses["NoAnnounce"] = []string{} config["Addresses"] = addresses diff --git a/pkg/environments/production/services.go b/pkg/environments/production/services.go index 1fffcd3..6fec55e 100644 --- a/pkg/environments/production/services.go +++ b/pkg/environments/production/services.go @@ -235,10 +235,7 @@ SyslogIdentifier=debros-node PrivateTmp=yes ProtectHome=read-only -ProtectKernelTunables=yes -ProtectKernelModules=yes ProtectControlGroups=yes -RestrictRealtime=yes ReadWritePaths=%[2]s /etc/systemd/system [Install] diff --git a/pkg/gateway/dependencies.go b/pkg/gateway/dependencies.go index b728d55..d3f1eda 100644 --- a/pkg/gateway/dependencies.go +++ b/pkg/gateway/dependencies.go @@ -126,6 +126,11 @@ func initializeRQLite(logger *logging.ColoredLogger, cfg *Config, deps *Dependen dsn = "http://localhost:5001" } + if strings.Contains(dsn, "?") { + dsn += "&disableClusterDiscovery=true" + } else { + dsn += "?disableClusterDiscovery=true" + } db, err := sql.Open("rqlite", dsn) if err != nil { return fmt.Errorf("failed to open rqlite sql db: %w", err) diff --git a/pkg/node/wireguard_sync.go b/pkg/node/wireguard_sync.go index f2ec56d..451b525 100644 --- a/pkg/node/wireguard_sync.go +++ b/pkg/node/wireguard_sync.go @@ -212,6 +212,9 @@ func (n *Node) startWireGuardSyncLoop(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: + // Re-register self on every tick to pick up IPFS peer ID if it wasn't + // ready at startup (INSERT OR REPLACE is idempotent) + n.ensureWireGuardSelfRegistered(ctx) if err := n.syncWireGuardPeers(ctx); err != nil { n.logger.ComponentWarn(logging.ComponentNode, "WireGuard peer sync failed", zap.Error(err)) } diff --git a/pkg/rqlite/adapter.go b/pkg/rqlite/adapter.go index fa84c7c..8b1b1cf 100644 --- a/pkg/rqlite/adapter.go +++ b/pkg/rqlite/adapter.go @@ -17,7 +17,7 @@ type RQLiteAdapter struct { // NewRQLiteAdapter creates a new adapter that provides sql.DB interface for RQLite func NewRQLiteAdapter(manager *RQLiteManager) (*RQLiteAdapter, error) { // Use the gorqlite database/sql driver - db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", manager.config.RQLitePort)) + db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", manager.config.RQLitePort)) if err != nil { return nil, fmt.Errorf("failed to open RQLite SQL connection: %w", err) } diff --git a/pkg/rqlite/migrations.go b/pkg/rqlite/migrations.go index 1506062..e817960 100644 --- a/pkg/rqlite/migrations.go +++ b/pkg/rqlite/migrations.go @@ -119,7 +119,7 @@ func ApplyMigrationsDirs(ctx context.Context, db *sql.DB, dirs []string, logger // ApplyMigrationsFromManager is a convenience helper bound to RQLiteManager. func (r *RQLiteManager) ApplyMigrations(ctx context.Context, dir string) error { - db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", r.config.RQLitePort)) + db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", r.config.RQLitePort)) if err != nil { return fmt.Errorf("open rqlite db: %w", err) } @@ -130,7 +130,7 @@ func (r *RQLiteManager) ApplyMigrations(ctx context.Context, dir string) error { // ApplyMigrationsDirs is the multi-dir variant on RQLiteManager. func (r *RQLiteManager) ApplyMigrationsDirs(ctx context.Context, dirs []string) error { - db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", r.config.RQLitePort)) + db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", r.config.RQLitePort)) if err != nil { return fmt.Errorf("open rqlite db: %w", err) } @@ -474,7 +474,7 @@ func ApplyEmbeddedMigrations(ctx context.Context, db *sql.DB, fsys fs.FS, logger // ApplyEmbeddedMigrations is a convenience helper bound to RQLiteManager. func (r *RQLiteManager) ApplyEmbeddedMigrations(ctx context.Context, fsys fs.FS) error { - db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", r.config.RQLitePort)) + db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", r.config.RQLitePort)) if err != nil { return fmt.Errorf("open rqlite db: %w", err) } diff --git a/pkg/rqlite/process.go b/pkg/rqlite/process.go index f938776..9d1a947 100644 --- a/pkg/rqlite/process.go +++ b/pkg/rqlite/process.go @@ -118,10 +118,12 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) args = append(args, "-join", joinArg, "-join-as", r.discoverConfig.RaftAdvAddress, "-join-attempts", "30", "-join-interval", "10s") // Check if this node should join as a non-voter (read replica). - // The discovery service determines voter status based on WG IP ordering. - if r.discoveryService != nil && !r.discoveryService.IsVoter(r.discoverConfig.RaftAdvAddress) { - r.logger.Info("Joining as non-voter (read replica)", - zap.String("raft_address", r.discoverConfig.RaftAdvAddress)) + // Query the join target's /nodes endpoint to count existing voters, + // rather than relying on LibP2P peer count which is incomplete at join time. + if shouldBeNonVoter := r.checkShouldBeNonVoter(r.config.RQLiteJoinAddress); shouldBeNonVoter { + r.logger.Info("Joining as non-voter (read replica) - cluster already has max voters", + zap.String("raft_address", r.discoverConfig.RaftAdvAddress), + zap.Int("max_voters", MaxDefaultVoters)) args = append(args, "-raft-non-voter") } } @@ -168,16 +170,26 @@ func (r *RQLiteManager) waitForReadyAndConnect(ctx context.Context) error { var conn *gorqlite.Connection var err error maxConnectAttempts := 10 - connectBackoff := 500 * time.Millisecond + connectBackoff := 1 * time.Second + + // Use disableClusterDiscovery=true to avoid gorqlite calling /nodes on Open(). + // The /nodes endpoint probes all cluster members including unreachable ones, + // which can block for the full HTTP timeout (~10s per attempt). + // This is safe because rqlited followers automatically forward writes to the leader. + connURL := fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", r.config.RQLitePort) for attempt := 0; attempt < maxConnectAttempts; attempt++ { - conn, err = gorqlite.Open(fmt.Sprintf("http://localhost:%d", r.config.RQLitePort)) + conn, err = gorqlite.Open(connURL) if err == nil { r.connection = conn break } - if strings.Contains(err.Error(), "store is not open") { + errMsg := err.Error() + if strings.Contains(errMsg, "store is not open") { + r.logger.Debug("RQLite not ready yet, retrying", + zap.Int("attempt", attempt+1), + zap.Error(err)) time.Sleep(connectBackoff) connectBackoff = time.Duration(float64(connectBackoff) * 1.5) if connectBackoff > 5*time.Second { @@ -288,6 +300,58 @@ func (r *RQLiteManager) testJoinAddress(joinAddress string) error { return nil } +// checkShouldBeNonVoter queries the join target's /nodes endpoint to count +// existing voters. Returns true if the cluster already has MaxDefaultVoters +// voters, meaning this node should join as a non-voter. +func (r *RQLiteManager) checkShouldBeNonVoter(joinAddress string) bool { + // Derive HTTP API URL from the join address (which is a raft address like 10.0.0.1:7001) + host := joinAddress + if strings.HasPrefix(host, "http://") || strings.HasPrefix(host, "https://") { + host = strings.TrimPrefix(host, "http://") + host = strings.TrimPrefix(host, "https://") + } + if idx := strings.Index(host, ":"); idx != -1 { + host = host[:idx] + } + nodesURL := fmt.Sprintf("http://%s:%d/nodes?timeout=2s", host, r.config.RQLitePort) + + client := tlsutil.NewHTTPClient(5 * time.Second) + resp, err := client.Get(nodesURL) + if err != nil { + r.logger.Warn("Could not query /nodes to check voter count, defaulting to voter", zap.Error(err)) + return false + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + r.logger.Warn("Could not read /nodes response, defaulting to voter", zap.Error(err)) + return false + } + + var nodes map[string]struct { + Voter bool `json:"voter"` + Reachable bool `json:"reachable"` + } + if err := json.Unmarshal(body, &nodes); err != nil { + r.logger.Warn("Could not parse /nodes response, defaulting to voter", zap.Error(err)) + return false + } + + voterCount := 0 + for _, n := range nodes { + if n.Voter && n.Reachable { + voterCount++ + } + } + + r.logger.Info("Checked existing voter count from join target", + zap.Int("reachable_voters", voterCount), + zap.Int("max_voters", MaxDefaultVoters)) + + return voterCount >= MaxDefaultVoters +} + // waitForJoinTarget waits until the join target's HTTP status becomes reachable func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress string, timeout time.Duration) error { deadline := time.Now().Add(timeout)