orama/pkg/olric/instance_spawner.go
2026-02-13 14:33:11 +02:00

543 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package olric
import (
"context"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)
// InstanceNodeStatus represents the status of an instance (local type to avoid import cycle)
type InstanceNodeStatus string

const (
	// InstanceStatusPending: instance requested but the process has not been launched.
	InstanceStatusPending InstanceNodeStatus = "pending"
	// InstanceStatusStarting: process launched, waiting for the memberlist port to open.
	InstanceStatusStarting InstanceNodeStatus = "starting"
	// InstanceStatusRunning: readiness/health check passed.
	InstanceStatusRunning InstanceNodeStatus = "running"
	// InstanceStatusStopped: process exited or was stopped explicitly.
	InstanceStatusStopped InstanceNodeStatus = "stopped"
	// InstanceStatusFailed: a periodic health check reported the instance unhealthy.
	InstanceStatusFailed InstanceNodeStatus = "failed"
)
// InstanceError represents an error during instance operations (local type to avoid import cycle)
type InstanceError struct {
	Message string
	Cause   error
}

// Error renders the message, appending the underlying cause when one is present.
func (e *InstanceError) Error() string {
	if e.Cause == nil {
		return e.Message
	}
	return fmt.Sprintf("%s: %s", e.Message, e.Cause.Error())
}

// Unwrap exposes the underlying cause so errors.Is / errors.As can traverse the chain.
func (e *InstanceError) Unwrap() error {
	return e.Cause
}
// InstanceSpawner manages multiple Olric instances for namespace clusters.
// Each namespace gets its own Olric cluster with dedicated ports and memberlist.
type InstanceSpawner struct {
	logger    *zap.Logger
	baseDir   string                    // Base directory for all namespace data (e.g., ~/.orama/data/namespaces)
	instances map[string]*OlricInstance // live instances keyed by instanceKey(namespace, nodeID)
	mu        sync.RWMutex              // guards the instances map
}
// OlricInstance represents a running Olric instance for a namespace
type OlricInstance struct {
	Namespace      string
	NodeID         string
	HTTPPort       int      // HTTP API port the olric-server binds
	MemberlistPort int      // gossip port used for cluster membership
	BindAddr       string   // address the process binds (may be "" or 0.0.0.0)
	AdvertiseAddr  string   // address advertised to peers
	PeerAddresses  []string // Memberlist peer addresses for cluster discovery
	ConfigPath     string   // path of the generated YAML config file
	DataDir        string   // per-node data directory
	PID            int      // process ID, populated after a successful Start
	StartedAt      time.Time
	cmd            *exec.Cmd
	logFile        *os.File      // kept open for process lifetime
	waitDone       chan struct{} // closed when cmd.Wait() completes
	logger         *zap.Logger
	// mu protects mutable state (Status, LastHealthCheck) accessed concurrently
	// by the monitor goroutine and external callers.
	mu              sync.RWMutex
	Status          InstanceNodeStatus
	LastHealthCheck time.Time
}
// InstanceConfig holds configuration for spawning an Olric instance.
// All fields are supplied by the caller; the spawner derives directories
// and the YAML config file from them.
type InstanceConfig struct {
	Namespace      string   // Namespace name (e.g., "alice")
	NodeID         string   // Physical node ID
	HTTPPort       int      // HTTP API port
	MemberlistPort int      // Memberlist gossip port
	BindAddr       string   // Address to bind (e.g., "0.0.0.0")
	AdvertiseAddr  string   // Address to advertise (e.g., "192.168.1.10")
	PeerAddresses  []string // Memberlist peer addresses for initial cluster join
}
// OlricConfig represents the Olric YAML configuration structure
// (only the subset of fields this spawner emits).
type OlricConfig struct {
	Server         OlricServerConfig     `yaml:"server"`
	Memberlist     OlricMemberlistConfig `yaml:"memberlist"`
	PartitionCount uint64                `yaml:"partitionCount"` // Number of partitions (default: 256, we use 12 for namespace isolation)
}

// OlricServerConfig represents the server section of Olric config
type OlricServerConfig struct {
	BindAddr string `yaml:"bindAddr"` // HTTP API bind address
	BindPort int    `yaml:"bindPort"` // HTTP API bind port
}

// OlricMemberlistConfig represents the memberlist section of Olric config
type OlricMemberlistConfig struct {
	Environment string   `yaml:"environment"` // memberlist tuning profile (e.g. "lan")
	BindAddr    string   `yaml:"bindAddr"`
	BindPort    int      `yaml:"bindPort"`
	Peers       []string `yaml:"peers,omitempty"` // initial peers to join; omitted for a single node
}
// NewInstanceSpawner creates a new Olric instance spawner rooted at baseDir.
// The provided logger is tagged with a component field for all spawner output.
func NewInstanceSpawner(baseDir string, logger *zap.Logger) *InstanceSpawner {
	spawner := &InstanceSpawner{
		baseDir:   baseDir,
		instances: map[string]*OlricInstance{},
		logger:    logger.With(zap.String("component", "olric-instance-spawner")),
	}
	return spawner
}
// instanceKey generates a unique key for an instance based on namespace and node
func instanceKey(namespace, nodeID string) string {
	return namespace + ":" + nodeID
}
// SpawnInstance starts a new Olric instance for a namespace on a specific node.
// The process is decoupled from the caller's context — it runs independently until
// explicitly stopped. Only returns an error if the process fails to start or the
// memberlist port doesn't open within the timeout.
// Note: The memberlist port opening does NOT mean the cluster has formed — peers may
// still be joining. Use WaitForProcessRunning() after spawning all instances to verify.
func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig) (*OlricInstance, error) {
	key := instanceKey(cfg.Namespace, cfg.NodeID)

	// Fast path: reuse an instance already starting or running; otherwise
	// drop the stale entry and fall through to spawn a fresh one.
	is.mu.Lock()
	if existing, ok := is.instances[key]; ok {
		existing.mu.RLock()
		status := existing.Status
		existing.mu.RUnlock()
		if status == InstanceStatusRunning || status == InstanceStatusStarting {
			is.mu.Unlock()
			return existing, nil
		}
		// Remove stale instance
		delete(is.instances, key)
	}
	is.mu.Unlock()

	// Create data and config directories
	dataDir := filepath.Join(is.baseDir, cfg.Namespace, "olric", cfg.NodeID)
	configDir := filepath.Join(is.baseDir, cfg.Namespace, "configs")
	logsDir := filepath.Join(is.baseDir, cfg.Namespace, "logs")
	for _, dir := range []string{dataDir, configDir, logsDir} {
		if err := os.MkdirAll(dir, 0755); err != nil {
			return nil, &InstanceError{
				Message: fmt.Sprintf("failed to create directory %s", dir),
				Cause:   err,
			}
		}
	}

	// Generate config file
	configPath := filepath.Join(configDir, fmt.Sprintf("olric-%s.yaml", cfg.NodeID))
	if err := is.generateConfig(configPath, cfg); err != nil {
		return nil, err
	}

	instance := &OlricInstance{
		Namespace:      cfg.Namespace,
		NodeID:         cfg.NodeID,
		HTTPPort:       cfg.HTTPPort,
		MemberlistPort: cfg.MemberlistPort,
		BindAddr:       cfg.BindAddr,
		AdvertiseAddr:  cfg.AdvertiseAddr,
		PeerAddresses:  cfg.PeerAddresses,
		ConfigPath:     configPath,
		DataDir:        dataDir,
		Status:         InstanceStatusStarting,
		waitDone:       make(chan struct{}),
		logger:         is.logger.With(zap.String("namespace", cfg.Namespace), zap.String("node_id", cfg.NodeID)),
	}
	instance.logger.Info("Starting Olric instance",
		zap.Int("http_port", cfg.HTTPPort),
		zap.Int("memberlist_port", cfg.MemberlistPort),
		zap.Strings("peers", cfg.PeerAddresses),
	)

	// Use exec.Command (NOT exec.CommandContext) so the process is NOT killed
	// when the HTTP request context or provisioning context is cancelled.
	// The process lives until explicitly stopped via StopInstance().
	cmd := exec.Command("olric-server")
	// olric-server reads its config path from the environment, not argv.
	cmd.Env = append(os.Environ(), fmt.Sprintf("OLRIC_SERVER_CONFIG=%s", configPath))
	instance.cmd = cmd

	// Setup logging — keep the file open for the process lifetime
	logPath := filepath.Join(logsDir, fmt.Sprintf("olric-%s.log", cfg.NodeID))
	logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		return nil, &InstanceError{
			Message: "failed to open log file",
			Cause:   err,
		}
	}
	instance.logFile = logFile
	cmd.Stdout = logFile
	cmd.Stderr = logFile

	// Start the process
	if err := cmd.Start(); err != nil {
		logFile.Close()
		return nil, &InstanceError{
			Message: "failed to start Olric process",
			Cause:   err,
		}
	}
	instance.PID = cmd.Process.Pid
	instance.StartedAt = time.Now()

	// Reap the child process in a background goroutine to prevent zombies.
	// This goroutine closes the log file and signals via waitDone when the process exits.
	go func() {
		_ = cmd.Wait()
		logFile.Close()
		close(instance.waitDone)
	}()

	// Store instance (registered before readiness so monitors/stoppers can see it)
	is.mu.Lock()
	is.instances[key] = instance
	is.mu.Unlock()

	// Wait for the memberlist port to accept TCP connections.
	// This confirms the process started and Olric initialized its network layer.
	// It does NOT guarantee peers have joined — that happens asynchronously.
	if err := is.waitForPortReady(ctx, instance); err != nil {
		// Kill the process on failure; the reaper goroutine above will close
		// the log file and waitDone once the kill takes effect.
		if cmd.Process != nil {
			_ = cmd.Process.Kill()
		}
		is.mu.Lock()
		delete(is.instances, key)
		is.mu.Unlock()
		return nil, &InstanceError{
			Message: "Olric instance did not become ready",
			Cause:   err,
		}
	}

	instance.mu.Lock()
	instance.Status = InstanceStatusRunning
	instance.LastHealthCheck = time.Now()
	instance.mu.Unlock()
	instance.logger.Info("Olric instance started successfully",
		zap.Int("pid", instance.PID),
	)

	// Start background process monitor
	go is.monitorInstance(instance)
	return instance, nil
}
// generateConfig renders and writes the Olric YAML configuration file for one instance.
func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig) error {
	// "lan" memberlist profile: namespace clusters are expected to be low latency.
	conf := OlricConfig{
		Server: OlricServerConfig{
			BindAddr: cfg.BindAddr,
			BindPort: cfg.HTTPPort,
		},
		Memberlist: OlricMemberlistConfig{
			Environment: "lan",
			BindAddr:    cfg.BindAddr,
			BindPort:    cfg.MemberlistPort,
			Peers:       cfg.PeerAddresses,
		},
		// 12 partitions (vs Olric's 256 default): perfect distribution for
		// 2-6 nodes and ~20x faster scans.
		// 12 partitions × 2 (primary+replica) = 24 network calls (~0.6s vs 12s)
		PartitionCount: 12,
	}

	raw, err := yaml.Marshal(conf)
	if err != nil {
		return &InstanceError{Message: "failed to marshal Olric config", Cause: err}
	}
	if err = os.WriteFile(configPath, raw, 0644); err != nil {
		return &InstanceError{Message: "failed to write Olric config", Cause: err}
	}
	return nil
}
// StopInstance stops an Olric instance for a namespace on a specific node.
// It is a no-op when the instance is not tracked. The instance is removed
// from the registry before signalling, so a concurrent SpawnInstance can
// start a replacement while the old process is shutting down.
func (is *InstanceSpawner) StopInstance(ctx context.Context, ns, nodeID string) error {
	key := instanceKey(ns, nodeID)
	is.mu.Lock()
	instance, ok := is.instances[key]
	if !ok {
		is.mu.Unlock()
		return nil // Already stopped
	}
	delete(is.instances, key)
	is.mu.Unlock()

	if instance.cmd != nil && instance.cmd.Process != nil {
		instance.logger.Info("Stopping Olric instance", zap.Int("pid", instance.PID))
		// Request graceful shutdown. NOTE: os.Interrupt is SIGINT on Unix
		// (not SIGTERM, despite what older comments said).
		if err := instance.cmd.Process.Signal(os.Interrupt); err != nil {
			// If the signal cannot be delivered, kill it
			_ = instance.cmd.Process.Kill()
		}
		// Wait for process to exit via the reaper goroutine (which closes waitDone)
		select {
		case <-instance.waitDone:
			instance.logger.Info("Olric instance stopped gracefully")
		case <-time.After(10 * time.Second):
			instance.logger.Warn("Olric instance did not stop gracefully, killing")
			_ = instance.cmd.Process.Kill()
			<-instance.waitDone // wait for reaper to finish
		case <-ctx.Done():
			// Caller gave up: force-kill, wait for the reaper, surface the ctx error.
			// Status is intentionally left unchanged on this path.
			_ = instance.cmd.Process.Kill()
			<-instance.waitDone
			return ctx.Err()
		}
	}

	instance.mu.Lock()
	instance.Status = InstanceStatusStopped
	instance.mu.Unlock()
	return nil
}
// StopAllInstances stops all Olric instances for a namespace.
// Every matching instance is attempted; the last error encountered (if any)
// is returned, nil when all stops succeed or nothing matches.
func (is *InstanceSpawner) StopAllInstances(ctx context.Context, ns string) error {
	// Snapshot the matching (namespace, nodeID) pairs under the read lock,
	// then stop outside the lock — StopInstance takes the write lock itself.
	type target struct{ namespace, nodeID string }
	is.mu.RLock()
	var targets []target
	for _, inst := range is.instances {
		if inst.Namespace == ns {
			targets = append(targets, target{inst.Namespace, inst.NodeID})
		}
	}
	is.mu.RUnlock()

	var lastErr error
	for _, t := range targets {
		// Use the instance's own fields instead of re-parsing the map key:
		// splitting the key on ":" would misbehave for a namespace that
		// itself contains a colon.
		if err := is.StopInstance(ctx, t.namespace, t.nodeID); err != nil {
			lastErr = err
		}
	}
	return lastErr
}
// GetInstance returns the instance for a namespace on a specific node,
// and whether such an instance is currently tracked.
func (is *InstanceSpawner) GetInstance(ns, nodeID string) (*OlricInstance, bool) {
	is.mu.RLock()
	inst, found := is.instances[instanceKey(ns, nodeID)]
	is.mu.RUnlock()
	return inst, found
}
// GetNamespaceInstances returns all tracked instances belonging to a namespace.
// The result is nil when the namespace has no instances.
func (is *InstanceSpawner) GetNamespaceInstances(ns string) []*OlricInstance {
	is.mu.RLock()
	defer is.mu.RUnlock()

	var matched []*OlricInstance
	for _, candidate := range is.instances {
		if candidate.Namespace != ns {
			continue
		}
		matched = append(matched, candidate)
	}
	return matched
}
// HealthCheck checks if an instance is healthy.
// Returns an InstanceError when the instance is not tracked; on a successful
// check the instance's LastHealthCheck timestamp is refreshed.
func (is *InstanceSpawner) HealthCheck(ctx context.Context, ns, nodeID string) (bool, error) {
	inst, found := is.GetInstance(ns, nodeID)
	if !found {
		return false, &InstanceError{Message: "instance not found"}
	}

	healthy, err := inst.IsHealthy(ctx)
	if !healthy {
		return healthy, err
	}

	inst.mu.Lock()
	inst.LastHealthCheck = time.Now()
	inst.mu.Unlock()
	return healthy, err
}
// waitForPortReady waits for the Olric memberlist port to accept TCP connections.
// This is a lightweight check — it confirms the process started but does NOT
// guarantee that peers have joined the cluster.
// With 30 attempts, a 1s pause per attempt and a 2s dial timeout, the overall
// wait is bounded at roughly 90 seconds (unless ctx is cancelled first).
func (is *InstanceSpawner) waitForPortReady(ctx context.Context, instance *OlricInstance) error {
	// Use BindAddr for the health check — this is the address the process actually listens on.
	// AdvertiseAddr may differ from BindAddr (e.g., 0.0.0.0 resolves to IPv6 on some hosts).
	checkAddr := instance.BindAddr
	if checkAddr == "" || checkAddr == "0.0.0.0" {
		checkAddr = "localhost"
	}
	addr := fmt.Sprintf("%s:%d", checkAddr, instance.MemberlistPort)

	maxAttempts := 30
	for i := 0; i < maxAttempts; i++ {
		// Pause before each dial; bail out early on caller cancellation or
		// if the reaper goroutine reports that the process already exited.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-instance.waitDone:
			// Process exited before becoming ready
			return fmt.Errorf("Olric process exited unexpectedly (pid %d)", instance.PID)
		case <-time.After(1 * time.Second):
		}
		conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
		if err != nil {
			instance.logger.Debug("Waiting for Olric memberlist",
				zap.Int("attempt", i+1),
				zap.String("addr", addr),
				zap.Error(err),
			)
			continue
		}
		conn.Close()
		instance.logger.Debug("Olric memberlist port ready",
			zap.Int("attempts", i+1),
			zap.String("addr", addr),
		)
		return nil
	}
	return fmt.Errorf("Olric did not become ready within timeout")
}
// monitorInstance monitors an instance and updates its status.
// Runs as a background goroutine per instance: every 30s it dials the
// memberlist port and records Running/Failed. It exits when the process
// terminates (waitDone) or when the instance is removed from the registry
// (e.g. by StopInstance), so monitors never outlive their instance.
func (is *InstanceSpawner) monitorInstance(instance *OlricInstance) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-instance.waitDone:
			// Process exited — update status and stop monitoring.
			// Only mark Stopped if the instance is still registered; if it was
			// removed, StopInstance already handled the status transition.
			is.mu.RLock()
			key := instanceKey(instance.Namespace, instance.NodeID)
			_, exists := is.instances[key]
			is.mu.RUnlock()
			if exists {
				instance.mu.Lock()
				instance.Status = InstanceStatusStopped
				instance.mu.Unlock()
				instance.logger.Warn("Olric instance process exited unexpectedly")
			}
			return
		case <-ticker.C:
		}
		// Instance was deregistered (stopped/replaced) — stop monitoring.
		is.mu.RLock()
		key := instanceKey(instance.Namespace, instance.NodeID)
		_, exists := is.instances[key]
		is.mu.RUnlock()
		if !exists {
			return
		}
		// Bound each health probe so a hung dial cannot stall the loop.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		healthy, _ := instance.IsHealthy(ctx)
		cancel()
		instance.mu.Lock()
		if healthy {
			instance.Status = InstanceStatusRunning
			instance.LastHealthCheck = time.Now()
		} else {
			instance.Status = InstanceStatusFailed
			instance.logger.Warn("Olric instance health check failed")
		}
		instance.mu.Unlock()
	}
}
// IsHealthy checks if the Olric instance is healthy by verifying the memberlist
// port is accepting TCP connections. Returns false with an error when the
// process has already exited or the dial fails.
func (oi *OlricInstance) IsHealthy(ctx context.Context) (bool, error) {
	// A closed waitDone channel means the reaper observed process exit.
	select {
	case <-oi.waitDone:
		return false, fmt.Errorf("process has exited")
	default:
	}

	// Dial localhost when the advertise address is unset or a wildcard.
	host := oi.AdvertiseAddr
	if host == "" || host == "0.0.0.0" {
		host = "localhost"
	}

	conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, oi.MemberlistPort), 2*time.Second)
	if err != nil {
		return false, err
	}
	conn.Close()
	return true, nil
}
// DSN returns the connection address for this Olric instance.
// Uses the bind address if set (e.g. WireGuard IP), since Olric may not listen on localhost.
func (oi *OlricInstance) DSN() string {
	host := oi.BindAddr
	if host == "" {
		host = "localhost"
	}
	return fmt.Sprintf("%s:%d", host, oi.HTTPPort)
}

// AdvertisedDSN returns the HTTP address built from the advertise address.
func (oi *OlricInstance) AdvertisedDSN() string {
	return fmt.Sprintf("%s:%d", oi.AdvertiseAddr, oi.HTTPPort)
}

// MemberlistAddress returns the advertised gossip address for cluster communication.
func (oi *OlricInstance) MemberlistAddress() string {
	return fmt.Sprintf("%s:%d", oi.AdvertiseAddr, oi.MemberlistPort)
}