refactor: remove RQLite service management and improve Olric client handling

- Eliminated the RQLite service management functions from the ProcessManager, streamlining the service startup and shutdown processes.
- Updated the Gateway to utilize a mutex for thread-safe access to the Olric client, enhancing concurrency handling.
- Refactored cache handler methods to consistently retrieve the Olric client, improving code clarity and maintainability.
- Added a background reconnect loop that keeps retrying when the Olric client cannot be initialized at Gateway startup, so the cache endpoints become available once the cluster is reachable.
This commit is contained in:
anonpenguin23 2025-11-14 17:49:27 +02:00
parent 5d6de3b0b8
commit 6f7b7606b0
9 changed files with 131 additions and 107 deletions

4
.gitignore vendored
View File

@ -74,4 +74,6 @@ data/bootstrap/rqlite/
configs/
.dev/
.dev/
.gocache/

View File

@ -13,6 +13,22 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
### Deprecated
### Fixed
## [0.69.14] - 2025-11-14
### Added
- Added support for background reconnection to the Olric cache cluster in the Gateway, improving resilience if the cache is temporarily unavailable.
### Changed
- Improved the RQLite database client connection handling to ensure connections are properly closed and reused safely.
- RQLite Manager now updates its advertised addresses if cluster discovery provides more accurate information (e.g., replacing localhost).
### Deprecated
### Removed
### Fixed
- Removed internal RQLite process management from the development runner, as RQLite is now expected to be managed externally or via Docker.
## [0.69.13] - 2025-11-14
### Added

View File

@ -19,7 +19,7 @@ test-e2e:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
VERSION := 0.69.13
VERSION := 0.69.14
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

View File

@ -160,17 +160,31 @@ func (d *DatabaseClientImpl) isWriteOperation(sql string) bool {
// clearConnection closes the cached RQLite connection, if there is one, and
// drops the reference so that a later caller has to establish a new one.
// It holds d.mu for writing while doing so.
func (d *DatabaseClientImpl) clearConnection() {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.connection == nil {
		return
	}
	// Close before clearing the field; clearing first would leave nothing to
	// close and leak the underlying connection.
	d.connection.Close()
	d.connection = nil
}
// getRQLiteConnection returns the cached RQLite connection, creating one via
// connectToAvailableNode when none is cached yet.
//
// The cached pointer is read under the read lock. Dialing happens without
// holding d.mu, and the result is published under the write lock. Returns the
// error from connectToAvailableNode unchanged when no connection can be made.
func (d *DatabaseClientImpl) getRQLiteConnection() (*gorqlite.Connection, error) {
	d.mu.RLock()
	conn := d.connection
	d.mu.RUnlock()
	if conn != nil {
		return conn, nil
	}

	// Dial outside the lock so concurrent readers are not blocked while a
	// connection attempt is in flight.
	newConn, err := d.connectToAvailableNode()
	if err != nil {
		return nil, err
	}

	d.mu.Lock()
	defer d.mu.Unlock()
	if existing := d.connection; existing != nil && existing != newConn {
		// Another goroutine cached a connection while this one was dialing.
		// Keep the cached one and close ours so it is not leaked.
		newConn.Close()
		return existing, nil
	}
	d.connection = newConn
	return newConn, nil
}
// getRQLiteNodes returns a list of RQLite node URLs with precedence:
@ -227,7 +241,6 @@ func (d *DatabaseClientImpl) connectToAvailableNode() (*gorqlite.Connection, err
continue
}
d.connection = conn
return conn, nil
}

View File

@ -62,7 +62,6 @@ func (pm *ProcessManager) StartAll(ctx context.Context) error {
fn func(context.Context) error
}{
{"IPFS", pm.startIPFS},
{"RQLite", pm.startRQLite},
{"IPFS Cluster", pm.startIPFSCluster},
{"Olric", pm.startOlric},
{"Anon", pm.startAnon},
@ -111,10 +110,6 @@ func (pm *ProcessManager) StopAll(ctx context.Context) error {
node := topology.Nodes[i]
services = append(services, fmt.Sprintf("ipfs-cluster-%s", node.Name))
}
for i := len(topology.Nodes) - 1; i >= 0; i-- {
node := topology.Nodes[i]
services = append(services, fmt.Sprintf("rqlite-%s", node.Name))
}
for i := len(topology.Nodes) - 1; i >= 0; i-- {
node := topology.Nodes[i]
services = append(services, fmt.Sprintf("ipfs-%s", node.Name))
@ -150,13 +145,6 @@ func (pm *ProcessManager) Status(ctx context.Context) {
fmt.Sprintf("%s IPFS", node.Name),
[]int{node.IPFSAPIPort, node.IPFSSwarmPort},
})
services = append(services, struct {
name string
ports []int
}{
fmt.Sprintf("%s RQLite", node.Name),
[]int{node.RQLiteHTTPPort, node.RQLiteRaftPort},
})
services = append(services, struct {
name string
ports []int
@ -942,72 +930,6 @@ func (pm *ProcessManager) ensureIPFSClusterPorts(clusterPath string, restAPIPort
return nil
}
// startRQLite launches one rqlited process for every node in DefaultTopology.
//
// For each node it creates the node's rqlite data directory, starts rqlited
// bound to the node's HTTP and Raft ports (adding -join flags when the node
// has a join target), redirects the process output to
// logs/rqlite-<name>.log, writes a PID file under pm.pidsDir and records the
// process in pm.processes. After all nodes are started it sleeps for two
// seconds before returning.
//
// It returns an error only when a process fails to start. Failures to create
// the data directory, the log file or the PID file are reported on
// pm.logWriter and do not abort startup.
func (pm *ProcessManager) startRQLite(ctx context.Context) error {
	topology := DefaultTopology()

	for _, spec := range topology.Nodes {
		procName := fmt.Sprintf("rqlite-%s", spec.Name)
		dataDir := filepath.Join(pm.debrosDir, spec.DataDir, "rqlite")
		pidPath := filepath.Join(pm.pidsDir, fmt.Sprintf("rqlite-%s.pid", spec.Name))
		logPath := filepath.Join(pm.debrosDir, "logs", fmt.Sprintf("rqlite-%s.log", spec.Name))

		if err := os.MkdirAll(dataDir, 0755); err != nil {
			fmt.Fprintf(pm.logWriter, "warning: could not create data dir for %s: %v\n", procName, err)
		}

		args := []string{
			fmt.Sprintf("-http-addr=0.0.0.0:%d", spec.RQLiteHTTPPort),
			fmt.Sprintf("-http-adv-addr=localhost:%d", spec.RQLiteHTTPPort),
			fmt.Sprintf("-raft-addr=0.0.0.0:%d", spec.RQLiteRaftPort),
			fmt.Sprintf("-raft-adv-addr=localhost:%d", spec.RQLiteRaftPort),
		}
		if spec.RQLiteJoinTarget != "" {
			args = append(args, "-join", spec.RQLiteJoinTarget, "-join-attempts", "30", "-join-interval", "10s")
		}
		// The data directory is the positional argument and must come last.
		args = append(args, dataDir)

		cmd := exec.CommandContext(ctx, "rqlited", args...)

		logFile, err := os.Create(logPath)
		if err != nil {
			// Leave Stdout/Stderr unset so os/exec attaches the null device
			// instead of handing a nil *os.File to the child.
			fmt.Fprintf(pm.logWriter, "warning: could not create log file for %s: %v\n", procName, err)
		} else {
			cmd.Stdout = logFile
			cmd.Stderr = logFile
		}

		startErr := cmd.Start()
		if logFile != nil {
			// The child received its own descriptor at Start; the parent's
			// handle is no longer needed either way.
			_ = logFile.Close()
		}
		if startErr != nil {
			return fmt.Errorf("failed to start rqlite-%s: %w", spec.Name, startErr)
		}

		pid := cmd.Process.Pid
		if err := os.WriteFile(pidPath, []byte(fmt.Sprintf("%d", pid)), 0644); err != nil {
			fmt.Fprintf(pm.logWriter, "warning: could not write PID file for %s: %v\n", procName, err)
		}

		pm.processes[procName] = &ManagedProcess{
			Name:      procName,
			PID:       pid,
			StartTime: time.Now(),
			LogPath:   logPath,
		}

		fmt.Fprintf(pm.logWriter, "✓ RQLite (%s) started (PID: %d, HTTP: %d, Raft: %d)\n", spec.Name, pid, spec.RQLiteHTTPPort, spec.RQLiteRaftPort)
	}

	// Fixed pause after launching all nodes, before callers continue.
	time.Sleep(2 * time.Second)
	return nil
}
func (pm *ProcessManager) startOlric(ctx context.Context) error {
pidPath := filepath.Join(pm.pidsDir, "olric.pid")
logPath := filepath.Join(pm.debrosDir, "logs", "olric.log")

View File

@ -17,7 +17,8 @@ import (
// Cache HTTP handlers for Olric distributed cache
func (g *Gateway) cacheHealthHandler(w http.ResponseWriter, r *http.Request) {
if g.olricClient == nil {
client := g.getOlricClient()
if client == nil {
writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized")
return
}
@ -25,7 +26,7 @@ func (g *Gateway) cacheHealthHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
err := g.olricClient.Health(ctx)
err := client.Health(ctx)
if err != nil {
writeError(w, http.StatusServiceUnavailable, fmt.Sprintf("cache health check failed: %v", err))
return
@ -38,7 +39,8 @@ func (g *Gateway) cacheHealthHandler(w http.ResponseWriter, r *http.Request) {
}
func (g *Gateway) cacheGetHandler(w http.ResponseWriter, r *http.Request) {
if g.olricClient == nil {
client := g.getOlricClient()
if client == nil {
writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized")
return
}
@ -66,8 +68,8 @@ func (g *Gateway) cacheGetHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
client := g.olricClient.GetClient()
dm, err := client.NewDMap(req.DMap)
olricCluster := client.GetClient()
dm, err := olricCluster.NewDMap(req.DMap)
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create DMap: %v", err))
return
@ -142,7 +144,8 @@ func decodeValueFromOlric(gr *olriclib.GetResponse) (any, error) {
}
func (g *Gateway) cacheMultiGetHandler(w http.ResponseWriter, r *http.Request) {
if g.olricClient == nil {
client := g.getOlricClient()
if client == nil {
writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized")
return
}
@ -175,8 +178,8 @@ func (g *Gateway) cacheMultiGetHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
client := g.olricClient.GetClient()
dm, err := client.NewDMap(req.DMap)
olricCluster := client.GetClient()
dm, err := olricCluster.NewDMap(req.DMap)
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create DMap: %v", err))
return
@ -220,7 +223,8 @@ func (g *Gateway) cacheMultiGetHandler(w http.ResponseWriter, r *http.Request) {
}
func (g *Gateway) cachePutHandler(w http.ResponseWriter, r *http.Request) {
if g.olricClient == nil {
client := g.getOlricClient()
if client == nil {
writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized")
return
}
@ -255,8 +259,8 @@ func (g *Gateway) cachePutHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
client := g.olricClient.GetClient()
dm, err := client.NewDMap(req.DMap)
olricCluster := client.GetClient()
dm, err := olricCluster.NewDMap(req.DMap)
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create DMap: %v", err))
return
@ -337,7 +341,8 @@ func (g *Gateway) cachePutHandler(w http.ResponseWriter, r *http.Request) {
}
func (g *Gateway) cacheDeleteHandler(w http.ResponseWriter, r *http.Request) {
if g.olricClient == nil {
client := g.getOlricClient()
if client == nil {
writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized")
return
}
@ -365,8 +370,8 @@ func (g *Gateway) cacheDeleteHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
defer cancel()
client := g.olricClient.GetClient()
dm, err := client.NewDMap(req.DMap)
olricCluster := client.GetClient()
dm, err := olricCluster.NewDMap(req.DMap)
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create DMap: %v", err))
return
@ -395,7 +400,8 @@ func (g *Gateway) cacheDeleteHandler(w http.ResponseWriter, r *http.Request) {
}
func (g *Gateway) cacheScanHandler(w http.ResponseWriter, r *http.Request) {
if g.olricClient == nil {
client := g.getOlricClient()
if client == nil {
writeError(w, http.StatusServiceUnavailable, "Olric cache client not initialized")
return
}
@ -423,8 +429,8 @@ func (g *Gateway) cacheScanHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
client := g.olricClient.GetClient()
dm, err := client.NewDMap(req.DMap)
olricCluster := client.GetClient()
dm, err := olricCluster.NewDMap(req.DMap)
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create DMap: %v", err))
return

View File

@ -74,6 +74,7 @@ type Gateway struct {
// Olric cache client
olricClient *olric.Client
olricMu sync.RWMutex
// IPFS storage client
ipfsClient ipfs.IPFSClient
@ -192,8 +193,9 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
olricClient, olricErr := initializeOlricClientWithRetry(olricCfg, logger)
if olricErr != nil {
logger.ComponentWarn(logging.ComponentGeneral, "failed to initialize Olric cache client; cache endpoints disabled", zap.Error(olricErr))
gw.startOlricReconnectLoop(olricCfg)
} else {
gw.olricClient = olricClient
gw.setOlricClient(olricClient)
logger.ComponentInfo(logging.ComponentGeneral, "Olric cache client ready",
zap.Strings("servers", olricCfg.Servers),
zap.Duration("timeout", olricCfg.Timeout),
@ -312,10 +314,10 @@ func (g *Gateway) Close() {
if g.sqlDB != nil {
_ = g.sqlDB.Close()
}
if g.olricClient != nil {
if client := g.getOlricClient(); client != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := g.olricClient.Close(ctx); err != nil {
if err := client.Close(ctx); err != nil {
g.logger.ComponentWarn(logging.ComponentGeneral, "error during Olric client close", zap.Error(err))
}
}
@ -337,6 +339,46 @@ func (g *Gateway) getLocalSubscribers(topic, namespace string) []*localSubscribe
return nil
}
// setOlricClient stores client as the Gateway's Olric client while holding
// olricMu for writing.
func (g *Gateway) setOlricClient(client *olric.Client) {
	g.olricMu.Lock()
	g.olricClient = client
	g.olricMu.Unlock()
}
// getOlricClient returns the Gateway's current Olric client, read while
// holding olricMu for reading. The result is nil when no client has been set.
func (g *Gateway) getOlricClient() *olric.Client {
	g.olricMu.RLock()
	current := g.olricClient
	g.olricMu.RUnlock()
	return current
}
// startOlricReconnectLoop starts a background goroutine that repeatedly calls
// initializeOlricClientWithRetry until it succeeds. On success the client is
// published through setOlricClient and the goroutine exits. After each failed
// round it logs a warning and sleeps, starting at 5 seconds and doubling up to
// olricInitMaxBackoff.
//
// NOTE(review): no cancellation path is visible here; the goroutine only ends
// after a successful connect. Confirm how this interacts with Gateway.Close.
func (g *Gateway) startOlricReconnectLoop(cfg olric.Config) {
	reconnect := func() {
		wait := 5 * time.Second
		for {
			client, err := initializeOlricClientWithRetry(cfg, g.logger)
			if err != nil {
				g.logger.ComponentWarn(logging.ComponentGeneral, "Olric cache client reconnect failed",
					zap.Duration("retry_in", wait),
					zap.Error(err))
				time.Sleep(wait)

				// Grow the delay only while it is still below the cap.
				if wait >= olricInitMaxBackoff {
					continue
				}
				wait *= 2
				if wait > olricInitMaxBackoff {
					wait = olricInitMaxBackoff
				}
				continue
			}

			g.setOlricClient(client)
			g.logger.ComponentInfo(logging.ComponentGeneral, "Olric cache client connected after background retries",
				zap.Strings("servers", cfg.Servers),
				zap.Duration("timeout", cfg.Timeout))
			return
		}
	}
	go reconnect()
}
func initializeOlricClientWithRetry(cfg olric.Config, logger *logging.ColoredLogger) (*olric.Client, error) {
backoff := olricInitInitialBackoff

View File

@ -809,6 +809,11 @@ func (c *ClusterDiscoveryService) adjustSelfAdvertisedAddresses(meta *discovery.
c.raftAddress = meta.RaftAddress
c.httpAddress = meta.HTTPAddress
c.mu.Unlock()
if c.rqliteManager != nil {
c.rqliteManager.UpdateAdvertisedAddresses(meta.RaftAddress, meta.HTTPAddress)
}
return true
}

View File

@ -81,6 +81,24 @@ func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService) {
r.discoveryService = service
}
// UpdateAdvertisedAddresses replaces the Raft and HTTP advertised addresses in
// the manager's discovery configuration when cluster discovery has inferred a
// better host than the configured one (e.g. replacing localhost).
//
// An empty argument, or one equal to the current value, leaves that address
// untouched. Each actual change is logged at info level. The call is a no-op
// on a nil receiver or when no discovery configuration is set.
func (r *RQLiteManager) UpdateAdvertisedAddresses(raftAddr, httpAddr string) {
	if r == nil {
		return
	}
	cfg := r.discoverConfig
	if cfg == nil {
		return
	}

	raftChanged := raftAddr != "" && cfg.RaftAdvAddress != raftAddr
	if raftChanged {
		r.logger.Info("Updating Raft advertised address", zap.String("addr", raftAddr))
		cfg.RaftAdvAddress = raftAddr
	}

	httpChanged := httpAddr != "" && cfg.HttpAdvAddress != httpAddr
	if httpChanged {
		r.logger.Info("Updating HTTP advertised address", zap.String("addr", httpAddr))
		cfg.HttpAdvAddress = httpAddr
	}
}
// Start starts the RQLite node
func (r *RQLiteManager) Start(ctx context.Context) error {
rqliteDataDir, err := r.prepareDataDir()