mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 06:43:01 +00:00
Fixed Olric cluster problem
This commit is contained in:
parent
51371e199d
commit
8c392194bb
@ -1,6 +1,7 @@
|
||||
package namespace
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
@ -90,7 +91,9 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
zap.String("node_id", req.NodeID),
|
||||
)
|
||||
|
||||
ctx := r.Context()
|
||||
// Use a background context for spawn operations so processes outlive the HTTP request.
|
||||
// Stop operations can use request context since they're short-lived.
|
||||
ctx := context.Background()
|
||||
|
||||
switch req.Action {
|
||||
case "spawn-rqlite":
|
||||
|
||||
@ -306,46 +306,76 @@ func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *Names
|
||||
return instances, nil
|
||||
}
|
||||
|
||||
// startOlricCluster starts Olric instances on all nodes (locally or remotely)
|
||||
// startOlricCluster starts Olric instances on all nodes concurrently.
|
||||
// Olric uses memberlist for peer discovery — all peers must be reachable at roughly
|
||||
// the same time. Sequential spawning fails because early instances exhaust their
|
||||
// retry budget before later instances start. By spawning all concurrently, all
|
||||
// memberlist ports open within seconds of each other, allowing discovery to succeed.
|
||||
func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) ([]*olric.OlricInstance, error) {
|
||||
instances := make([]*olric.OlricInstance, len(nodes))
|
||||
errs := make([]error, len(nodes))
|
||||
|
||||
// Build peer addresses (all nodes)
|
||||
peerAddresses := make([]string, len(nodes))
|
||||
// Build configs for all nodes upfront
|
||||
configs := make([]olric.InstanceConfig, len(nodes))
|
||||
for i, node := range nodes {
|
||||
peerAddresses[i] = fmt.Sprintf("%s:%d", node.InternalIP, portBlocks[i].OlricMemberlistPort)
|
||||
}
|
||||
|
||||
// Start all Olric instances
|
||||
for i, node := range nodes {
|
||||
cfg := olric.InstanceConfig{
|
||||
var peers []string
|
||||
for j, peerNode := range nodes {
|
||||
if j != i {
|
||||
peers = append(peers, fmt.Sprintf("%s:%d", peerNode.InternalIP, portBlocks[j].OlricMemberlistPort))
|
||||
}
|
||||
}
|
||||
configs[i] = olric.InstanceConfig{
|
||||
Namespace: cluster.NamespaceName,
|
||||
NodeID: node.NodeID,
|
||||
HTTPPort: portBlocks[i].OlricHTTPPort,
|
||||
MemberlistPort: portBlocks[i].OlricMemberlistPort,
|
||||
BindAddr: node.InternalIP, // Bind to node's WG IP (0.0.0.0 resolves to IPv6 on some hosts)
|
||||
BindAddr: node.InternalIP, // Bind to WG IP directly (0.0.0.0 resolves to IPv6 on some hosts)
|
||||
AdvertiseAddr: node.InternalIP, // Advertise WG IP to peers
|
||||
PeerAddresses: peerAddresses,
|
||||
PeerAddresses: peers,
|
||||
}
|
||||
}
|
||||
|
||||
var instance *olric.OlricInstance
|
||||
var err error
|
||||
if node.NodeID == cm.localNodeID {
|
||||
cm.logger.Info("Spawning Olric locally", zap.String("node", node.NodeID))
|
||||
instance, err = cm.olricSpawner.SpawnInstance(ctx, cfg)
|
||||
} else {
|
||||
cm.logger.Info("Spawning Olric remotely", zap.String("node", node.NodeID), zap.String("ip", node.InternalIP))
|
||||
instance, err = cm.spawnOlricRemote(ctx, node.InternalIP, cfg)
|
||||
}
|
||||
if err != nil {
|
||||
// Stop previously started instances
|
||||
for j := 0; j < i; j++ {
|
||||
cm.stopOlricOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName)
|
||||
// Spawn all instances concurrently
|
||||
var wg sync.WaitGroup
|
||||
for i, node := range nodes {
|
||||
wg.Add(1)
|
||||
go func(idx int, n NodeCapacity) {
|
||||
defer wg.Done()
|
||||
if n.NodeID == cm.localNodeID {
|
||||
cm.logger.Info("Spawning Olric locally", zap.String("node", n.NodeID))
|
||||
instances[idx], errs[idx] = cm.olricSpawner.SpawnInstance(ctx, configs[idx])
|
||||
} else {
|
||||
cm.logger.Info("Spawning Olric remotely", zap.String("node", n.NodeID), zap.String("ip", n.InternalIP))
|
||||
instances[idx], errs[idx] = cm.spawnOlricRemote(ctx, n.InternalIP, configs[idx])
|
||||
}
|
||||
return nil, fmt.Errorf("failed to start Olric on node %s: %w", node.NodeID, err)
|
||||
}
|
||||
instances[i] = instance
|
||||
}(i, node)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Check for errors — if any failed, stop all and return
|
||||
for i, err := range errs {
|
||||
if err != nil {
|
||||
cm.logger.Error("Olric spawn failed", zap.String("node", nodes[i].NodeID), zap.Error(err))
|
||||
// Stop any that succeeded
|
||||
for j := range nodes {
|
||||
if errs[j] == nil {
|
||||
cm.stopOlricOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName)
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("failed to start Olric on node %s: %w", nodes[i].NodeID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// All instances started — give memberlist time to converge.
|
||||
// Olric's memberlist retries peer joins every ~1s for ~10 attempts.
|
||||
// Since all instances are now up, they should discover each other quickly.
|
||||
cm.logger.Info("All Olric instances started, waiting for memberlist convergence",
|
||||
zap.Int("node_count", len(nodes)),
|
||||
)
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
// Log events and record cluster nodes
|
||||
for i, node := range nodes {
|
||||
cm.logEvent(ctx, cluster.ID, EventOlricStarted, node.NodeID, "Olric instance started", nil)
|
||||
cm.logEvent(ctx, cluster.ID, EventOlricJoined, node.NodeID, "Olric instance joined memberlist", nil)
|
||||
|
||||
@ -354,6 +384,18 @@ func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *Namesp
|
||||
}
|
||||
}
|
||||
|
||||
// Verify at least the local instance is still healthy after convergence
|
||||
for i, node := range nodes {
|
||||
if node.NodeID == cm.localNodeID && instances[i] != nil {
|
||||
healthy, err := instances[i].IsHealthy(ctx)
|
||||
if !healthy {
|
||||
cm.logger.Warn("Local Olric instance unhealthy after convergence wait", zap.Error(err))
|
||||
} else {
|
||||
cm.logger.Info("Local Olric instance healthy after convergence")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return instances, nil
|
||||
}
|
||||
|
||||
|
||||
@ -54,32 +54,34 @@ type InstanceSpawner struct {
|
||||
|
||||
// OlricInstance represents a running Olric instance for a namespace
|
||||
type OlricInstance struct {
|
||||
Namespace string
|
||||
NodeID string
|
||||
HTTPPort int
|
||||
MemberlistPort int
|
||||
BindAddr string
|
||||
AdvertiseAddr string
|
||||
PeerAddresses []string // Memberlist peer addresses for cluster discovery
|
||||
ConfigPath string
|
||||
DataDir string
|
||||
PID int
|
||||
Status InstanceNodeStatus
|
||||
StartedAt time.Time
|
||||
LastHealthCheck time.Time
|
||||
cmd *exec.Cmd
|
||||
logger *zap.Logger
|
||||
Namespace string
|
||||
NodeID string
|
||||
HTTPPort int
|
||||
MemberlistPort int
|
||||
BindAddr string
|
||||
AdvertiseAddr string
|
||||
PeerAddresses []string // Memberlist peer addresses for cluster discovery
|
||||
ConfigPath string
|
||||
DataDir string
|
||||
PID int
|
||||
Status InstanceNodeStatus
|
||||
StartedAt time.Time
|
||||
LastHealthCheck time.Time
|
||||
cmd *exec.Cmd
|
||||
logFile *os.File // kept open for process lifetime
|
||||
waitDone chan struct{} // closed when cmd.Wait() completes
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// InstanceConfig holds configuration for spawning an Olric instance
|
||||
type InstanceConfig struct {
|
||||
Namespace string // Namespace name (e.g., "alice")
|
||||
NodeID string // Physical node ID
|
||||
HTTPPort int // HTTP API port
|
||||
MemberlistPort int // Memberlist gossip port
|
||||
BindAddr string // Address to bind (e.g., "0.0.0.0")
|
||||
AdvertiseAddr string // Address to advertise (e.g., "192.168.1.10")
|
||||
PeerAddresses []string // Memberlist peer addresses for initial cluster join
|
||||
Namespace string // Namespace name (e.g., "alice")
|
||||
NodeID string // Physical node ID
|
||||
HTTPPort int // HTTP API port
|
||||
MemberlistPort int // Memberlist gossip port
|
||||
BindAddr string // Address to bind (e.g., "0.0.0.0")
|
||||
AdvertiseAddr string // Address to advertise (e.g., "192.168.1.10")
|
||||
PeerAddresses []string // Memberlist peer addresses for initial cluster join
|
||||
}
|
||||
|
||||
// OlricConfig represents the Olric YAML configuration structure
|
||||
@ -117,19 +119,21 @@ func instanceKey(namespace, nodeID string) string {
|
||||
}
|
||||
|
||||
// SpawnInstance starts a new Olric instance for a namespace on a specific node.
|
||||
// Returns the instance info or an error if spawning fails.
|
||||
// The process is decoupled from the caller's context — it runs independently until
|
||||
// explicitly stopped. Only returns an error if the process fails to start or the
|
||||
// memberlist port doesn't open within the timeout.
|
||||
// Note: The memberlist port opening does NOT mean the cluster has formed — peers may
|
||||
// still be joining. Use WaitForProcessRunning() after spawning all instances to verify.
|
||||
func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig) (*OlricInstance, error) {
|
||||
key := instanceKey(cfg.Namespace, cfg.NodeID)
|
||||
|
||||
is.mu.Lock()
|
||||
if existing, ok := is.instances[key]; ok {
|
||||
is.mu.Unlock()
|
||||
// Instance already exists, return it if running
|
||||
if existing.Status == InstanceStatusRunning {
|
||||
if existing.Status == InstanceStatusRunning || existing.Status == InstanceStatusStarting {
|
||||
is.mu.Unlock()
|
||||
return existing, nil
|
||||
}
|
||||
// Otherwise, remove it and start fresh
|
||||
is.mu.Lock()
|
||||
// Remove stale instance
|
||||
delete(is.instances, key)
|
||||
}
|
||||
is.mu.Unlock()
|
||||
@ -165,6 +169,7 @@ func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig
|
||||
ConfigPath: configPath,
|
||||
DataDir: dataDir,
|
||||
Status: InstanceStatusStarting,
|
||||
waitDone: make(chan struct{}),
|
||||
logger: is.logger.With(zap.String("namespace", cfg.Namespace), zap.String("node_id", cfg.NodeID)),
|
||||
}
|
||||
|
||||
@ -174,12 +179,14 @@ func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig
|
||||
zap.Strings("peers", cfg.PeerAddresses),
|
||||
)
|
||||
|
||||
// Create command with config environment variable
|
||||
cmd := exec.CommandContext(ctx, "olric-server")
|
||||
// Use exec.Command (NOT exec.CommandContext) so the process is NOT killed
|
||||
// when the HTTP request context or provisioning context is cancelled.
|
||||
// The process lives until explicitly stopped via StopInstance().
|
||||
cmd := exec.Command("olric-server")
|
||||
cmd.Env = append(os.Environ(), fmt.Sprintf("OLRIC_SERVER_CONFIG=%s", configPath))
|
||||
instance.cmd = cmd
|
||||
|
||||
// Setup logging
|
||||
// Setup logging — keep the file open for the process lifetime
|
||||
logPath := filepath.Join(logsDir, fmt.Sprintf("olric-%s.log", cfg.NodeID))
|
||||
logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
||||
if err != nil {
|
||||
@ -188,6 +195,7 @@ func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig
|
||||
Cause: err,
|
||||
}
|
||||
}
|
||||
instance.logFile = logFile
|
||||
|
||||
cmd.Stdout = logFile
|
||||
cmd.Stderr = logFile
|
||||
@ -201,18 +209,26 @@ func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig
|
||||
}
|
||||
}
|
||||
|
||||
logFile.Close()
|
||||
|
||||
instance.PID = cmd.Process.Pid
|
||||
instance.StartedAt = time.Now()
|
||||
|
||||
// Reap the child process in a background goroutine to prevent zombies.
|
||||
// This goroutine closes the log file and signals via waitDone when the process exits.
|
||||
go func() {
|
||||
_ = cmd.Wait()
|
||||
logFile.Close()
|
||||
close(instance.waitDone)
|
||||
}()
|
||||
|
||||
// Store instance
|
||||
is.mu.Lock()
|
||||
is.instances[key] = instance
|
||||
is.mu.Unlock()
|
||||
|
||||
// Wait for instance to be ready
|
||||
if err := is.waitForInstanceReady(ctx, instance); err != nil {
|
||||
// Wait for the memberlist port to accept TCP connections.
|
||||
// This confirms the process started and Olric initialized its network layer.
|
||||
// It does NOT guarantee peers have joined — that happens asynchronously.
|
||||
if err := is.waitForPortReady(ctx, instance); err != nil {
|
||||
// Kill the process on failure
|
||||
if cmd.Process != nil {
|
||||
_ = cmd.Process.Kill()
|
||||
@ -295,20 +311,17 @@ func (is *InstanceSpawner) StopInstance(ctx context.Context, ns, nodeID string)
|
||||
_ = instance.cmd.Process.Kill()
|
||||
}
|
||||
|
||||
// Wait for process to exit with timeout
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- instance.cmd.Wait()
|
||||
}()
|
||||
|
||||
// Wait for process to exit via the reaper goroutine
|
||||
select {
|
||||
case <-done:
|
||||
case <-instance.waitDone:
|
||||
instance.logger.Info("Olric instance stopped gracefully")
|
||||
case <-time.After(10 * time.Second):
|
||||
instance.logger.Warn("Olric instance did not stop gracefully, killing")
|
||||
_ = instance.cmd.Process.Kill()
|
||||
<-instance.waitDone // wait for reaper to finish
|
||||
case <-ctx.Done():
|
||||
_ = instance.cmd.Process.Kill()
|
||||
<-instance.waitDone
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
@ -379,31 +392,29 @@ func (is *InstanceSpawner) HealthCheck(ctx context.Context, ns, nodeID string) (
|
||||
return healthy, err
|
||||
}
|
||||
|
||||
// waitForInstanceReady waits for the Olric instance to be ready
|
||||
func (is *InstanceSpawner) waitForInstanceReady(ctx context.Context, instance *OlricInstance) error {
|
||||
// Olric doesn't have a standard /ready endpoint, so we check if the process
|
||||
// is running and the memberlist port is accepting connections
|
||||
// waitForPortReady waits for the Olric memberlist port to accept TCP connections.
|
||||
// This is a lightweight check — it confirms the process started but does NOT
|
||||
// guarantee that peers have joined the cluster.
|
||||
func (is *InstanceSpawner) waitForPortReady(ctx context.Context, instance *OlricInstance) error {
|
||||
// Use BindAddr for the health check — this is the address the process actually listens on.
|
||||
// AdvertiseAddr may differ from BindAddr (e.g., 0.0.0.0 resolves to IPv6 on some hosts).
|
||||
checkAddr := instance.BindAddr
|
||||
if checkAddr == "" || checkAddr == "0.0.0.0" {
|
||||
checkAddr = "localhost"
|
||||
}
|
||||
addr := fmt.Sprintf("%s:%d", checkAddr, instance.MemberlistPort)
|
||||
|
||||
maxAttempts := 30 // 30 seconds
|
||||
maxAttempts := 30
|
||||
for i := 0; i < maxAttempts; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-instance.waitDone:
|
||||
// Process exited before becoming ready
|
||||
return fmt.Errorf("Olric process exited unexpectedly (pid %d)", instance.PID)
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
|
||||
// Check if the process is still running
|
||||
if instance.cmd != nil && instance.cmd.ProcessState != nil && instance.cmd.ProcessState.Exited() {
|
||||
return fmt.Errorf("Olric process exited unexpectedly")
|
||||
}
|
||||
|
||||
// Try to connect to the memberlist port to verify it's accepting connections
|
||||
// Use the advertise address since Olric may bind to a specific IP
|
||||
addr := fmt.Sprintf("%s:%d", instance.AdvertiseAddr, instance.MemberlistPort)
|
||||
if instance.AdvertiseAddr == "" {
|
||||
addr = fmt.Sprintf("localhost:%d", instance.MemberlistPort)
|
||||
}
|
||||
|
||||
conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
|
||||
if err != nil {
|
||||
instance.logger.Debug("Waiting for Olric memberlist",
|
||||
@ -415,7 +426,7 @@ func (is *InstanceSpawner) waitForInstanceReady(ctx context.Context, instance *O
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
instance.logger.Debug("Olric instance ready",
|
||||
instance.logger.Debug("Olric memberlist port ready",
|
||||
zap.Int("attempts", i+1),
|
||||
zap.String("addr", addr),
|
||||
)
|
||||
@ -430,14 +441,27 @@ func (is *InstanceSpawner) monitorInstance(instance *OlricInstance) {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
for {
|
||||
select {
|
||||
case <-instance.waitDone:
|
||||
// Process exited — update status and stop monitoring
|
||||
is.mu.Lock()
|
||||
key := instanceKey(instance.Namespace, instance.NodeID)
|
||||
if _, exists := is.instances[key]; exists {
|
||||
instance.Status = InstanceStatusStopped
|
||||
instance.logger.Warn("Olric instance process exited unexpectedly")
|
||||
}
|
||||
is.mu.Unlock()
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
is.mu.RLock()
|
||||
key := instanceKey(instance.Namespace, instance.NodeID)
|
||||
_, exists := is.instances[key]
|
||||
is.mu.RUnlock()
|
||||
|
||||
if !exists {
|
||||
// Instance was removed
|
||||
return
|
||||
}
|
||||
|
||||
@ -454,21 +478,18 @@ func (is *InstanceSpawner) monitorInstance(instance *OlricInstance) {
|
||||
instance.logger.Warn("Olric instance health check failed")
|
||||
}
|
||||
is.mu.Unlock()
|
||||
|
||||
// Check if process is still running
|
||||
if instance.cmd != nil && instance.cmd.ProcessState != nil && instance.cmd.ProcessState.Exited() {
|
||||
is.mu.Lock()
|
||||
instance.Status = InstanceStatusStopped
|
||||
is.mu.Unlock()
|
||||
instance.logger.Warn("Olric instance process exited unexpectedly")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// IsHealthy reports whether the Olric instance is healthy: it returns false if the process has already exited, and otherwise verifies the memberlist port is accepting connections
|
||||
func (oi *OlricInstance) IsHealthy(ctx context.Context) (bool, error) {
|
||||
// Olric doesn't have a standard /ready HTTP endpoint, so we check memberlist connectivity
|
||||
// Check if process has exited first
|
||||
select {
|
||||
case <-oi.waitDone:
|
||||
return false, fmt.Errorf("process has exited")
|
||||
default:
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("%s:%d", oi.AdvertiseAddr, oi.MemberlistPort)
|
||||
if oi.AdvertiseAddr == "" || oi.AdvertiseAddr == "0.0.0.0" {
|
||||
addr = fmt.Sprintf("localhost:%d", oi.MemberlistPort)
|
||||
|
||||
@ -318,8 +318,16 @@ func setReflectValue(field reflect.Value, raw any) error {
|
||||
return nil
|
||||
}
|
||||
fallthrough
|
||||
case reflect.Ptr:
|
||||
// Handle pointer types (e.g. *time.Time, *string, *int)
|
||||
// nil raw is already handled above (leaves zero/nil pointer)
|
||||
elem := reflect.New(field.Type().Elem())
|
||||
if err := setReflectValue(elem.Elem(), raw); err != nil {
|
||||
return err
|
||||
}
|
||||
field.Set(elem)
|
||||
return nil
|
||||
default:
|
||||
// Not supported yet
|
||||
return fmt.Errorf("unsupported dest field kind: %s", field.Kind())
|
||||
}
|
||||
return nil
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user