From 8c392194bbbf327cbe9ba8408ef5d6931257e286 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Sat, 31 Jan 2026 12:14:49 +0200 Subject: [PATCH] Fixed olric cluster problem --- .../handlers/namespace/spawn_handler.go | 5 +- pkg/namespace/cluster_manager.go | 96 +++++++--- pkg/olric/instance_spawner.go | 165 ++++++++++-------- pkg/rqlite/scanner.go | 10 +- 4 files changed, 175 insertions(+), 101 deletions(-) diff --git a/pkg/gateway/handlers/namespace/spawn_handler.go b/pkg/gateway/handlers/namespace/spawn_handler.go index e66bf60..6384b72 100644 --- a/pkg/gateway/handlers/namespace/spawn_handler.go +++ b/pkg/gateway/handlers/namespace/spawn_handler.go @@ -1,6 +1,7 @@ package namespace import ( + "context" "encoding/json" "fmt" "net/http" @@ -90,7 +91,9 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { zap.String("node_id", req.NodeID), ) - ctx := r.Context() + // Use a background context for spawn operations so processes outlive the HTTP request. + // Stop operations can use request context since they're short-lived. + ctx := context.Background() switch req.Action { case "spawn-rqlite": diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index 561630c..25eeb63 100644 --- a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -306,46 +306,76 @@ func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *Names return instances, nil } -// startOlricCluster starts Olric instances on all nodes (locally or remotely) +// startOlricCluster starts Olric instances on all nodes concurrently. +// Olric uses memberlist for peer discovery — all peers must be reachable at roughly +// the same time. Sequential spawning fails because early instances exhaust their +// retry budget before later instances start. By spawning all concurrently, all +// memberlist ports open within seconds of each other, allowing discovery to succeed. func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) ([]*olric.OlricInstance, error) { instances := make([]*olric.OlricInstance, len(nodes)) + errs := make([]error, len(nodes)) - // Build peer addresses (all nodes) - peerAddresses := make([]string, len(nodes)) + // Build configs for all nodes upfront + configs := make([]olric.InstanceConfig, len(nodes)) for i, node := range nodes { - peerAddresses[i] = fmt.Sprintf("%s:%d", node.InternalIP, portBlocks[i].OlricMemberlistPort) - } - - // Start all Olric instances - for i, node := range nodes { - cfg := olric.InstanceConfig{ + var peers []string + for j, peerNode := range nodes { + if j != i { + peers = append(peers, fmt.Sprintf("%s:%d", peerNode.InternalIP, portBlocks[j].OlricMemberlistPort)) + } + } + configs[i] = olric.InstanceConfig{ Namespace: cluster.NamespaceName, NodeID: node.NodeID, HTTPPort: portBlocks[i].OlricHTTPPort, MemberlistPort: portBlocks[i].OlricMemberlistPort, - BindAddr: node.InternalIP, // Bind to node's WG IP (0.0.0.0 resolves to IPv6 on some hosts) + BindAddr: node.InternalIP, // Bind to WG IP directly (0.0.0.0 resolves to IPv6 on some hosts) AdvertiseAddr: node.InternalIP, // Advertise WG IP to peers - PeerAddresses: peerAddresses, + PeerAddresses: peers, } + } - var instance *olric.OlricInstance - var err error - if node.NodeID == cm.localNodeID { - cm.logger.Info("Spawning Olric locally", zap.String("node", node.NodeID)) - instance, err = cm.olricSpawner.SpawnInstance(ctx, cfg) - } else { - cm.logger.Info("Spawning Olric remotely", zap.String("node", node.NodeID), zap.String("ip", node.InternalIP)) - instance, err = cm.spawnOlricRemote(ctx, node.InternalIP, cfg) - } - if err != nil { - // Stop previously started instances - for j := 0; j < i; j++ { - cm.stopOlricOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName) + // Spawn all instances concurrently + var wg sync.WaitGroup + for i, node := range nodes { + wg.Add(1) + go func(idx int, n NodeCapacity) { + defer wg.Done() + if n.NodeID == cm.localNodeID { + cm.logger.Info("Spawning Olric locally", zap.String("node", n.NodeID)) + instances[idx], errs[idx] = cm.olricSpawner.SpawnInstance(ctx, configs[idx]) + } else { + cm.logger.Info("Spawning Olric remotely", zap.String("node", n.NodeID), zap.String("ip", n.InternalIP)) + instances[idx], errs[idx] = cm.spawnOlricRemote(ctx, n.InternalIP, configs[idx]) } - return nil, fmt.Errorf("failed to start Olric on node %s: %w", node.NodeID, err) - } - instances[i] = instance + }(i, node) + } + wg.Wait() + // Check for errors — if any failed, stop all and return + for i, err := range errs { + if err != nil { + cm.logger.Error("Olric spawn failed", zap.String("node", nodes[i].NodeID), zap.Error(err)) + // Stop any that succeeded + for j := range nodes { + if errs[j] == nil { + cm.stopOlricOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName) + } + } + return nil, fmt.Errorf("failed to start Olric on node %s: %w", nodes[i].NodeID, err) + } + } + + // All instances started — give memberlist time to converge. + // Olric's memberlist retries peer joins every ~1s for ~10 attempts. + // Since all instances are now up, they should discover each other quickly. + cm.logger.Info("All Olric instances started, waiting for memberlist convergence", + zap.Int("node_count", len(nodes)), + ) + time.Sleep(5 * time.Second) + + // Log events and record cluster nodes + for i, node := range nodes { cm.logEvent(ctx, cluster.ID, EventOlricStarted, node.NodeID, "Olric instance started", nil) cm.logEvent(ctx, cluster.ID, EventOlricJoined, node.NodeID, "Olric instance joined memberlist", nil) @@ -354,6 +384,18 @@ func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *Namesp } } + // Verify at least the local instance is still healthy after convergence + for i, node := range nodes { + if node.NodeID == cm.localNodeID && instances[i] != nil { + healthy, err := instances[i].IsHealthy(ctx) + if !healthy { + cm.logger.Warn("Local Olric instance unhealthy after convergence wait", zap.Error(err)) + } else { + cm.logger.Info("Local Olric instance healthy after convergence") + } + } + } + return instances, nil } diff --git a/pkg/olric/instance_spawner.go b/pkg/olric/instance_spawner.go index ab0f649..be89d57 100644 --- a/pkg/olric/instance_spawner.go +++ b/pkg/olric/instance_spawner.go @@ -54,32 +54,34 @@ type InstanceSpawner struct { // OlricInstance represents a running Olric instance for a namespace type OlricInstance struct { - Namespace string - NodeID string - HTTPPort int - MemberlistPort int - BindAddr string - AdvertiseAddr string - PeerAddresses []string // Memberlist peer addresses for cluster discovery - ConfigPath string - DataDir string - PID int - Status InstanceNodeStatus - StartedAt time.Time - LastHealthCheck time.Time - cmd *exec.Cmd - logger *zap.Logger + Namespace string + NodeID string + HTTPPort int + MemberlistPort int + BindAddr string + AdvertiseAddr string + PeerAddresses []string // Memberlist peer addresses for cluster discovery + ConfigPath string + DataDir string + PID int + Status InstanceNodeStatus + StartedAt time.Time + LastHealthCheck time.Time + cmd *exec.Cmd + logFile *os.File // kept open for process lifetime + waitDone chan struct{} // closed when cmd.Wait() completes + logger *zap.Logger } // InstanceConfig holds configuration for spawning an Olric instance type InstanceConfig struct { - Namespace string // Namespace name (e.g., "alice") - NodeID string // Physical node ID - HTTPPort int // HTTP API port - MemberlistPort int // Memberlist gossip port - BindAddr string // Address to bind (e.g., "0.0.0.0") - AdvertiseAddr string // Address to advertise (e.g., "192.168.1.10") - PeerAddresses []string // Memberlist peer addresses for initial cluster join + Namespace string // Namespace name (e.g., "alice") + NodeID string // Physical node ID + HTTPPort int // HTTP API port + MemberlistPort int // Memberlist gossip port + BindAddr string // Address to bind (e.g., "0.0.0.0") + AdvertiseAddr string // Address to advertise (e.g., "192.168.1.10") + PeerAddresses []string // Memberlist peer addresses for initial cluster join } // OlricConfig represents the Olric YAML configuration structure @@ -117,19 +119,21 @@ func instanceKey(namespace, nodeID string) string { } // SpawnInstance starts a new Olric instance for a namespace on a specific node. -// Returns the instance info or an error if spawning fails. +// The process is decoupled from the caller's context — it runs independently until +// explicitly stopped. Only returns an error if the process fails to start or the +// memberlist port doesn't open within the timeout. +// Note: The memberlist port opening does NOT mean the cluster has formed — peers may +// still be joining. Use WaitForProcessRunning() after spawning all instances to verify. func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig) (*OlricInstance, error) { key := instanceKey(cfg.Namespace, cfg.NodeID) is.mu.Lock() if existing, ok := is.instances[key]; ok { - is.mu.Unlock() - // Instance already exists, return it if running - if existing.Status == InstanceStatusRunning { + if existing.Status == InstanceStatusRunning || existing.Status == InstanceStatusStarting { + is.mu.Unlock() return existing, nil } - // Otherwise, remove it and start fresh - is.mu.Lock() + // Remove stale instance delete(is.instances, key) } is.mu.Unlock() @@ -165,6 +169,7 @@ func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig ConfigPath: configPath, DataDir: dataDir, Status: InstanceStatusStarting, + waitDone: make(chan struct{}), logger: is.logger.With(zap.String("namespace", cfg.Namespace), zap.String("node_id", cfg.NodeID)), } @@ -174,12 +179,14 @@ func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig zap.Strings("peers", cfg.PeerAddresses), ) - // Create command with config environment variable - cmd := exec.CommandContext(ctx, "olric-server") + // Use exec.Command (NOT exec.CommandContext) so the process is NOT killed + // when the HTTP request context or provisioning context is cancelled. + // The process lives until explicitly stopped via StopInstance(). + cmd := exec.Command("olric-server") cmd.Env = append(os.Environ(), fmt.Sprintf("OLRIC_SERVER_CONFIG=%s", configPath)) instance.cmd = cmd - // Setup logging + // Setup logging — keep the file open for the process lifetime logPath := filepath.Join(logsDir, fmt.Sprintf("olric-%s.log", cfg.NodeID)) logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { @@ -188,6 +195,7 @@ func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig Cause: err, } } + instance.logFile = logFile cmd.Stdout = logFile cmd.Stderr = logFile @@ -201,18 +209,26 @@ func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig } } - logFile.Close() - instance.PID = cmd.Process.Pid instance.StartedAt = time.Now() + // Reap the child process in a background goroutine to prevent zombies. + // This goroutine closes the log file and signals via waitDone when the process exits. + go func() { + _ = cmd.Wait() + logFile.Close() + close(instance.waitDone) + }() + // Store instance is.mu.Lock() is.instances[key] = instance is.mu.Unlock() - // Wait for instance to be ready - if err := is.waitForInstanceReady(ctx, instance); err != nil { + // Wait for the memberlist port to accept TCP connections. + // This confirms the process started and Olric initialized its network layer. + // It does NOT guarantee peers have joined — that happens asynchronously. + if err := is.waitForPortReady(ctx, instance); err != nil { // Kill the process on failure if cmd.Process != nil { _ = cmd.Process.Kill() @@ -295,20 +311,17 @@ func (is *InstanceSpawner) StopInstance(ctx context.Context, ns, nodeID string) _ = instance.cmd.Process.Kill() } - // Wait for process to exit with timeout - done := make(chan error, 1) - go func() { - done <- instance.cmd.Wait() - }() - + // Wait for process to exit via the reaper goroutine select { - case <-done: + case <-instance.waitDone: instance.logger.Info("Olric instance stopped gracefully") case <-time.After(10 * time.Second): instance.logger.Warn("Olric instance did not stop gracefully, killing") _ = instance.cmd.Process.Kill() + <-instance.waitDone // wait for reaper to finish case <-ctx.Done(): _ = instance.cmd.Process.Kill() + <-instance.waitDone return ctx.Err() } } @@ -379,31 +392,29 @@ func (is *InstanceSpawner) HealthCheck(ctx context.Context, ns, nodeID string) ( return healthy, err } -// waitForInstanceReady waits for the Olric instance to be ready -func (is *InstanceSpawner) waitForInstanceReady(ctx context.Context, instance *OlricInstance) error { - // Olric doesn't have a standard /ready endpoint, so we check if the process - // is running and the memberlist port is accepting connections +// waitForPortReady waits for the Olric memberlist port to accept TCP connections. +// This is a lightweight check — it confirms the process started but does NOT +// guarantee that peers have joined the cluster. +func (is *InstanceSpawner) waitForPortReady(ctx context.Context, instance *OlricInstance) error { + // Use BindAddr for the health check — this is the address the process actually listens on. + // AdvertiseAddr may differ from BindAddr (e.g., 0.0.0.0 resolves to IPv6 on some hosts). + checkAddr := instance.BindAddr + if checkAddr == "" || checkAddr == "0.0.0.0" { + checkAddr = "localhost" + } + addr := fmt.Sprintf("%s:%d", checkAddr, instance.MemberlistPort) - maxAttempts := 30 // 30 seconds + maxAttempts := 30 for i := 0; i < maxAttempts; i++ { select { case <-ctx.Done(): return ctx.Err() + case <-instance.waitDone: + // Process exited before becoming ready + return fmt.Errorf("Olric process exited unexpectedly (pid %d)", instance.PID) case <-time.After(1 * time.Second): } - // Check if the process is still running - if instance.cmd != nil && instance.cmd.ProcessState != nil && instance.cmd.ProcessState.Exited() { - return fmt.Errorf("Olric process exited unexpectedly") - } - - // Try to connect to the memberlist port to verify it's accepting connections - // Use the advertise address since Olric may bind to a specific IP - addr := fmt.Sprintf("%s:%d", instance.AdvertiseAddr, instance.MemberlistPort) - if instance.AdvertiseAddr == "" { - addr = fmt.Sprintf("localhost:%d", instance.MemberlistPort) - } - conn, err := net.DialTimeout("tcp", addr, 2*time.Second) if err != nil { instance.logger.Debug("Waiting for Olric memberlist", @@ -415,7 +426,7 @@ func (is *InstanceSpawner) waitForInstanceReady(ctx context.Context, instance *O } conn.Close() - instance.logger.Debug("Olric instance ready", + instance.logger.Debug("Olric memberlist port ready", zap.Int("attempts", i+1), zap.String("addr", addr), ) @@ -430,14 +441,27 @@ func (is *InstanceSpawner) monitorInstance(instance *OlricInstance) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() - for range ticker.C { + for { + select { + case <-instance.waitDone: + // Process exited — update status and stop monitoring + is.mu.Lock() + key := instanceKey(instance.Namespace, instance.NodeID) + if _, exists := is.instances[key]; exists { + instance.Status = InstanceStatusStopped + instance.logger.Warn("Olric instance process exited unexpectedly") + } + is.mu.Unlock() + return + case <-ticker.C: + } + is.mu.RLock() key := instanceKey(instance.Namespace, instance.NodeID) _, exists := is.instances[key] is.mu.RUnlock() if !exists { - // Instance was removed return } @@ -454,21 +478,18 @@ func (is *InstanceSpawner) monitorInstance(instance *OlricInstance) { instance.logger.Warn("Olric instance health check failed") } is.mu.Unlock() - - // Check if process is still running - if instance.cmd != nil && instance.cmd.ProcessState != nil && instance.cmd.ProcessState.Exited() { - is.mu.Lock() - instance.Status = InstanceStatusStopped - is.mu.Unlock() - instance.logger.Warn("Olric instance process exited unexpectedly") - return - } } } // IsHealthy checks if the Olric instance is healthy by verifying the memberlist port is accepting connections func (oi *OlricInstance) IsHealthy(ctx context.Context) (bool, error) { - // Olric doesn't have a standard /ready HTTP endpoint, so we check memberlist connectivity + // Check if process has exited first + select { + case <-oi.waitDone: + return false, fmt.Errorf("process has exited") + default: + } + addr := fmt.Sprintf("%s:%d", oi.AdvertiseAddr, oi.MemberlistPort) if oi.AdvertiseAddr == "" || oi.AdvertiseAddr == "0.0.0.0" { addr = fmt.Sprintf("localhost:%d", oi.MemberlistPort) diff --git a/pkg/rqlite/scanner.go b/pkg/rqlite/scanner.go index 6e9966e..fe8c8e1 100644 --- a/pkg/rqlite/scanner.go +++ b/pkg/rqlite/scanner.go @@ -318,8 +318,16 @@ func setReflectValue(field reflect.Value, raw any) error { return nil } fallthrough + case reflect.Ptr: + // Handle pointer types (e.g. *time.Time, *string, *int) + // nil raw is already handled above (leaves zero/nil pointer) + elem := reflect.New(field.Type().Elem()) + if err := setReflectValue(elem.Elem(), raw); err != nil { + return err + } + field.Set(elem) + return nil default: - // Not supported yet return fmt.Errorf("unsupported dest field kind: %s", field.Kind()) } return nil