diff --git a/.gitignore b/.gitignore index 01a1c94..aaf5f99 100644 --- a/.gitignore +++ b/.gitignore @@ -74,4 +74,6 @@ data/bootstrap/rqlite/ configs/ -.dev/ \ No newline at end of file +.dev/ + +.gocache/ \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b57bb7..ce9b0ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,22 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant ### Deprecated ### Fixed +## [0.69.14] - 2025-11-14 + +### Added +- Added support for background reconnection to the Olric cache cluster in the Gateway, improving resilience if the cache is temporarily unavailable. + +### Changed +- Improved the RQLite database client connection handling to ensure connections are properly closed and reused safely. +- RQLite Manager now updates its advertised addresses if cluster discovery provides more accurate information (e.g., replacing localhost). + +### Deprecated + +### Removed + +### Fixed +- Removed internal RQLite process management from the development runner, as RQLite is now expected to be managed externally or via Docker. + ## [0.69.13] - 2025-11-14 ### Added diff --git a/Makefile b/Makefile index 28d2509..af475ea 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill -VERSION := 0.69.13 +VERSION := 0.69.14 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/client/implementations.go b/pkg/client/implementations.go index ea06381..43f9564 100644 --- a/pkg/client/implementations.go +++ b/pkg/client/implementations.go @@ -160,17 +160,31 @@ func (d *DatabaseClientImpl) isWriteOperation(sql string) bool { func (d *DatabaseClientImpl) clearConnection() { d.mu.Lock() defer d.mu.Unlock() - d.connection = nil + if d.connection != nil { + d.connection.Close() + d.connection = nil + } } // getRQLiteConnection returns a connection to RQLite, creating one if needed func (d *DatabaseClientImpl) getRQLiteConnection() (*gorqlite.Connection, error) { - d.mu.Lock() - defer d.mu.Unlock() + d.mu.RLock() + conn := d.connection + d.mu.RUnlock() - // Always try to get a fresh connection to handle leadership changes - // and node failures gracefully - return d.connectToAvailableNode() + if conn != nil { + return conn, nil + } + + newConn, err := d.connectToAvailableNode() + if err != nil { + return nil, err + } + + d.mu.Lock() + d.connection = newConn + d.mu.Unlock() + return newConn, nil } // getRQLiteNodes returns a list of RQLite node URLs with precedence: @@ -227,7 +241,6 @@ func (d *DatabaseClientImpl) connectToAvailableNode() (*gorqlite.Connection, err continue } - d.connection = conn return conn, nil } diff --git a/pkg/environments/development/runner.go b/pkg/environments/development/runner.go index 5cba913..a2272ef 100644 --- a/pkg/environments/development/runner.go +++ b/pkg/environments/development/runner.go @@ -62,7 +62,6 @@ func (pm *ProcessManager) StartAll(ctx context.Context) error { fn func(context.Context) error }{ {"IPFS", pm.startIPFS}, - {"RQLite", pm.startRQLite}, {"IPFS Cluster", pm.startIPFSCluster}, {"Olric", pm.startOlric}, {"Anon", pm.startAnon}, @@ -111,10 +110,6 @@ func (pm *ProcessManager) StopAll(ctx context.Context) error { node := topology.Nodes[i] services = append(services, fmt.Sprintf("ipfs-cluster-%s", node.Name)) } - for i := len(topology.Nodes) - 1; i >= 0; i-- { - node := topology.Nodes[i] - services = append(services, fmt.Sprintf("rqlite-%s", node.Name)) - } for i := len(topology.Nodes) - 1; i >= 0; i-- { node := topology.Nodes[i] services = append(services, fmt.Sprintf("ipfs-%s", node.Name)) @@ -150,13 +145,6 @@ func (pm *ProcessManager) Status(ctx context.Context) { fmt.Sprintf("%s IPFS", node.Name), []int{node.IPFSAPIPort, node.IPFSSwarmPort}, }) - services = append(services, struct { - name string - ports []int - }{ - fmt.Sprintf("%s RQLite", node.Name), - []int{node.RQLiteHTTPPort, node.RQLiteRaftPort}, - }) services = append(services, struct { name string ports []int @@ -942,72 +930,6 @@ func (pm *ProcessManager) ensureIPFSClusterPorts(clusterPath string, restAPIPort return nil } -func (pm *ProcessManager) startRQLite(ctx context.Context) error { - topology := DefaultTopology() - var nodes []struct { - name string - dataDir string - httpPort int - raftPort int - joinAddr string - } - - for _, nodeSpec := range topology.Nodes { - nodes = append(nodes, struct { - name string - dataDir string - httpPort int - raftPort int - joinAddr string - }{ - nodeSpec.Name, - filepath.Join(pm.debrosDir, nodeSpec.DataDir, "rqlite"), - nodeSpec.RQLiteHTTPPort, - nodeSpec.RQLiteRaftPort, - nodeSpec.RQLiteJoinTarget, - }) - } - - for _, node := range nodes { - os.MkdirAll(node.dataDir, 0755) - - pidPath := filepath.Join(pm.pidsDir, fmt.Sprintf("rqlite-%s.pid", node.name)) - logPath := filepath.Join(pm.debrosDir, "logs", fmt.Sprintf("rqlite-%s.log", node.name)) - - var args []string - args = append(args, fmt.Sprintf("-http-addr=0.0.0.0:%d", node.httpPort)) - args = append(args, fmt.Sprintf("-http-adv-addr=localhost:%d", node.httpPort)) - args = append(args, fmt.Sprintf("-raft-addr=0.0.0.0:%d", node.raftPort)) - args = append(args, fmt.Sprintf("-raft-adv-addr=localhost:%d", node.raftPort)) - if node.joinAddr != "" { - args = append(args, "-join", node.joinAddr, "-join-attempts", "30", "-join-interval", "10s") - } - args = append(args, node.dataDir) - cmd := exec.CommandContext(ctx, "rqlited", args...) - - logFile, _ := os.Create(logPath) - cmd.Stdout = logFile - cmd.Stderr = logFile - - if err := cmd.Start(); err != nil { - return fmt.Errorf("failed to start rqlite-%s: %w", node.name, err) - } - - os.WriteFile(pidPath, []byte(fmt.Sprintf("%d", cmd.Process.Pid)), 0644) - pm.processes[fmt.Sprintf("rqlite-%s", node.name)] = &ManagedProcess{ - Name: fmt.Sprintf("rqlite-%s", node.name), - PID: cmd.Process.Pid, - StartTime: time.Now(), - LogPath: logPath, - } - - fmt.Fprintf(pm.logWriter, "✓ RQLite (%s) started (PID: %d, HTTP: %d, Raft: %d)\n", node.name, cmd.Process.Pid, node.httpPort, node.raftPort) - } - - time.Sleep(2 * time.Second) - return nil -} - func (pm *ProcessManager) startOlric(ctx context.Context) error { pidPath := filepath.Join(pm.pidsDir, "olric.pid") logPath := filepath.Join(pm.debrosDir, "logs", "olric.log") diff --git a/pkg/gateway/cache_handlers.go b/pkg/gateway/cache_handlers.go index 938a215..0ecffdf 100644 --- a/pkg/gateway/cache_handlers.go +++ b/pkg/gateway/cache_handlers.go @@ -17,7 +17,8 @@ import ( // Cache HTTP handlers for Olric distributed cache func (g *Gateway) cacheHealthHandler(w http.ResponseWriter, r *http.Request) { - if g.olricClient == nil { + client := g.getOlricClient() + if client == nil { writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized") return } @@ -25,7 +26,7 @@ func (g *Gateway) cacheHealthHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - err := g.olricClient.Health(ctx) + err := client.Health(ctx) if err != nil { writeError(w, http.StatusServiceUnavailable, fmt.Sprintf("cache health check failed: %v", err)) return @@ -38,7 +39,8 @@ func (g *Gateway) cacheHealthHandler(w http.ResponseWriter, r *http.Request) { } func (g *Gateway) cacheGetHandler(w http.ResponseWriter, r *http.Request) { - if g.olricClient == nil { + client := g.getOlricClient() + if client == nil { writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized") return } @@ -66,8 +68,8 @@ func (g *Gateway) cacheGetHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) defer cancel() - client := g.olricClient.GetClient() - dm, err := client.NewDMap(req.DMap) + olricCluster := client.GetClient() + dm, err := olricCluster.NewDMap(req.DMap) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create DMap: %v", err)) return @@ -142,7 +144,8 @@ func decodeValueFromOlric(gr *olriclib.GetResponse) (any, error) { } func (g *Gateway) cacheMultiGetHandler(w http.ResponseWriter, r *http.Request) { - if g.olricClient == nil { + client := g.getOlricClient() + if client == nil { writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized") return } @@ -175,8 +178,8 @@ func (g *Gateway) cacheMultiGetHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) defer cancel() - client := g.olricClient.GetClient() - dm, err := client.NewDMap(req.DMap) + olricCluster := client.GetClient() + dm, err := olricCluster.NewDMap(req.DMap) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create DMap: %v", err)) return @@ -220,7 +223,8 @@ func (g *Gateway) cacheMultiGetHandler(w http.ResponseWriter, r *http.Request) { } func (g *Gateway) cachePutHandler(w http.ResponseWriter, r *http.Request) { - if g.olricClient == nil { + client := g.getOlricClient() + if client == nil { writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized") return } @@ -255,8 +259,8 @@ func (g *Gateway) cachePutHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) defer cancel() - client := g.olricClient.GetClient() - dm, err := client.NewDMap(req.DMap) + olricCluster := client.GetClient() + dm, err := olricCluster.NewDMap(req.DMap) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create DMap: %v", err)) return @@ -337,7 +341,8 @@ func (g *Gateway) cachePutHandler(w http.ResponseWriter, r *http.Request) { } func (g *Gateway) cacheDeleteHandler(w http.ResponseWriter, r *http.Request) { - if g.olricClient == nil { + client := g.getOlricClient() + if client == nil { writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized") return } @@ -365,8 +370,8 @@ func (g *Gateway) cacheDeleteHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) defer cancel() - client := g.olricClient.GetClient() - dm, err := client.NewDMap(req.DMap) + olricCluster := client.GetClient() + dm, err := olricCluster.NewDMap(req.DMap) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create DMap: %v", err)) return @@ -395,7 +400,8 @@ func (g *Gateway) cacheDeleteHandler(w http.ResponseWriter, r *http.Request) { } func (g *Gateway) cacheScanHandler(w http.ResponseWriter, r *http.Request) { - if g.olricClient == nil { + client := g.getOlricClient() + if client == nil { writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized") return } @@ -423,8 +429,8 @@ func (g *Gateway) cacheScanHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) defer cancel() - client := g.olricClient.GetClient() - dm, err := client.NewDMap(req.DMap) + olricCluster := client.GetClient() + dm, err := olricCluster.NewDMap(req.DMap) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create DMap: %v", err)) return diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index ea32dd4..ced87cd 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -74,6 +74,7 @@ type Gateway struct { // Olric cache client olricClient *olric.Client + olricMu sync.RWMutex // IPFS storage client ipfsClient ipfs.IPFSClient @@ -192,8 +193,9 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { olricClient, olricErr := initializeOlricClientWithRetry(olricCfg, logger) if olricErr != nil { logger.ComponentWarn(logging.ComponentGeneral, "failed to initialize Olric cache client; cache endpoints disabled", zap.Error(olricErr)) + gw.startOlricReconnectLoop(olricCfg) } else { - gw.olricClient = olricClient + gw.setOlricClient(olricClient) logger.ComponentInfo(logging.ComponentGeneral, "Olric cache client ready", zap.Strings("servers", olricCfg.Servers), zap.Duration("timeout", olricCfg.Timeout), @@ -312,10 +314,10 @@ func (g *Gateway) Close() { if g.sqlDB != nil { _ = g.sqlDB.Close() } - if g.olricClient != nil { + if client := g.getOlricClient(); client != nil { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := g.olricClient.Close(ctx); err != nil { + if err := client.Close(ctx); err != nil { g.logger.ComponentWarn(logging.ComponentGeneral, "error during Olric client close", zap.Error(err)) } } @@ -337,6 +339,46 @@ func (g *Gateway) getLocalSubscribers(topic, namespace string) []*localSubscribe return nil } +func (g *Gateway) setOlricClient(client *olric.Client) { + g.olricMu.Lock() + defer g.olricMu.Unlock() + g.olricClient = client +} + +func (g *Gateway) getOlricClient() *olric.Client { + g.olricMu.RLock() + defer g.olricMu.RUnlock() + return g.olricClient +} + +func (g *Gateway) startOlricReconnectLoop(cfg olric.Config) { + go func() { + retryDelay := 5 * time.Second + for { + client, err := initializeOlricClientWithRetry(cfg, g.logger) + if err == nil { + g.setOlricClient(client) + g.logger.ComponentInfo(logging.ComponentGeneral, "Olric cache client connected after background retries", + zap.Strings("servers", cfg.Servers), + zap.Duration("timeout", cfg.Timeout)) + return + } + + g.logger.ComponentWarn(logging.ComponentGeneral, "Olric cache client reconnect failed", + zap.Duration("retry_in", retryDelay), + zap.Error(err)) + + time.Sleep(retryDelay) + if retryDelay < olricInitMaxBackoff { + retryDelay *= 2 + if retryDelay > olricInitMaxBackoff { + retryDelay = olricInitMaxBackoff + } + } + } + }() +} + func initializeOlricClientWithRetry(cfg olric.Config, logger *logging.ColoredLogger) (*olric.Client, error) { backoff := olricInitInitialBackoff diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go index a1f6e8b..067e7b3 100644 --- a/pkg/rqlite/cluster_discovery.go +++ b/pkg/rqlite/cluster_discovery.go @@ -809,6 +809,11 @@ func (c *ClusterDiscoveryService) adjustSelfAdvertisedAddresses(meta *discovery. c.raftAddress = meta.RaftAddress c.httpAddress = meta.HTTPAddress c.mu.Unlock() + + if c.rqliteManager != nil { + c.rqliteManager.UpdateAdvertisedAddresses(meta.RaftAddress, meta.HTTPAddress) + } + return true } diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index 5af5d7c..5a5439b 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -81,6 +81,24 @@ func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService) { r.discoveryService = service } +// UpdateAdvertisedAddresses overrides the discovery advertised addresses when cluster discovery +// infers a better host than what was provided via configuration (e.g. replacing localhost). +func (r *RQLiteManager) UpdateAdvertisedAddresses(raftAddr, httpAddr string) { + if r == nil || r.discoverConfig == nil { + return + } + + if raftAddr != "" && r.discoverConfig.RaftAdvAddress != raftAddr { + r.logger.Info("Updating Raft advertised address", zap.String("addr", raftAddr)) + r.discoverConfig.RaftAdvAddress = raftAddr + } + + if httpAddr != "" && r.discoverConfig.HttpAdvAddress != httpAddr { + r.logger.Info("Updating HTTP advertised address", zap.String("addr", httpAddr)) + r.discoverConfig.HttpAdvAddress = httpAddr + } +} + // Start starts the RQLite node func (r *RQLiteManager) Start(ctx context.Context) error { rqliteDataDir, err := r.prepareDataDir()