package rqlite

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/DeBrosOfficial/network/pkg/tlsutil"
	"go.uber.org/zap"
)

// InstanceNodeStatus represents the status of an instance (local type to avoid import cycle).
// It is a plain string so values can be logged and compared directly.
type InstanceNodeStatus string

// Lifecycle states recorded in RQLiteInstance.Status.
const (
	// InstanceStatusPending is declared for completeness; nothing in this
	// file assigns it.
	InstanceStatusPending InstanceNodeStatus = "pending"
	// InstanceStatusStarting is the initial status SpawnInstance gives a new
	// instance, before the process has reported ready.
	InstanceStatusStarting InstanceNodeStatus = "starting"
	// InstanceStatusRunning is set once the instance reports ready, and again
	// by the monitor after each successful health check.
	InstanceStatusRunning InstanceNodeStatus = "running"
	// InstanceStatusStopped is set by StopInstance, and by the monitor when it
	// observes that the process has exited.
	InstanceStatusStopped InstanceNodeStatus = "stopped"
	// InstanceStatusFailed is set by the monitor when a health check fails.
	InstanceStatusFailed InstanceNodeStatus = "failed"
)

// InstanceError represents an error during instance operations (local type to avoid import cycle).
// Message describes the operation that failed; Cause, when non-nil, is the
// underlying error and is exposed through Unwrap.
type InstanceError struct {
	Message string
	Cause   error
}

// Error implements the error interface. The result is "Message: Cause" when a
// cause is present and just Message otherwise.
func (e *InstanceError) Error() string {
	if e.Cause != nil {
		return e.Message + ": " + e.Cause.Error()
	}
	return e.Message
}

// Unwrap returns the underlying cause (possibly nil) so that errors.Is and
// errors.As can inspect it.
func (e *InstanceError) Unwrap() error {
	return e.Cause
}

// InstanceSpawner manages multiple RQLite instances for namespace clusters.
// Each namespace gets its own RQLite cluster with dedicated ports and data directories.
type InstanceSpawner struct { logger *zap.Logger baseDir string // Base directory for all namespace data (e.g., ~/.orama/data/namespaces) instances map[string]*RQLiteInstance mu sync.RWMutex } // RQLiteInstance represents a running RQLite instance for a namespace type RQLiteInstance struct { Namespace string NodeID string HTTPPort int RaftPort int HTTPAdvAddress string RaftAdvAddress string JoinAddresses []string DataDir string IsLeader bool PID int Status InstanceNodeStatus StartedAt time.Time LastHealthCheck time.Time cmd *exec.Cmd logger *zap.Logger } // InstanceConfig holds configuration for spawning an RQLite instance type InstanceConfig struct { Namespace string // Namespace name (e.g., "alice") NodeID string // Physical node ID HTTPPort int // HTTP API port RaftPort int // Raft consensus port HTTPAdvAddress string // Advertised HTTP address (e.g., "192.168.1.10:10000") RaftAdvAddress string // Advertised Raft address (e.g., "192.168.1.10:10001") JoinAddresses []string // Addresses of existing cluster members to join IsLeader bool // Whether this is the initial leader node } // NewInstanceSpawner creates a new RQLite instance spawner func NewInstanceSpawner(baseDir string, logger *zap.Logger) *InstanceSpawner { return &InstanceSpawner{ logger: logger.With(zap.String("component", "rqlite-instance-spawner")), baseDir: baseDir, instances: make(map[string]*RQLiteInstance), } } // instanceKey generates a unique key for an instance based on namespace and node func instanceKey(namespace, nodeID string) string { return fmt.Sprintf("%s:%s", namespace, nodeID) } // SpawnInstance starts a new RQLite instance for a namespace on a specific node. // Returns the instance info or an error if spawning fails. 
func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig) (*RQLiteInstance, error) { key := instanceKey(cfg.Namespace, cfg.NodeID) is.mu.Lock() if existing, ok := is.instances[key]; ok { is.mu.Unlock() // Instance already exists, return it if running if existing.Status == InstanceStatusRunning { return existing, nil } // Otherwise, remove it and start fresh is.mu.Lock() delete(is.instances, key) } is.mu.Unlock() // Create data directory dataDir := filepath.Join(is.baseDir, cfg.Namespace, "rqlite", cfg.NodeID) if err := os.MkdirAll(dataDir, 0755); err != nil { return nil, &InstanceError{ Message: "failed to create data directory", Cause: err, } } // Create logs directory logsDir := filepath.Join(is.baseDir, cfg.Namespace, "logs") if err := os.MkdirAll(logsDir, 0755); err != nil { return nil, &InstanceError{ Message: "failed to create logs directory", Cause: err, } } instance := &RQLiteInstance{ Namespace: cfg.Namespace, NodeID: cfg.NodeID, HTTPPort: cfg.HTTPPort, RaftPort: cfg.RaftPort, HTTPAdvAddress: cfg.HTTPAdvAddress, RaftAdvAddress: cfg.RaftAdvAddress, JoinAddresses: cfg.JoinAddresses, DataDir: dataDir, IsLeader: cfg.IsLeader, Status: InstanceStatusStarting, logger: is.logger.With(zap.String("namespace", cfg.Namespace), zap.String("node_id", cfg.NodeID)), } // Build command arguments args := []string{ "-http-addr", fmt.Sprintf("0.0.0.0:%d", cfg.HTTPPort), "-http-adv-addr", cfg.HTTPAdvAddress, "-raft-addr", fmt.Sprintf("0.0.0.0:%d", cfg.RaftPort), "-raft-adv-addr", cfg.RaftAdvAddress, } // Handle cluster joining if len(cfg.JoinAddresses) > 0 && !cfg.IsLeader { // Remove peers.json if it exists to avoid stale cluster state peersJSONPath := filepath.Join(dataDir, "raft", "peers.json") if _, err := os.Stat(peersJSONPath); err == nil { instance.logger.Debug("Removing existing peers.json before joining cluster", zap.String("path", peersJSONPath)) _ = os.Remove(peersJSONPath) } // Prepare join addresses (strip http:// prefix if present) 
joinAddrs := make([]string, 0, len(cfg.JoinAddresses)) for _, addr := range cfg.JoinAddresses { addr = strings.TrimPrefix(addr, "http://") addr = strings.TrimPrefix(addr, "https://") joinAddrs = append(joinAddrs, addr) } // Wait for join targets to be available if err := is.waitForJoinTargets(ctx, cfg.JoinAddresses); err != nil { instance.logger.Warn("Join targets not all reachable, will still attempt join", zap.Error(err)) } args = append(args, "-join", strings.Join(joinAddrs, ","), "-join-as", cfg.RaftAdvAddress, "-join-attempts", "30", "-join-interval", "10s", ) } // Add data directory as final argument args = append(args, dataDir) instance.logger.Info("Starting RQLite instance", zap.Int("http_port", cfg.HTTPPort), zap.Int("raft_port", cfg.RaftPort), zap.Strings("join_addresses", cfg.JoinAddresses), zap.Bool("is_leader", cfg.IsLeader), ) // Create command cmd := exec.CommandContext(ctx, "rqlited", args...) instance.cmd = cmd // Setup logging logPath := filepath.Join(logsDir, fmt.Sprintf("rqlite-%s.log", cfg.NodeID)) logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return nil, &InstanceError{ Message: "failed to open log file", Cause: err, } } cmd.Stdout = logFile cmd.Stderr = logFile // Start the process if err := cmd.Start(); err != nil { logFile.Close() return nil, &InstanceError{ Message: "failed to start RQLite process", Cause: err, } } logFile.Close() instance.PID = cmd.Process.Pid instance.StartedAt = time.Now() // Store instance is.mu.Lock() is.instances[key] = instance is.mu.Unlock() // Wait for instance to be ready if err := is.waitForInstanceReady(ctx, instance); err != nil { // Kill the process on failure if cmd.Process != nil { _ = cmd.Process.Kill() } is.mu.Lock() delete(is.instances, key) is.mu.Unlock() return nil, &InstanceError{ Message: "RQLite instance did not become ready", Cause: err, } } instance.Status = InstanceStatusRunning instance.LastHealthCheck = time.Now() instance.logger.Info("RQLite 
instance started successfully", zap.Int("pid", instance.PID), ) // Start background process monitor go is.monitorInstance(instance) return instance, nil } // StopInstance stops an RQLite instance for a namespace on a specific node func (is *InstanceSpawner) StopInstance(ctx context.Context, namespace, nodeID string) error { key := instanceKey(namespace, nodeID) is.mu.Lock() instance, ok := is.instances[key] if !ok { is.mu.Unlock() return nil // Already stopped } delete(is.instances, key) is.mu.Unlock() if instance.cmd != nil && instance.cmd.Process != nil { instance.logger.Info("Stopping RQLite instance", zap.Int("pid", instance.PID)) // Send SIGTERM for graceful shutdown if err := instance.cmd.Process.Signal(os.Interrupt); err != nil { // If SIGTERM fails, kill it _ = instance.cmd.Process.Kill() } // Wait for process to exit with timeout done := make(chan error, 1) go func() { done <- instance.cmd.Wait() }() select { case <-done: instance.logger.Info("RQLite instance stopped gracefully") case <-time.After(10 * time.Second): instance.logger.Warn("RQLite instance did not stop gracefully, killing") _ = instance.cmd.Process.Kill() case <-ctx.Done(): _ = instance.cmd.Process.Kill() return ctx.Err() } } instance.Status = InstanceStatusStopped return nil } // StopAllInstances stops all RQLite instances for a namespace func (is *InstanceSpawner) StopAllInstances(ctx context.Context, ns string) error { is.mu.RLock() var keys []string for key, inst := range is.instances { if inst.Namespace == ns { keys = append(keys, key) } } is.mu.RUnlock() var lastErr error for _, key := range keys { parts := strings.SplitN(key, ":", 2) if len(parts) == 2 { if err := is.StopInstance(ctx, parts[0], parts[1]); err != nil { lastErr = err } } } return lastErr } // GetInstance returns the instance for a namespace on a specific node func (is *InstanceSpawner) GetInstance(namespace, nodeID string) (*RQLiteInstance, bool) { is.mu.RLock() defer is.mu.RUnlock() instance, ok := 
is.instances[instanceKey(namespace, nodeID)] return instance, ok } // GetNamespaceInstances returns all instances for a namespace func (is *InstanceSpawner) GetNamespaceInstances(ns string) []*RQLiteInstance { is.mu.RLock() defer is.mu.RUnlock() var instances []*RQLiteInstance for _, inst := range is.instances { if inst.Namespace == ns { instances = append(instances, inst) } } return instances } // HealthCheck checks if an instance is healthy func (is *InstanceSpawner) HealthCheck(ctx context.Context, namespace, nodeID string) (bool, error) { instance, ok := is.GetInstance(namespace, nodeID) if !ok { return false, &InstanceError{Message: "instance not found"} } healthy, err := instance.IsHealthy(ctx) if healthy { is.mu.Lock() instance.LastHealthCheck = time.Now() is.mu.Unlock() } return healthy, err } // waitForJoinTargets waits for join target nodes to be reachable func (is *InstanceSpawner) waitForJoinTargets(ctx context.Context, joinAddresses []string) error { timeout := 2 * time.Minute deadline := time.Now().Add(timeout) client := tlsutil.NewHTTPClient(5 * time.Second) for time.Now().Before(deadline) { allReachable := true for _, addr := range joinAddresses { statusURL := addr if !strings.HasPrefix(addr, "http") { statusURL = "http://" + addr } statusURL = strings.TrimRight(statusURL, "/") + "/status" resp, err := client.Get(statusURL) if err != nil { allReachable = false break } resp.Body.Close() if resp.StatusCode != http.StatusOK { allReachable = false break } } if allReachable { return nil } select { case <-ctx.Done(): return ctx.Err() case <-time.After(2 * time.Second): } } return fmt.Errorf("join targets not reachable within timeout") } // waitForInstanceReady waits for the RQLite instance to be ready func (is *InstanceSpawner) waitForInstanceReady(ctx context.Context, instance *RQLiteInstance) error { url := fmt.Sprintf("http://localhost:%d/status", instance.HTTPPort) client := tlsutil.NewHTTPClient(2 * time.Second) // Longer timeout for joining nodes as 
they need to sync maxAttempts := 180 // 3 minutes if len(instance.JoinAddresses) > 0 { maxAttempts = 300 // 5 minutes for joiners } for i := 0; i < maxAttempts; i++ { select { case <-ctx.Done(): return ctx.Err() case <-time.After(1 * time.Second): } resp, err := client.Get(url) if err != nil { continue } if resp.StatusCode == http.StatusOK { body, _ := io.ReadAll(resp.Body) resp.Body.Close() var statusResp map[string]interface{} if err := json.Unmarshal(body, &statusResp); err == nil { if raft, ok := statusResp["raft"].(map[string]interface{}); ok { state, _ := raft["state"].(string) if state == "leader" || state == "follower" { instance.logger.Debug("RQLite instance ready", zap.String("state", state), zap.Int("attempts", i+1), ) return nil } } else { // Backwards compatibility - if no raft status, consider ready return nil } } } resp.Body.Close() } return fmt.Errorf("RQLite did not become ready within timeout") } // monitorInstance monitors an instance and updates its status func (is *InstanceSpawner) monitorInstance(instance *RQLiteInstance) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for range ticker.C { is.mu.RLock() key := instanceKey(instance.Namespace, instance.NodeID) _, exists := is.instances[key] is.mu.RUnlock() if !exists { // Instance was removed return } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) healthy, _ := instance.IsHealthy(ctx) cancel() is.mu.Lock() if healthy { instance.Status = InstanceStatusRunning instance.LastHealthCheck = time.Now() } else { instance.Status = InstanceStatusFailed instance.logger.Warn("RQLite instance health check failed") } is.mu.Unlock() // Check if process is still running if instance.cmd != nil && instance.cmd.ProcessState != nil && instance.cmd.ProcessState.Exited() { is.mu.Lock() instance.Status = InstanceStatusStopped is.mu.Unlock() instance.logger.Warn("RQLite instance process exited unexpectedly") return } } } // IsHealthy checks if the RQLite instance is healthy 
func (ri *RQLiteInstance) IsHealthy(ctx context.Context) (bool, error) { url := fmt.Sprintf("http://localhost:%d/status", ri.HTTPPort) client := tlsutil.NewHTTPClient(5 * time.Second) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return false, err } resp, err := client.Do(req) if err != nil { return false, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return false, fmt.Errorf("status endpoint returned %d", resp.StatusCode) } body, err := io.ReadAll(resp.Body) if err != nil { return false, err } var statusResp map[string]interface{} if err := json.Unmarshal(body, &statusResp); err != nil { return false, err } if raft, ok := statusResp["raft"].(map[string]interface{}); ok { state, _ := raft["state"].(string) return state == "leader" || state == "follower", nil } // Backwards compatibility return true, nil } // GetLeaderAddress returns the leader's address for the cluster func (ri *RQLiteInstance) GetLeaderAddress(ctx context.Context) (string, error) { url := fmt.Sprintf("http://localhost:%d/status", ri.HTTPPort) client := tlsutil.NewHTTPClient(5 * time.Second) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return "", err } resp, err := client.Do(req) if err != nil { return "", err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return "", err } var statusResp map[string]interface{} if err := json.Unmarshal(body, &statusResp); err != nil { return "", err } if raft, ok := statusResp["raft"].(map[string]interface{}); ok { if leader, ok := raft["leader_addr"].(string); ok { return leader, nil } } return "", fmt.Errorf("leader address not found in status response") } // DSN returns the connection string for this RQLite instance func (ri *RQLiteInstance) DSN() string { return fmt.Sprintf("http://localhost:%d", ri.HTTPPort) } // AdvertisedDSN returns the advertised connection string for cluster communication func (ri *RQLiteInstance) 
AdvertisedDSN() string { return fmt.Sprintf("http://%s", ri.HTTPAdvAddress) }