From c855b790f823d30c06c1314a869fb8a6f1433850 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Wed, 4 Feb 2026 17:17:01 +0200 Subject: [PATCH] Updated the way we spawn services on namespace added systemd --- pkg/cli/production/install/orchestrator.go | 56 +++ pkg/cli/production/lifecycle/stop.go | 44 +++ pkg/cli/production/upgrade/orchestrator.go | 89 +++++ .../handlers/namespace/spawn_handler.go | 58 +--- pkg/namespace/cluster_manager.go | 263 +++++++++++--- pkg/namespace/systemd_spawner.go | 195 +++++++++++ pkg/node/gateway.go | 8 +- pkg/systemd/manager.go | 326 ++++++++++++++++++ systemd/debros-namespace-gateway@.service | 32 ++ systemd/debros-namespace-olric@.service | 34 ++ systemd/debros-namespace-rqlite@.service | 44 +++ 11 files changed, 1051 insertions(+), 98 deletions(-) create mode 100644 pkg/namespace/systemd_spawner.go create mode 100644 pkg/systemd/manager.go create mode 100644 systemd/debros-namespace-gateway@.service create mode 100644 systemd/debros-namespace-olric@.service create mode 100644 systemd/debros-namespace-rqlite@.service diff --git a/pkg/cli/production/install/orchestrator.go b/pkg/cli/production/install/orchestrator.go index 3e19ad9..87d3a5e 100644 --- a/pkg/cli/production/install/orchestrator.go +++ b/pkg/cli/production/install/orchestrator.go @@ -205,6 +205,12 @@ func (o *Orchestrator) executeGenesisFlow() error { return fmt.Errorf("service creation failed: %w", err) } + // Install namespace systemd template units + fmt.Printf("\n🔧 Phase 5b: Installing namespace systemd templates...\n") + if err := o.installNamespaceTemplates(); err != nil { + fmt.Fprintf(os.Stderr, "âš ī¸ Template installation warning: %v\n", err) + } + // Phase 7: Seed DNS records (with retry — migrations may still be running) if o.flags.Nameserver && o.flags.BaseDomain != "" { fmt.Printf("\n🌐 Phase 7: Seeding DNS records...\n") @@ -330,6 +336,12 @@ func (o *Orchestrator) executeJoinFlow() error { return fmt.Errorf("service creation failed: %w", err) } + // 
Install namespace systemd template units + fmt.Printf("\n🔧 Installing namespace systemd templates...\n") + if err := o.installNamespaceTemplates(); err != nil { + fmt.Fprintf(os.Stderr, "âš ī¸ Template installation warning: %v\n", err) + } + o.setup.LogSetupComplete(o.setup.NodePeerID) fmt.Printf("✅ Production installation complete! Joined cluster via %s\n\n", o.flags.JoinAddress) return nil @@ -529,3 +541,47 @@ func promptForBaseDomain() string { return "orama-devnet.network" } } + +// installNamespaceTemplates installs systemd template unit files for namespace services +func (o *Orchestrator) installNamespaceTemplates() error { + sourceDir := filepath.Join(o.oramaHome, "src", "systemd") + systemdDir := "/etc/systemd/system" + + templates := []string{ + "debros-namespace-rqlite@.service", + "debros-namespace-olric@.service", + "debros-namespace-gateway@.service", + } + + installedCount := 0 + for _, template := range templates { + sourcePath := filepath.Join(sourceDir, template) + destPath := filepath.Join(systemdDir, template) + + // Read template file + data, err := os.ReadFile(sourcePath) + if err != nil { + fmt.Printf(" âš ī¸ Warning: Failed to read %s: %v\n", template, err) + continue + } + + // Write to systemd directory + if err := os.WriteFile(destPath, data, 0644); err != nil { + fmt.Printf(" âš ī¸ Warning: Failed to install %s: %v\n", template, err) + continue + } + + installedCount++ + fmt.Printf(" ✓ Installed %s\n", template) + } + + if installedCount > 0 { + // Reload systemd daemon to pick up new templates + if err := exec.Command("systemctl", "daemon-reload").Run(); err != nil { + return fmt.Errorf("failed to reload systemd daemon: %w", err) + } + fmt.Printf(" ✓ Systemd daemon reloaded (%d templates installed)\n", installedCount) + } + + return nil +} diff --git a/pkg/cli/production/lifecycle/stop.go b/pkg/cli/production/lifecycle/stop.go index ffd7332..e179745 100644 --- a/pkg/cli/production/lifecycle/stop.go +++ 
b/pkg/cli/production/lifecycle/stop.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "os/exec" + "strings" "time" "github.com/DeBrosOfficial/network/pkg/cli/utils" @@ -18,12 +19,18 @@ func HandleStop() { fmt.Printf("Stopping all DeBros production services...\n") + // First, stop all namespace services + fmt.Printf("\n Stopping namespace services...\n") + stopAllNamespaceServices() + services := utils.GetProductionServices() if len(services) == 0 { fmt.Printf(" âš ī¸ No DeBros services found\n") return } + fmt.Printf("\n Stopping main services...\n") + // First, disable all services to prevent auto-restart disableArgs := []string{"disable"} disableArgs = append(disableArgs, services...) @@ -110,3 +117,40 @@ func HandleStop() { fmt.Printf(" Use 'orama prod start' to start and re-enable services\n") } } + +// stopAllNamespaceServices stops all running namespace services +func stopAllNamespaceServices() { + // Find all running namespace services using systemctl list-units + cmd := exec.Command("systemctl", "list-units", "--type=service", "--all", "--no-pager", "--no-legend", "debros-namespace-*@*.service") + output, err := cmd.Output() + if err != nil { + fmt.Printf(" âš ī¸ Warning: Failed to list namespace services: %v\n", err) + return + } + + lines := strings.Split(string(output), "\n") + var namespaceServices []string + for _, line := range lines { + fields := strings.Fields(line) + if len(fields) > 0 { + serviceName := fields[0] + if strings.HasPrefix(serviceName, "debros-namespace-") { + namespaceServices = append(namespaceServices, serviceName) + } + } + } + + if len(namespaceServices) == 0 { + fmt.Printf(" No namespace services found\n") + return + } + + // Stop all namespace services + for _, svc := range namespaceServices { + if err := exec.Command("systemctl", "stop", svc).Run(); err != nil { + fmt.Printf(" âš ī¸ Warning: Failed to stop %s: %v\n", svc, err) + } + } + + fmt.Printf(" ✓ Stopped %d namespace service(s)\n", len(namespaceServices)) +} diff --git 
a/pkg/cli/production/upgrade/orchestrator.go b/pkg/cli/production/upgrade/orchestrator.go index 9ff1870..6d017f9 100644 --- a/pkg/cli/production/upgrade/orchestrator.go +++ b/pkg/cli/production/upgrade/orchestrator.go @@ -156,6 +156,12 @@ func (o *Orchestrator) Execute() error { fmt.Fprintf(os.Stderr, "âš ī¸ Service update warning: %v\n", err) } + // Install namespace systemd template units + fmt.Printf("\n🔧 Phase 5b: Installing namespace systemd templates...\n") + if err := o.installNamespaceTemplates(); err != nil { + fmt.Fprintf(os.Stderr, "âš ī¸ Template installation warning: %v\n", err) + } + // Re-apply UFW firewall rules (idempotent) fmt.Printf("\nđŸ›Ąī¸ Re-applying firewall rules...\n") if err := o.setup.Phase6bSetupFirewall(false); err != nil { @@ -333,6 +339,13 @@ func (o *Orchestrator) stopServices() error { fmt.Printf("\nâšī¸ Stopping all services before upgrade...\n") serviceController := production.NewSystemdController() + + // First, stop all namespace services (debros-namespace-*@*.service) + fmt.Printf(" Stopping namespace services...\n") + if err := o.stopAllNamespaceServices(serviceController); err != nil { + fmt.Printf(" âš ī¸ Warning: Failed to stop namespace services: %v\n", err) + } + // Stop services in reverse dependency order services := []string{ "caddy.service", // Depends on node @@ -360,6 +373,82 @@ func (o *Orchestrator) stopServices() error { return nil } +// stopAllNamespaceServices stops all running namespace services +func (o *Orchestrator) stopAllNamespaceServices(serviceController *production.SystemdController) error { + // Find all running namespace services using systemctl list-units + cmd := exec.Command("systemctl", "list-units", "--type=service", "--state=running", "--no-pager", "--no-legend", "debros-namespace-*@*.service") + output, err := cmd.Output() + if err != nil { + return fmt.Errorf("failed to list namespace services: %w", err) + } + + lines := strings.Split(string(output), "\n") + stoppedCount := 0 + for _, 
line := range lines { + fields := strings.Fields(line) + if len(fields) > 0 { + serviceName := fields[0] + if strings.HasPrefix(serviceName, "debros-namespace-") { + if err := serviceController.StopService(serviceName); err != nil { + fmt.Printf(" âš ī¸ Warning: Failed to stop %s: %v\n", serviceName, err) + } else { + stoppedCount++ + } + } + } + } + + if stoppedCount > 0 { + fmt.Printf(" ✓ Stopped %d namespace service(s)\n", stoppedCount) + } + + return nil +} + +// installNamespaceTemplates installs systemd template unit files for namespace services +func (o *Orchestrator) installNamespaceTemplates() error { + sourceDir := filepath.Join(o.oramaHome, "src", "systemd") + systemdDir := "/etc/systemd/system" + + templates := []string{ + "debros-namespace-rqlite@.service", + "debros-namespace-olric@.service", + "debros-namespace-gateway@.service", + } + + installedCount := 0 + for _, template := range templates { + sourcePath := filepath.Join(sourceDir, template) + destPath := filepath.Join(systemdDir, template) + + // Read template file + data, err := os.ReadFile(sourcePath) + if err != nil { + fmt.Printf(" âš ī¸ Warning: Failed to read %s: %v\n", template, err) + continue + } + + // Write to systemd directory + if err := os.WriteFile(destPath, data, 0644); err != nil { + fmt.Printf(" âš ī¸ Warning: Failed to install %s: %v\n", template, err) + continue + } + + installedCount++ + fmt.Printf(" ✓ Installed %s\n", template) + } + + if installedCount > 0 { + // Reload systemd daemon to pick up new templates + if err := exec.Command("systemctl", "daemon-reload").Run(); err != nil { + return fmt.Errorf("failed to reload systemd daemon: %w", err) + } + fmt.Printf(" ✓ Systemd daemon reloaded (%d templates installed)\n", installedCount) + } + + return nil +} + func (o *Orchestrator) extractPeers() []string { nodeConfigPath := filepath.Join(o.oramaDir, "configs", "node.yaml") var peers []string diff --git a/pkg/gateway/handlers/namespace/spawn_handler.go 
b/pkg/gateway/handlers/namespace/spawn_handler.go index 4806b39..4f16128 100644 --- a/pkg/gateway/handlers/namespace/spawn_handler.go +++ b/pkg/gateway/handlers/namespace/spawn_handler.go @@ -5,10 +5,10 @@ import ( "encoding/json" "fmt" "net/http" - "sync" "time" "github.com/DeBrosOfficial/network/pkg/gateway" + namespacepkg "github.com/DeBrosOfficial/network/pkg/namespace" "github.com/DeBrosOfficial/network/pkg/olric" "github.com/DeBrosOfficial/network/pkg/rqlite" "go.uber.org/zap" @@ -54,24 +54,17 @@ type SpawnResponse struct { } // SpawnHandler handles remote namespace instance spawn/stop requests. -// It tracks spawned RQLite instances locally so they can be stopped later. +// Now uses systemd for service management instead of direct process spawning. type SpawnHandler struct { - rqliteSpawner *rqlite.InstanceSpawner - olricSpawner *olric.InstanceSpawner - gatewaySpawner *gateway.InstanceSpawner - logger *zap.Logger - rqliteInstances map[string]*rqlite.Instance // key: "namespace:nodeID" - rqliteInstanceMu sync.RWMutex + systemdSpawner *namespacepkg.SystemdSpawner + logger *zap.Logger } // NewSpawnHandler creates a new spawn handler -func NewSpawnHandler(rs *rqlite.InstanceSpawner, os *olric.InstanceSpawner, gs *gateway.InstanceSpawner, logger *zap.Logger) *SpawnHandler { +func NewSpawnHandler(systemdSpawner *namespacepkg.SystemdSpawner, logger *zap.Logger) *SpawnHandler { return &SpawnHandler{ - rqliteSpawner: rs, - olricSpawner: os, - gatewaySpawner: gs, - logger: logger.With(zap.String("component", "namespace-spawn-handler")), - rqliteInstances: make(map[string]*rqlite.Instance), + systemdSpawner: systemdSpawner, + logger: logger.With(zap.String("component", "namespace-spawn-handler")), } } @@ -121,18 +114,12 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { JoinAddresses: req.RQLiteJoinAddrs, IsLeader: req.RQLiteIsLeader, } - instance, err := h.rqliteSpawner.SpawnInstance(ctx, cfg) - if err != nil { + if err := 
h.systemdSpawner.SpawnRQLite(ctx, req.Namespace, req.NodeID, cfg); err != nil { h.logger.Error("Failed to spawn RQLite instance", zap.Error(err)) writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) return } - // Track the instance for later stop requests - key := fmt.Sprintf("%s:%s", req.Namespace, req.NodeID) - h.rqliteInstanceMu.Lock() - h.rqliteInstances[key] = instance - h.rqliteInstanceMu.Unlock() - writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true, PID: instance.PID}) + writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) case "spawn-olric": cfg := olric.InstanceConfig{ @@ -144,27 +131,15 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { AdvertiseAddr: req.OlricAdvertiseAddr, PeerAddresses: req.OlricPeerAddresses, } - instance, err := h.olricSpawner.SpawnInstance(ctx, cfg) - if err != nil { + if err := h.systemdSpawner.SpawnOlric(ctx, req.Namespace, req.NodeID, cfg); err != nil { h.logger.Error("Failed to spawn Olric instance", zap.Error(err)) writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) return } - writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true, PID: instance.PID}) + writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) case "stop-rqlite": - key := fmt.Sprintf("%s:%s", req.Namespace, req.NodeID) - h.rqliteInstanceMu.Lock() - instance, ok := h.rqliteInstances[key] - if ok { - delete(h.rqliteInstances, key) - } - h.rqliteInstanceMu.Unlock() - if !ok { - writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) // Already stopped - return - } - if err := h.rqliteSpawner.StopInstance(ctx, instance); err != nil { + if err := h.systemdSpawner.StopRQLite(ctx, req.Namespace, req.NodeID); err != nil { h.logger.Error("Failed to stop RQLite instance", zap.Error(err)) writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) return @@ -172,7 +147,7 @@ func (h 
*SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) case "stop-olric": - if err := h.olricSpawner.StopInstance(ctx, req.Namespace, req.NodeID); err != nil { + if err := h.systemdSpawner.StopOlric(ctx, req.Namespace, req.NodeID); err != nil { h.logger.Error("Failed to stop Olric instance", zap.Error(err)) writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) return @@ -203,16 +178,15 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { IPFSTimeout: ipfsTimeout, IPFSReplicationFactor: req.IPFSReplicationFactor, } - instance, err := h.gatewaySpawner.SpawnInstance(ctx, cfg) - if err != nil { + if err := h.systemdSpawner.SpawnGateway(ctx, req.Namespace, req.NodeID, cfg); err != nil { h.logger.Error("Failed to spawn Gateway instance", zap.Error(err)) writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) return } - writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true, PID: instance.PID}) + writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) case "stop-gateway": - if err := h.gatewaySpawner.StopInstance(ctx, req.Namespace, req.NodeID); err != nil { + if err := h.systemdSpawner.StopGateway(ctx, req.Namespace, req.NodeID); err != nil { h.logger.Error("Failed to stop Gateway instance", zap.Error(err)) writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) return diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index 51f5342..419112d 100644 --- a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -18,8 +18,10 @@ import ( "github.com/DeBrosOfficial/network/pkg/gateway" "github.com/DeBrosOfficial/network/pkg/olric" "github.com/DeBrosOfficial/network/pkg/rqlite" + "github.com/DeBrosOfficial/network/pkg/systemd" "github.com/google/uuid" "go.uber.org/zap" + "gopkg.in/yaml.v3" ) // 
ClusterManagerConfig contains configuration for the cluster manager @@ -36,12 +38,10 @@ type ClusterManagerConfig struct { // ClusterManager orchestrates namespace cluster provisioning and lifecycle type ClusterManager struct { - db rqlite.Client - portAllocator *NamespacePortAllocator - nodeSelector *ClusterNodeSelector - rqliteSpawner *rqlite.InstanceSpawner - olricSpawner *olric.InstanceSpawner - gatewaySpawner *gateway.InstanceSpawner + db rqlite.Client + portAllocator *NamespacePortAllocator + nodeSelector *ClusterNodeSelector + systemdSpawner *SystemdSpawner // NEW: Systemd-based spawner replaces old spawners logger *zap.Logger baseDomain string baseDataDir string @@ -70,9 +70,7 @@ func NewClusterManager( // Create internal components portAllocator := NewNamespacePortAllocator(db, logger) nodeSelector := NewClusterNodeSelector(db, portAllocator, logger) - rqliteSpawner := rqlite.NewInstanceSpawner(cfg.BaseDataDir, logger) - olricSpawner := olric.NewInstanceSpawner(cfg.BaseDataDir, logger) - gatewaySpawner := gateway.NewInstanceSpawner(cfg.BaseDataDir, logger) + systemdSpawner := NewSystemdSpawner(cfg.BaseDataDir, logger) // Set IPFS defaults ipfsClusterAPIURL := cfg.IPFSClusterAPIURL @@ -96,9 +94,7 @@ func NewClusterManager( db: db, portAllocator: portAllocator, nodeSelector: nodeSelector, - rqliteSpawner: rqliteSpawner, - olricSpawner: olricSpawner, - gatewaySpawner: gatewaySpawner, + systemdSpawner: systemdSpawner, baseDomain: cfg.BaseDomain, baseDataDir: cfg.BaseDataDir, globalRQLiteDSN: cfg.GlobalRQLiteDSN, @@ -116,9 +112,7 @@ func NewClusterManagerWithComponents( db rqlite.Client, portAllocator *NamespacePortAllocator, nodeSelector *ClusterNodeSelector, - rqliteSpawner *rqlite.InstanceSpawner, - olricSpawner *olric.InstanceSpawner, - gatewaySpawner *gateway.InstanceSpawner, + systemdSpawner *SystemdSpawner, cfg ClusterManagerConfig, logger *zap.Logger, ) *ClusterManager { @@ -144,9 +138,7 @@ func NewClusterManagerWithComponents( db: db, portAllocator: 
portAllocator, nodeSelector: nodeSelector, - rqliteSpawner: rqliteSpawner, - olricSpawner: olricSpawner, - gatewaySpawner: gatewaySpawner, + systemdSpawner: systemdSpawner, baseDomain: cfg.BaseDomain, baseDataDir: cfg.BaseDataDir, globalRQLiteDSN: cfg.GlobalRQLiteDSN, @@ -165,6 +157,135 @@ func (cm *ClusterManager) SetLocalNodeID(id string) { cm.logger.Info("Local node ID set for distributed provisioning", zap.String("local_node_id", id)) } +// spawnRQLiteWithSystemd generates config and spawns RQLite via systemd +func (cm *ClusterManager) spawnRQLiteWithSystemd(ctx context.Context, cfg rqlite.InstanceConfig) error { + // RQLite uses command-line args, no config file needed + // Just call systemd spawner which will generate env file and start service + return cm.systemdSpawner.SpawnRQLite(ctx, cfg.Namespace, cfg.NodeID, cfg) +} + +// spawnOlricWithSystemd generates config and spawns Olric via systemd +func (cm *ClusterManager) spawnOlricWithSystemd(ctx context.Context, cfg olric.InstanceConfig) error { + // Generate Olric config file (YAML) + configDir := filepath.Join(cm.baseDataDir, cfg.Namespace, "configs") + if err := os.MkdirAll(configDir, 0755); err != nil { + return fmt.Errorf("failed to create config directory: %w", err) + } + + configPath := filepath.Join(configDir, fmt.Sprintf("olric-%s.yaml", cfg.NodeID)) + + // Generate Olric YAML config + type olricServerConfig struct { + BindAddr string `yaml:"bindAddr"` + BindPort int `yaml:"bindPort"` + } + type olricMemberlistConfig struct { + Environment string `yaml:"environment"` + BindAddr string `yaml:"bindAddr"` + BindPort int `yaml:"bindPort"` + Peers []string `yaml:"peers,omitempty"` + } + type olricConfig struct { + Server olricServerConfig `yaml:"server"` + Memberlist olricMemberlistConfig `yaml:"memberlist"` + PartitionCount uint64 `yaml:"partitionCount"` + } + + config := olricConfig{ + Server: olricServerConfig{ + BindAddr: cfg.BindAddr, + BindPort: cfg.HTTPPort, + }, + Memberlist: 
olricMemberlistConfig{ + Environment: "lan", + BindAddr: cfg.BindAddr, + BindPort: cfg.MemberlistPort, + Peers: cfg.PeerAddresses, + }, + PartitionCount: 12, // Optimized for namespace clusters (vs 256 default) + } + + configBytes, err := yaml.Marshal(config) + if err != nil { + return fmt.Errorf("failed to marshal Olric config: %w", err) + } + + if err := os.WriteFile(configPath, configBytes, 0644); err != nil { + return fmt.Errorf("failed to write Olric config: %w", err) + } + + // Start via systemd + return cm.systemdSpawner.SpawnOlric(ctx, cfg.Namespace, cfg.NodeID, cfg) +} + +// writePeersJSON writes RQLite peers.json file for Raft cluster recovery +func (cm *ClusterManager) writePeersJSON(dataDir string, peers []rqlite.RaftPeer) error { + raftDir := filepath.Join(dataDir, "raft") + if err := os.MkdirAll(raftDir, 0755); err != nil { + return fmt.Errorf("failed to create raft directory: %w", err) + } + + peersFile := filepath.Join(raftDir, "peers.json") + data, err := json.Marshal(peers) + if err != nil { + return fmt.Errorf("failed to marshal peers: %w", err) + } + + return os.WriteFile(peersFile, data, 0644) +} + +// spawnGatewayWithSystemd generates config and spawns Gateway via systemd +func (cm *ClusterManager) spawnGatewayWithSystemd(ctx context.Context, cfg gateway.InstanceConfig) error { + // Generate Gateway config file (YAML) + configDir := filepath.Join(cm.baseDataDir, cfg.Namespace, "configs") + if err := os.MkdirAll(configDir, 0755); err != nil { + return fmt.Errorf("failed to create config directory: %w", err) + } + + configPath := filepath.Join(configDir, fmt.Sprintf("gateway-%s.yaml", cfg.NodeID)) + + // Build Gateway YAML config + type gatewayYAMLConfig struct { + ListenAddr string `yaml:"listen_addr"` + ClientNamespace string `yaml:"client_namespace"` + RQLiteDSN string `yaml:"rqlite_dsn"` + GlobalRQLiteDSN string `yaml:"global_rqlite_dsn,omitempty"` + DomainName string `yaml:"domain_name"` + OlricServers []string `yaml:"olric_servers"` + 
OlricTimeout string `yaml:"olric_timeout"` + IPFSClusterAPIURL string `yaml:"ipfs_cluster_api_url"` + IPFSAPIURL string `yaml:"ipfs_api_url"` + IPFSTimeout string `yaml:"ipfs_timeout"` + IPFSReplicationFactor int `yaml:"ipfs_replication_factor"` + } + + gatewayConfig := gatewayYAMLConfig{ + ListenAddr: fmt.Sprintf(":%d", cfg.HTTPPort), + ClientNamespace: cfg.Namespace, + RQLiteDSN: cfg.RQLiteDSN, + GlobalRQLiteDSN: cfg.GlobalRQLiteDSN, + DomainName: cfg.BaseDomain, + OlricServers: cfg.OlricServers, + OlricTimeout: cfg.OlricTimeout.String(), + IPFSClusterAPIURL: cfg.IPFSClusterAPIURL, + IPFSAPIURL: cfg.IPFSAPIURL, + IPFSTimeout: cfg.IPFSTimeout.String(), + IPFSReplicationFactor: cfg.IPFSReplicationFactor, + } + + configBytes, err := yaml.Marshal(gatewayConfig) + if err != nil { + return fmt.Errorf("failed to marshal Gateway config: %w", err) + } + + if err := os.WriteFile(configPath, configBytes, 0644); err != nil { + return fmt.Errorf("failed to write Gateway config: %w", err) + } + + // Start via systemd + return cm.systemdSpawner.SpawnGateway(ctx, cfg.Namespace, cfg.NodeID, cfg) +} + // ProvisionCluster provisions a new 3-node cluster for a namespace // This is an async operation - returns immediately with cluster ID for polling func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, namespaceName, provisionedBy string) (*NamespaceCluster, error) { @@ -306,19 +427,23 @@ func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *Names IsLeader: true, } - var leaderInstance *rqlite.Instance var err error if nodes[0].NodeID == cm.localNodeID { cm.logger.Info("Spawning RQLite leader locally", zap.String("node", nodes[0].NodeID)) - leaderInstance, err = cm.rqliteSpawner.SpawnInstance(ctx, leaderCfg) + err = cm.spawnRQLiteWithSystemd(ctx, leaderCfg) + if err == nil { + // Create Instance object for consistency with existing code + instances[0] = &rqlite.Instance{ + Config: leaderCfg, + } + } } else { cm.logger.Info("Spawning 
RQLite leader remotely", zap.String("node", nodes[0].NodeID), zap.String("ip", nodes[0].InternalIP)) - leaderInstance, err = cm.spawnRQLiteRemote(ctx, nodes[0].InternalIP, leaderCfg) + instances[0], err = cm.spawnRQLiteRemote(ctx, nodes[0].InternalIP, leaderCfg) } if err != nil { return nil, fmt.Errorf("failed to start RQLite leader: %w", err) } - instances[0] = leaderInstance cm.logEvent(ctx, cluster.ID, EventRQLiteStarted, nodes[0].NodeID, "RQLite leader started", nil) cm.logEvent(ctx, cluster.ID, EventRQLiteLeaderElected, nodes[0].NodeID, "RQLite leader elected", nil) @@ -344,7 +469,12 @@ func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *Names var followerInstance *rqlite.Instance if nodes[i].NodeID == cm.localNodeID { cm.logger.Info("Spawning RQLite follower locally", zap.String("node", nodes[i].NodeID)) - followerInstance, err = cm.rqliteSpawner.SpawnInstance(ctx, followerCfg) + err = cm.spawnRQLiteWithSystemd(ctx, followerCfg) + if err == nil { + followerInstance = &rqlite.Instance{ + Config: followerCfg, + } + } } else { cm.logger.Info("Spawning RQLite follower remotely", zap.String("node", nodes[i].NodeID), zap.String("ip", nodes[i].InternalIP)) followerInstance, err = cm.spawnRQLiteRemote(ctx, nodes[i].InternalIP, followerCfg) @@ -406,7 +536,20 @@ func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *Namesp defer wg.Done() if n.NodeID == cm.localNodeID { cm.logger.Info("Spawning Olric locally", zap.String("node", n.NodeID)) - instances[idx], errs[idx] = cm.olricSpawner.SpawnInstance(ctx, configs[idx]) + errs[idx] = cm.spawnOlricWithSystemd(ctx, configs[idx]) + if errs[idx] == nil { + instances[idx] = &olric.OlricInstance{ + Namespace: configs[idx].Namespace, + NodeID: configs[idx].NodeID, + HTTPPort: configs[idx].HTTPPort, + MemberlistPort: configs[idx].MemberlistPort, + BindAddr: configs[idx].BindAddr, + AdvertiseAddr: configs[idx].AdvertiseAddr, + PeerAddresses: configs[idx].PeerAddresses, + Status: 
olric.InstanceStatusRunning, + StartedAt: time.Now(), + } + } } else { cm.logger.Info("Spawning Olric remotely", zap.String("node", n.NodeID), zap.String("ip", n.InternalIP)) instances[idx], errs[idx] = cm.spawnOlricRemote(ctx, n.InternalIP, configs[idx]) @@ -496,7 +639,19 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name var err error if node.NodeID == cm.localNodeID { cm.logger.Info("Spawning Gateway locally", zap.String("node", node.NodeID)) - instance, err = cm.gatewaySpawner.SpawnInstance(ctx, cfg) + err = cm.spawnGatewayWithSystemd(ctx, cfg) + if err == nil { + instance = &gateway.GatewayInstance{ + Namespace: cfg.Namespace, + NodeID: cfg.NodeID, + HTTPPort: cfg.HTTPPort, + BaseDomain: cfg.BaseDomain, + RQLiteDSN: cfg.RQLiteDSN, + OlricServers: cfg.OlricServers, + Status: gateway.InstanceStatusRunning, + StartedAt: time.Now(), + } + } } else { cm.logger.Info("Spawning Gateway remotely", zap.String("node", node.NodeID), zap.String("ip", node.InternalIP)) instance, err = cm.spawnGatewayRemote(ctx, node.InternalIP, cfg) @@ -646,9 +801,7 @@ func (cm *ClusterManager) sendSpawnRequest(ctx context.Context, nodeIP string, r // stopRQLiteOnNode stops a RQLite instance on a node (local or remote) func (cm *ClusterManager) stopRQLiteOnNode(ctx context.Context, nodeID, nodeIP, namespace string, inst *rqlite.Instance) { if nodeID == cm.localNodeID { - if inst != nil { - cm.rqliteSpawner.StopInstance(ctx, inst) - } + cm.systemdSpawner.StopRQLite(ctx, namespace, nodeID) } else { cm.sendStopRequest(ctx, nodeIP, "stop-rqlite", namespace, nodeID) } @@ -657,7 +810,7 @@ func (cm *ClusterManager) stopRQLiteOnNode(ctx context.Context, nodeID, nodeIP, // stopOlricOnNode stops an Olric instance on a node (local or remote) func (cm *ClusterManager) stopOlricOnNode(ctx context.Context, nodeID, nodeIP, namespace string) { if nodeID == cm.localNodeID { - cm.olricSpawner.StopInstance(ctx, namespace, nodeID) + cm.systemdSpawner.StopOlric(ctx, namespace, 
nodeID) } else { cm.sendStopRequest(ctx, nodeIP, "stop-olric", namespace, nodeID) } @@ -666,7 +819,7 @@ func (cm *ClusterManager) stopOlricOnNode(ctx context.Context, nodeID, nodeIP, n // stopGatewayOnNode stops a Gateway instance on a node (local or remote) func (cm *ClusterManager) stopGatewayOnNode(ctx context.Context, nodeID, nodeIP, namespace string) { if nodeID == cm.localNodeID { - cm.gatewaySpawner.StopInstance(ctx, namespace, nodeID) + cm.systemdSpawner.StopGateway(ctx, namespace, nodeID) } else { cm.sendStopRequest(ctx, nodeIP, "stop-gateway", namespace, nodeID) } @@ -763,8 +916,8 @@ func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *Namespa func (cm *ClusterManager) rollbackProvisioning(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock, rqliteInstances []*rqlite.Instance, olricInstances []*olric.OlricInstance) { cm.logger.Info("Rolling back failed provisioning", zap.String("cluster_id", cluster.ID)) - // Stop Gateway instances (local only for now) - cm.gatewaySpawner.StopAllInstances(ctx, cluster.NamespaceName) + // Stop all namespace services (Gateway, Olric, RQLite) using systemd + cm.systemdSpawner.StopAll(ctx, cluster.NamespaceName) // Stop Olric instances on each node if olricInstances != nil && nodes != nil { @@ -808,10 +961,8 @@ func (cm *ClusterManager) DeprovisionCluster(ctx context.Context, namespaceID in cm.logEvent(ctx, cluster.ID, EventDeprovisionStarted, "", "Cluster deprovisioning started", nil) cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusDeprovisioning, "") - // Stop all services - cm.gatewaySpawner.StopAllInstances(ctx, cluster.NamespaceName) - cm.olricSpawner.StopAllInstances(ctx, cluster.NamespaceName) - // Note: RQLite instances need to be stopped individually based on stored PIDs + // Stop all services using systemd + cm.systemdSpawner.StopAll(ctx, cluster.NamespaceName) // Deallocate all ports cm.portAllocator.DeallocateAllPortBlocks(ctx, cluster.ID) @@ 
-1293,8 +1444,15 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n } // 1. Restore RQLite - if !cm.rqliteSpawner.IsInstanceRunning(pb.RQLiteHTTPPort) { - hasExistingData := cm.rqliteSpawner.HasExistingData(namespaceName, cm.localNodeID) + // Check if RQLite systemd service is already running + rqliteRunning, _ := cm.systemdSpawner.systemdMgr.IsServiceActive(namespaceName, systemd.ServiceTypeRQLite) + if !rqliteRunning { + // Check if RQLite data directory exists (has existing data) + dataDir := filepath.Join(cm.baseDataDir, namespaceName, "rqlite", cm.localNodeID) + hasExistingData := false + if _, err := os.Stat(filepath.Join(dataDir, "raft")); err == nil { + hasExistingData = true + } if hasExistingData { // Write peers.json for Raft cluster recovery (official RQLite mechanism). @@ -1309,8 +1467,7 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n NonVoter: false, }) } - dataDir := cm.rqliteSpawner.GetDataDir(namespaceName, cm.localNodeID) - if err := cm.rqliteSpawner.WritePeersJSON(dataDir, peers); err != nil { + if err := cm.writePeersJSON(dataDir, peers); err != nil { cm.logger.Error("Failed to write peers.json", zap.String("namespace", namespaceName), zap.Error(err)) } } @@ -1362,7 +1519,7 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n IsLeader: isLeader, } - if _, err := cm.rqliteSpawner.SpawnInstance(ctx, rqliteCfg); err != nil { + if err := cm.spawnRQLiteWithSystemd(ctx, rqliteCfg); err != nil { cm.logger.Error("Failed to restore RQLite", zap.String("namespace", namespaceName), zap.Error(err)) } else { cm.logger.Info("Restored RQLite instance", zap.String("namespace", namespaceName), zap.Int("port", pb.RQLiteHTTPPort)) @@ -1397,7 +1554,7 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n PeerAddresses: peers, } - if _, err := cm.olricSpawner.SpawnInstance(ctx, olricCfg); err != nil { + if err := 
cm.spawnOlricWithSystemd(ctx, olricCfg); err != nil { cm.logger.Error("Failed to restore Olric", zap.String("namespace", namespaceName), zap.Error(err)) } else { cm.logger.Info("Restored Olric instance", zap.String("namespace", namespaceName), zap.Int("port", pb.OlricHTTPPort)) @@ -1446,7 +1603,7 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n IPFSReplicationFactor: cm.ipfsReplicationFactor, } - if _, err := cm.gatewaySpawner.SpawnInstance(ctx, gwCfg); err != nil { + if err := cm.spawnGatewayWithSystemd(ctx, gwCfg); err != nil { cm.logger.Error("Failed to restore Gateway", zap.String("namespace", namespaceName), zap.Error(err)) } else { cm.logger.Info("Restored Gateway instance", zap.String("namespace", namespaceName), zap.Int("port", pb.GatewayHTTPPort)) @@ -1596,8 +1753,15 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl localIP := state.LocalIP // 1. Restore RQLite - if !cm.rqliteSpawner.IsInstanceRunning(pb.RQLiteHTTPPort) { - hasExistingData := cm.rqliteSpawner.HasExistingData(state.NamespaceName, cm.localNodeID) + // Check if RQLite systemd service is already running + rqliteRunning, _ := cm.systemdSpawner.systemdMgr.IsServiceActive(state.NamespaceName, systemd.ServiceTypeRQLite) + if !rqliteRunning { + // Check if RQLite data directory exists (has existing data) + dataDir := filepath.Join(cm.baseDataDir, state.NamespaceName, "rqlite", cm.localNodeID) + hasExistingData := false + if _, err := os.Stat(filepath.Join(dataDir, "raft")); err == nil { + hasExistingData = true + } if hasExistingData { var peers []rqlite.RaftPeer @@ -1605,8 +1769,7 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl raftAddr := fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort) peers = append(peers, rqlite.RaftPeer{ID: raftAddr, Address: raftAddr, NonVoter: false}) } - dataDir := cm.rqliteSpawner.GetDataDir(state.NamespaceName, cm.localNodeID) - if err := 
cm.rqliteSpawner.WritePeersJSON(dataDir, peers); err != nil { + if err := cm.writePeersJSON(dataDir, peers); err != nil { cm.logger.Error("Failed to write peers.json", zap.String("namespace", state.NamespaceName), zap.Error(err)) } } @@ -1643,7 +1806,7 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl JoinAddresses: joinAddrs, IsLeader: isLeader, } - if _, err := cm.rqliteSpawner.SpawnInstance(ctx, rqliteCfg); err != nil { + if err := cm.spawnRQLiteWithSystemd(ctx, rqliteCfg); err != nil { cm.logger.Error("Failed to restore RQLite from state", zap.String("namespace", state.NamespaceName), zap.Error(err)) } else { cm.logger.Info("Restored RQLite instance from state", zap.String("namespace", state.NamespaceName)) @@ -1670,7 +1833,7 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl AdvertiseAddr: localIP, PeerAddresses: peers, } - if _, err := cm.olricSpawner.SpawnInstance(ctx, olricCfg); err != nil { + if err := cm.spawnOlricWithSystemd(ctx, olricCfg); err != nil { cm.logger.Error("Failed to restore Olric from state", zap.String("namespace", state.NamespaceName), zap.Error(err)) } else { cm.logger.Info("Restored Olric instance from state", zap.String("namespace", state.NamespaceName)) @@ -1702,7 +1865,7 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl IPFSTimeout: cm.ipfsTimeout, IPFSReplicationFactor: cm.ipfsReplicationFactor, } - if _, err := cm.gatewaySpawner.SpawnInstance(ctx, gwCfg); err != nil { + if err := cm.spawnGatewayWithSystemd(ctx, gwCfg); err != nil { cm.logger.Error("Failed to restore Gateway from state", zap.String("namespace", state.NamespaceName), zap.Error(err)) } else { cm.logger.Info("Restored Gateway instance from state", zap.String("namespace", state.NamespaceName)) diff --git a/pkg/namespace/systemd_spawner.go b/pkg/namespace/systemd_spawner.go new file mode 100644 index 0000000..2c2183c --- /dev/null +++ b/pkg/namespace/systemd_spawner.go @@ 
-0,0 +1,195 @@ +package namespace + +import ( + "context" + "fmt" + "time" + + "github.com/DeBrosOfficial/network/pkg/gateway" + "github.com/DeBrosOfficial/network/pkg/olric" + "github.com/DeBrosOfficial/network/pkg/rqlite" + "github.com/DeBrosOfficial/network/pkg/systemd" + "go.uber.org/zap" +) + +// SystemdSpawner spawns namespace cluster processes using systemd services +type SystemdSpawner struct { + systemdMgr *systemd.Manager + namespaceBase string + logger *zap.Logger +} + +// NewSystemdSpawner creates a new systemd-based spawner +func NewSystemdSpawner(namespaceBase string, logger *zap.Logger) *SystemdSpawner { + return &SystemdSpawner{ + systemdMgr: systemd.NewManager(namespaceBase, logger), + namespaceBase: namespaceBase, + logger: logger.With(zap.String("component", "systemd-spawner")), + } +} + +// SpawnRQLite starts a RQLite instance using systemd +func (s *SystemdSpawner) SpawnRQLite(ctx context.Context, namespace, nodeID string, cfg rqlite.InstanceConfig) error { + s.logger.Info("Spawning RQLite via systemd", + zap.String("namespace", namespace), + zap.String("node_id", nodeID)) + + // Build join arguments + joinArgs := "" + if len(cfg.JoinAddresses) > 0 { + joinArgs = fmt.Sprintf("-join %s", cfg.JoinAddresses[0]) + for _, addr := range cfg.JoinAddresses[1:] { + joinArgs += fmt.Sprintf(",%s", addr) + } + } + + // Generate environment file + envVars := map[string]string{ + "HTTP_ADDR": fmt.Sprintf("0.0.0.0:%d", cfg.HTTPPort), + "RAFT_ADDR": fmt.Sprintf("0.0.0.0:%d", cfg.RaftPort), + "HTTP_ADV_ADDR": cfg.HTTPAdvAddress, + "RAFT_ADV_ADDR": cfg.RaftAdvAddress, + "JOIN_ARGS": joinArgs, + "NODE_ID": nodeID, + } + + if err := s.systemdMgr.GenerateEnvFile(namespace, nodeID, systemd.ServiceTypeRQLite, envVars); err != nil { + return fmt.Errorf("failed to generate RQLite env file: %w", err) + } + + // Start the systemd service + if err := s.systemdMgr.StartService(namespace, systemd.ServiceTypeRQLite); err != nil { + return fmt.Errorf("failed to start RQLite 
service: %w", err) + } + + // Wait for service to be active + if err := s.waitForService(namespace, systemd.ServiceTypeRQLite, 30*time.Second); err != nil { + return fmt.Errorf("RQLite service did not become active: %w", err) + } + + s.logger.Info("RQLite spawned successfully via systemd", + zap.String("namespace", namespace), + zap.String("node_id", nodeID)) + + return nil +} + +// SpawnOlric starts an Olric instance using systemd +func (s *SystemdSpawner) SpawnOlric(ctx context.Context, namespace, nodeID string, cfg olric.InstanceConfig) error { + s.logger.Info("Spawning Olric via systemd", + zap.String("namespace", namespace), + zap.String("node_id", nodeID)) + + // Generate environment file (Olric uses OLRIC_SERVER_CONFIG env var, set in systemd unit) + envVars := map[string]string{ + // No additional env vars needed - Olric reads from config file + // The config file path is set in the systemd unit file + } + + if err := s.systemdMgr.GenerateEnvFile(namespace, nodeID, systemd.ServiceTypeOlric, envVars); err != nil { + return fmt.Errorf("failed to generate Olric env file: %w", err) + } + + // Start the systemd service + if err := s.systemdMgr.StartService(namespace, systemd.ServiceTypeOlric); err != nil { + return fmt.Errorf("failed to start Olric service: %w", err) + } + + // Wait for service to be active + if err := s.waitForService(namespace, systemd.ServiceTypeOlric, 30*time.Second); err != nil { + return fmt.Errorf("Olric service did not become active: %w", err) + } + + s.logger.Info("Olric spawned successfully via systemd", + zap.String("namespace", namespace), + zap.String("node_id", nodeID)) + + return nil +} + +// SpawnGateway starts a Gateway instance using systemd +func (s *SystemdSpawner) SpawnGateway(ctx context.Context, namespace, nodeID string, cfg gateway.InstanceConfig) error { + s.logger.Info("Spawning Gateway via systemd", + zap.String("namespace", namespace), + zap.String("node_id", nodeID)) + + // Generate environment file + envVars := 
map[string]string{ + // Gateway uses config file, no additional env vars needed + } + + if err := s.systemdMgr.GenerateEnvFile(namespace, nodeID, systemd.ServiceTypeGateway, envVars); err != nil { + return fmt.Errorf("failed to generate Gateway env file: %w", err) + } + + // Start the systemd service + if err := s.systemdMgr.StartService(namespace, systemd.ServiceTypeGateway); err != nil { + return fmt.Errorf("failed to start Gateway service: %w", err) + } + + // Wait for service to be active + if err := s.waitForService(namespace, systemd.ServiceTypeGateway, 30*time.Second); err != nil { + return fmt.Errorf("Gateway service did not become active: %w", err) + } + + s.logger.Info("Gateway spawned successfully via systemd", + zap.String("namespace", namespace), + zap.String("node_id", nodeID)) + + return nil +} + +// StopRQLite stops a RQLite instance +func (s *SystemdSpawner) StopRQLite(ctx context.Context, namespace, nodeID string) error { + s.logger.Info("Stopping RQLite via systemd", + zap.String("namespace", namespace), + zap.String("node_id", nodeID)) + + return s.systemdMgr.StopService(namespace, systemd.ServiceTypeRQLite) +} + +// StopOlric stops an Olric instance +func (s *SystemdSpawner) StopOlric(ctx context.Context, namespace, nodeID string) error { + s.logger.Info("Stopping Olric via systemd", + zap.String("namespace", namespace), + zap.String("node_id", nodeID)) + + return s.systemdMgr.StopService(namespace, systemd.ServiceTypeOlric) +} + +// StopGateway stops a Gateway instance +func (s *SystemdSpawner) StopGateway(ctx context.Context, namespace, nodeID string) error { + s.logger.Info("Stopping Gateway via systemd", + zap.String("namespace", namespace), + zap.String("node_id", nodeID)) + + return s.systemdMgr.StopService(namespace, systemd.ServiceTypeGateway) +} + +// StopAll stops all services for a namespace +func (s *SystemdSpawner) StopAll(ctx context.Context, namespace string) error { + s.logger.Info("Stopping all namespace services via systemd", 
+ zap.String("namespace", namespace)) + + return s.systemdMgr.StopAllNamespaceServices(namespace) +} + +// waitForService waits for a systemd service to become active +func (s *SystemdSpawner) waitForService(namespace string, serviceType systemd.ServiceType, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + + for time.Now().Before(deadline) { + active, err := s.systemdMgr.IsServiceActive(namespace, serviceType) + if err != nil { + return fmt.Errorf("failed to check service status: %w", err) + } + + if active { + return nil + } + + time.Sleep(1 * time.Second) + } + + return fmt.Errorf("service did not become active within %v", timeout) +} diff --git a/pkg/node/gateway.go b/pkg/node/gateway.go index 239a23f..d180fad 100644 --- a/pkg/node/gateway.go +++ b/pkg/node/gateway.go @@ -13,8 +13,6 @@ import ( "github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/namespace" - olricpkg "github.com/DeBrosOfficial/network/pkg/olric" - rqlitepkg "github.com/DeBrosOfficial/network/pkg/rqlite" "go.uber.org/zap" ) @@ -81,10 +79,8 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { apiGateway.SetClusterProvisioner(clusterManager) // Wire spawn handler for distributed namespace instance spawning - rqliteSpawner := rqlitepkg.NewInstanceSpawner(baseDataDir, n.logger.Logger) - olricSpawner := olricpkg.NewInstanceSpawner(baseDataDir, n.logger.Logger) - gatewaySpawner := gateway.NewInstanceSpawner(baseDataDir, n.logger.Logger) - spawnHandler := namespacehandlers.NewSpawnHandler(rqliteSpawner, olricSpawner, gatewaySpawner, n.logger.Logger) + systemdSpawner := namespace.NewSystemdSpawner(baseDataDir, n.logger.Logger) + spawnHandler := namespacehandlers.NewSpawnHandler(systemdSpawner, n.logger.Logger) apiGateway.SetSpawnHandler(spawnHandler) // Wire namespace delete handler diff --git a/pkg/systemd/manager.go b/pkg/systemd/manager.go new file mode 100644 index 0000000..4c936a1 --- 
/dev/null +++ b/pkg/systemd/manager.go @@ -0,0 +1,326 @@ +package systemd + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + + "go.uber.org/zap" +) + +// ServiceType represents the type of namespace service +type ServiceType string + +const ( + ServiceTypeRQLite ServiceType = "rqlite" + ServiceTypeOlric ServiceType = "olric" + ServiceTypeGateway ServiceType = "gateway" +) + +// Manager manages systemd units for namespace services +type Manager struct { + logger *zap.Logger + systemdDir string + namespaceBase string // Base directory for namespace data +} + +// NewManager creates a new systemd manager +func NewManager(namespaceBase string, logger *zap.Logger) *Manager { + return &Manager{ + logger: logger.With(zap.String("component", "systemd-manager")), + systemdDir: "/etc/systemd/system", + namespaceBase: namespaceBase, + } +} + +// serviceName returns the systemd service name for a namespace and service type +func (m *Manager) serviceName(namespace string, serviceType ServiceType) string { + return fmt.Sprintf("debros-namespace-%s@%s.service", serviceType, namespace) +} + +// StartService starts a namespace service +func (m *Manager) StartService(namespace string, serviceType ServiceType) error { + svcName := m.serviceName(namespace, serviceType) + m.logger.Info("Starting systemd service", + zap.String("service", svcName), + zap.String("namespace", namespace)) + + cmd := exec.Command("systemctl", "start", svcName) + if output, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("failed to start %s: %w\nOutput: %s", svcName, err, string(output)) + } + + m.logger.Info("Service started successfully", zap.String("service", svcName)) + return nil +} + +// StopService stops a namespace service +func (m *Manager) StopService(namespace string, serviceType ServiceType) error { + svcName := m.serviceName(namespace, serviceType) + m.logger.Info("Stopping systemd service", + zap.String("service", svcName), + zap.String("namespace", namespace)) + 
+ cmd := exec.Command("systemctl", "stop", svcName) + if output, err := cmd.CombinedOutput(); err != nil { + // Don't error if service is already stopped or doesn't exist + if strings.Contains(string(output), "not loaded") || strings.Contains(string(output), "inactive") { + m.logger.Debug("Service already stopped or not loaded", zap.String("service", svcName)) + return nil + } + return fmt.Errorf("failed to stop %s: %w\nOutput: %s", svcName, err, string(output)) + } + + m.logger.Info("Service stopped successfully", zap.String("service", svcName)) + return nil +} + +// RestartService restarts a namespace service +func (m *Manager) RestartService(namespace string, serviceType ServiceType) error { + svcName := m.serviceName(namespace, serviceType) + m.logger.Info("Restarting systemd service", + zap.String("service", svcName), + zap.String("namespace", namespace)) + + cmd := exec.Command("systemctl", "restart", svcName) + if output, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("failed to restart %s: %w\nOutput: %s", svcName, err, string(output)) + } + + m.logger.Info("Service restarted successfully", zap.String("service", svcName)) + return nil +} + +// EnableService enables a namespace service to start on boot +func (m *Manager) EnableService(namespace string, serviceType ServiceType) error { + svcName := m.serviceName(namespace, serviceType) + m.logger.Info("Enabling systemd service", + zap.String("service", svcName), + zap.String("namespace", namespace)) + + cmd := exec.Command("systemctl", "enable", svcName) + if output, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("failed to enable %s: %w\nOutput: %s", svcName, err, string(output)) + } + + m.logger.Info("Service enabled successfully", zap.String("service", svcName)) + return nil +} + +// DisableService disables a namespace service +func (m *Manager) DisableService(namespace string, serviceType ServiceType) error { + svcName := m.serviceName(namespace, serviceType) + 
m.logger.Info("Disabling systemd service", + zap.String("service", svcName), + zap.String("namespace", namespace)) + + cmd := exec.Command("systemctl", "disable", svcName) + if output, err := cmd.CombinedOutput(); err != nil { + // Don't error if service is already disabled or doesn't exist + if strings.Contains(string(output), "not loaded") { + m.logger.Debug("Service not loaded", zap.String("service", svcName)) + return nil + } + return fmt.Errorf("failed to disable %s: %w\nOutput: %s", svcName, err, string(output)) + } + + m.logger.Info("Service disabled successfully", zap.String("service", svcName)) + return nil +} + +// IsServiceActive checks if a namespace service is active +func (m *Manager) IsServiceActive(namespace string, serviceType ServiceType) (bool, error) { + svcName := m.serviceName(namespace, serviceType) + cmd := exec.Command("systemctl", "is-active", svcName) + output, err := cmd.CombinedOutput() + + if err != nil { + // is-active exits non-zero (code 3) while the unit is inactive, failed, or still activating + if state := strings.TrimSpace(string(output)); state == "inactive" || state == "failed" || state == "activating" { + return false, nil + } + return false, fmt.Errorf("failed to check service status: %w\nOutput: %s", err, string(output)) + } + + return strings.TrimSpace(string(output)) == "active", nil +} + +// ReloadDaemon reloads systemd daemon configuration +func (m *Manager) ReloadDaemon() error { + m.logger.Info("Reloading systemd daemon") + cmd := exec.Command("systemctl", "daemon-reload") + if output, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("failed to reload systemd daemon: %w\nOutput: %s", err, string(output)) + } + return nil +} + +// StopAllNamespaceServices stops all namespace services for a given namespace +func (m *Manager) StopAllNamespaceServices(namespace string) error { + m.logger.Info("Stopping all namespace services", zap.String("namespace", namespace)) + + // Stop in reverse dependency order: Gateway → Olric → RQLite + services := 
[]ServiceType{ServiceTypeGateway, ServiceTypeOlric, ServiceTypeRQLite} + for _, svcType := range services { + if err := m.StopService(namespace, svcType); err != nil { + m.logger.Warn("Failed to stop service", + zap.String("namespace", namespace), + zap.String("service_type", string(svcType)), + zap.Error(err)) + // Continue stopping other services even if one fails + } + } + + return nil +} + +// StartAllNamespaceServices starts all namespace services for a given namespace +func (m *Manager) StartAllNamespaceServices(namespace string) error { + m.logger.Info("Starting all namespace services", zap.String("namespace", namespace)) + + // Start in dependency order: RQLite → Olric → Gateway + services := []ServiceType{ServiceTypeRQLite, ServiceTypeOlric, ServiceTypeGateway} + for _, svcType := range services { + if err := m.StartService(namespace, svcType); err != nil { + return fmt.Errorf("failed to start %s service: %w", svcType, err) + } + } + + return nil +} + +// ListNamespaceServices returns all namespace services currently registered in systemd +func (m *Manager) ListNamespaceServices() ([]string, error) { + cmd := exec.Command("systemctl", "list-units", "--all", "--no-legend", "debros-namespace-*@*.service") + output, err := cmd.CombinedOutput() + if err != nil { + return nil, fmt.Errorf("failed to list namespace services: %w\nOutput: %s", err, string(output)) + } + + var services []string + lines := strings.Split(strings.TrimSpace(string(output)), "\n") + for _, line := range lines { + if line == "" { + continue + } + fields := strings.Fields(line) + if len(fields) > 0 { + services = append(services, fields[0]) + } + } + + return services, nil +} + +// StopAllNamespaceServicesGlobally stops ALL namespace services on this node (for upgrade/maintenance) +func (m *Manager) StopAllNamespaceServicesGlobally() error { + m.logger.Info("Stopping all namespace services globally") + + services, err := m.ListNamespaceServices() + if err != nil { + return 
fmt.Errorf("failed to list services: %w", err) + } + + for _, svc := range services { + m.logger.Info("Stopping service", zap.String("service", svc)) + cmd := exec.Command("systemctl", "stop", svc) + if output, err := cmd.CombinedOutput(); err != nil { + m.logger.Warn("Failed to stop service", + zap.String("service", svc), + zap.Error(err), + zap.String("output", string(output))) + // Continue stopping other services + } + } + + return nil +} + +// CleanupOrphanedProcesses finds and kills any orphaned namespace processes not managed by systemd +// This is for cleaning up after migration from old exec.Command approach +func (m *Manager) CleanupOrphanedProcesses() error { + m.logger.Info("Cleaning up orphaned namespace processes") + + // Find processes listening on namespace ports (10000-10999 range) + // This is a safety measure during migration + cmd := exec.Command("bash", "-c", "lsof -ti:10000-10999 2>/dev/null | xargs -r kill -TERM 2>/dev/null || true") + if output, err := cmd.CombinedOutput(); err != nil { + m.logger.Debug("Orphaned process cleanup completed", + zap.Error(err), + zap.String("output", string(output))) + } + + return nil +} + +// GenerateEnvFile creates the environment file for a namespace service +func (m *Manager) GenerateEnvFile(namespace, nodeID string, serviceType ServiceType, envVars map[string]string) error { + envDir := filepath.Join(m.namespaceBase, namespace) + if err := os.MkdirAll(envDir, 0755); err != nil { + return fmt.Errorf("failed to create env directory: %w", err) + } + + envFile := filepath.Join(envDir, fmt.Sprintf("%s.env", serviceType)) + + var content strings.Builder + content.WriteString("# Auto-generated environment file for namespace service\n") + content.WriteString(fmt.Sprintf("# Namespace: %s\n", namespace)) + content.WriteString(fmt.Sprintf("# Node ID: %s\n", nodeID)) + content.WriteString(fmt.Sprintf("# Service: %s\n\n", serviceType)) + + // Always include NODE_ID + content.WriteString(fmt.Sprintf("NODE_ID=%s\n", 
nodeID)) + + // Add all other environment variables + for key, value := range envVars { + content.WriteString(fmt.Sprintf("%s=%s\n", key, value)) + } + + if err := os.WriteFile(envFile, []byte(content.String()), 0644); err != nil { + return fmt.Errorf("failed to write env file: %w", err) + } + + m.logger.Info("Generated environment file", + zap.String("file", envFile), + zap.String("namespace", namespace), + zap.String("service_type", string(serviceType))) + + return nil +} + +// InstallTemplateUnits installs the systemd template unit files +func (m *Manager) InstallTemplateUnits(sourceDir string) error { + m.logger.Info("Installing systemd template units", zap.String("source", sourceDir)) + + templates := []string{ + "debros-namespace-rqlite@.service", + "debros-namespace-olric@.service", + "debros-namespace-gateway@.service", + } + + for _, template := range templates { + source := filepath.Join(sourceDir, template) + dest := filepath.Join(m.systemdDir, template) + + data, err := os.ReadFile(source) + if err != nil { + return fmt.Errorf("failed to read template %s: %w", template, err) + } + + if err := os.WriteFile(dest, data, 0644); err != nil { + return fmt.Errorf("failed to write template %s: %w", template, err) + } + + m.logger.Info("Installed template unit", zap.String("template", template)) + } + + // Reload systemd daemon to recognize new templates + if err := m.ReloadDaemon(); err != nil { + return fmt.Errorf("failed to reload systemd daemon: %w", err) + } + + m.logger.Info("All template units installed successfully") + return nil +} diff --git a/systemd/debros-namespace-gateway@.service b/systemd/debros-namespace-gateway@.service new file mode 100644 index 0000000..f80d85a --- /dev/null +++ b/systemd/debros-namespace-gateway@.service @@ -0,0 +1,32 @@ +[Unit] +Description=DeBros Namespace Gateway (%i) +Documentation=https://github.com/DeBrosOfficial/network +After=network.target debros-namespace-rqlite@%i.service debros-namespace-olric@%i.service 
+Requires=debros-namespace-rqlite@%i.service debros-namespace-olric@%i.service +PartOf=debros-node.service + +[Service] +Type=simple +User=debros +Group=debros +WorkingDirectory=/home/debros + +EnvironmentFile=/home/debros/.orama/data/namespaces/%i/gateway.env + +ExecStart=/home/debros/bin/gateway --config /home/debros/.orama/data/namespaces/%i/configs/gateway-${NODE_ID}.yaml + +TimeoutStopSec=30s +KillMode=mixed +KillSignal=SIGTERM + +Restart=on-failure +RestartSec=5s + +StandardOutput=journal +StandardError=journal +SyslogIdentifier=debros-gateway-%i + +LimitNOFILE=65536 + +[Install] +WantedBy=multi-user.target diff --git a/systemd/debros-namespace-olric@.service b/systemd/debros-namespace-olric@.service new file mode 100644 index 0000000..741daf9 --- /dev/null +++ b/systemd/debros-namespace-olric@.service @@ -0,0 +1,34 @@ +[Unit] +Description=DeBros Namespace Olric Cache (%i) +Documentation=https://github.com/DeBrosOfficial/network +After=network.target debros-namespace-rqlite@%i.service +Requires=debros-namespace-rqlite@%i.service +PartOf=debros-node.service + +[Service] +Type=simple +User=debros +Group=debros +WorkingDirectory=/home/debros + +# Olric reads config from environment variable +# NOTE: systemd does not expand ${NODE_ID} inside Environment=, so set it on the command line below +EnvironmentFile=/home/debros/.orama/data/namespaces/%i/olric.env + +ExecStart=/usr/bin/env OLRIC_SERVER_CONFIG=/home/debros/.orama/data/namespaces/%i/configs/olric-${NODE_ID}.yaml /usr/local/bin/olric-server + +TimeoutStopSec=30s +KillMode=mixed +KillSignal=SIGTERM + +Restart=on-failure +RestartSec=5s + +StandardOutput=journal +StandardError=journal +SyslogIdentifier=debros-olric-%i + +LimitNOFILE=65536 + +[Install] +WantedBy=multi-user.target diff --git a/systemd/debros-namespace-rqlite@.service b/systemd/debros-namespace-rqlite@.service new file mode 100644 index 0000000..9b0724f --- /dev/null +++ b/systemd/debros-namespace-rqlite@.service @@ -0,0 +1,44 @@ +[Unit] +Description=DeBros Namespace RQLite (%i) +Documentation=https://github.com/DeBrosOfficial/network 
+After=network.target +PartOf=debros-node.service +StopWhenUnneeded=false + +[Service] +Type=simple +User=debros +Group=debros +WorkingDirectory=/home/debros + +# Environment file contains namespace-specific config +EnvironmentFile=/home/debros/.orama/data/namespaces/%i/rqlite.env + +# Start rqlited with args from environment; $JOIN_ARGS is unbraced on purpose so systemd +# word-splits it ("-join addr" becomes two args) and drops it entirely when empty +ExecStart=/usr/local/bin/rqlited \ + -http-addr ${HTTP_ADDR} \ + -raft-addr ${RAFT_ADDR} \ + -http-adv-addr ${HTTP_ADV_ADDR} \ + -raft-adv-addr ${RAFT_ADV_ADDR} \ + $JOIN_ARGS \ + /home/debros/.orama/data/namespaces/%i/rqlite/${NODE_ID} + +# Graceful shutdown +TimeoutStopSec=30s +KillMode=mixed +KillSignal=SIGTERM + +# Restart policy +Restart=on-failure +RestartSec=5s + +# Logging +StandardOutput=journal +StandardError=journal +SyslogIdentifier=debros-rqlite-%i + +# Resource limits +LimitNOFILE=65536 + +[Install] +WantedBy=multi-user.target