Updated the way we spawn namespace services: added systemd template units

This commit is contained in:
anonpenguin23 2026-02-04 17:17:01 +02:00
parent f972358e78
commit c855b790f8
11 changed files with 1051 additions and 98 deletions

View File

@ -205,6 +205,12 @@ func (o *Orchestrator) executeGenesisFlow() error {
return fmt.Errorf("service creation failed: %w", err)
}
// Install namespace systemd template units
fmt.Printf("\n🔧 Phase 5b: Installing namespace systemd templates...\n")
if err := o.installNamespaceTemplates(); err != nil {
fmt.Fprintf(os.Stderr, "⚠️ Template installation warning: %v\n", err)
}
// Phase 7: Seed DNS records (with retry — migrations may still be running)
if o.flags.Nameserver && o.flags.BaseDomain != "" {
fmt.Printf("\n🌐 Phase 7: Seeding DNS records...\n")
@ -330,6 +336,12 @@ func (o *Orchestrator) executeJoinFlow() error {
return fmt.Errorf("service creation failed: %w", err)
}
// Install namespace systemd template units
fmt.Printf("\n🔧 Installing namespace systemd templates...\n")
if err := o.installNamespaceTemplates(); err != nil {
fmt.Fprintf(os.Stderr, "⚠️ Template installation warning: %v\n", err)
}
o.setup.LogSetupComplete(o.setup.NodePeerID)
fmt.Printf("✅ Production installation complete! Joined cluster via %s\n\n", o.flags.JoinAddress)
return nil
@ -529,3 +541,47 @@ func promptForBaseDomain() string {
return "orama-devnet.network"
}
}
// installNamespaceTemplates installs systemd template unit files for namespace
// services (RQLite, Olric, Gateway) from <oramaHome>/src/systemd into
// /etc/systemd/system, then reloads the systemd daemon so the templates
// become available.
//
// Per-template read/write failures are printed as warnings and skipped. An
// error is returned only when no template could be installed at all, or when
// the daemon-reload fails; callers treat the error as a non-fatal warning.
func (o *Orchestrator) installNamespaceTemplates() error {
	sourceDir := filepath.Join(o.oramaHome, "src", "systemd")
	systemdDir := "/etc/systemd/system"

	templates := []string{
		"debros-namespace-rqlite@.service",
		"debros-namespace-olric@.service",
		"debros-namespace-gateway@.service",
	}

	installedCount := 0
	for _, template := range templates {
		sourcePath := filepath.Join(sourceDir, template)
		destPath := filepath.Join(systemdDir, template)

		// Read template file
		data, err := os.ReadFile(sourcePath)
		if err != nil {
			fmt.Printf("  ⚠️ Warning: Failed to read %s: %v\n", template, err)
			continue
		}

		// Write to systemd directory (0644: unit files are world-readable by convention)
		if err := os.WriteFile(destPath, data, 0644); err != nil {
			fmt.Printf("  ⚠️ Warning: Failed to install %s: %v\n", template, err)
			continue
		}

		installedCount++
		fmt.Printf("  ✓ Installed %s\n", template)
	}

	// Surface total failure to the caller instead of silently succeeding:
	// previously zero installed templates still returned nil.
	if installedCount == 0 {
		return fmt.Errorf("no namespace templates could be installed from %s", sourceDir)
	}

	// Reload systemd daemon to pick up new templates
	if err := exec.Command("systemctl", "daemon-reload").Run(); err != nil {
		return fmt.Errorf("failed to reload systemd daemon: %w", err)
	}
	fmt.Printf("  ✓ Systemd daemon reloaded (%d templates installed)\n", installedCount)
	return nil
}

View File

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"os/exec"
"strings"
"time"
"github.com/DeBrosOfficial/network/pkg/cli/utils"
@ -18,12 +19,18 @@ func HandleStop() {
fmt.Printf("Stopping all DeBros production services...\n")
// First, stop all namespace services
fmt.Printf("\n Stopping namespace services...\n")
stopAllNamespaceServices()
services := utils.GetProductionServices()
if len(services) == 0 {
fmt.Printf(" ⚠️ No DeBros services found\n")
return
}
fmt.Printf("\n Stopping main services...\n")
// First, disable all services to prevent auto-restart
disableArgs := []string{"disable"}
disableArgs = append(disableArgs, services...)
@ -110,3 +117,40 @@ func HandleStop() {
fmt.Printf(" Use 'orama prod start' to start and re-enable services\n")
}
}
// stopAllNamespaceServices stops all running namespace services.
// It lists every debros-namespace-*@*.service unit known to systemd and
// issues "systemctl stop" for each. Per-unit stop failures are printed as
// warnings and do not abort the sweep.
func stopAllNamespaceServices() {
	// Find all namespace services using systemctl list-units.
	// --plain keeps the unit name in the first column: without it, failed
	// units are prefixed with a "●" state marker, which shifts the unit name
	// into fields[1] and causes those units to be silently skipped below.
	cmd := exec.Command("systemctl", "list-units", "--type=service", "--all", "--plain", "--no-pager", "--no-legend", "debros-namespace-*@*.service")
	output, err := cmd.Output()
	if err != nil {
		fmt.Printf("  ⚠️ Warning: Failed to list namespace services: %v\n", err)
		return
	}

	lines := strings.Split(string(output), "\n")
	var namespaceServices []string
	for _, line := range lines {
		fields := strings.Fields(line)
		if len(fields) > 0 {
			serviceName := fields[0]
			if strings.HasPrefix(serviceName, "debros-namespace-") {
				namespaceServices = append(namespaceServices, serviceName)
			}
		}
	}

	if len(namespaceServices) == 0 {
		fmt.Printf("  No namespace services found\n")
		return
	}

	// Stop all namespace services
	for _, svc := range namespaceServices {
		if err := exec.Command("systemctl", "stop", svc).Run(); err != nil {
			fmt.Printf("  ⚠️ Warning: Failed to stop %s: %v\n", svc, err)
		}
	}
	fmt.Printf("  ✓ Stopped %d namespace service(s)\n", len(namespaceServices))
}

View File

@ -156,6 +156,12 @@ func (o *Orchestrator) Execute() error {
fmt.Fprintf(os.Stderr, "⚠️ Service update warning: %v\n", err)
}
// Install namespace systemd template units
fmt.Printf("\n🔧 Phase 5b: Installing namespace systemd templates...\n")
if err := o.installNamespaceTemplates(); err != nil {
fmt.Fprintf(os.Stderr, "⚠️ Template installation warning: %v\n", err)
}
// Re-apply UFW firewall rules (idempotent)
fmt.Printf("\n🛡 Re-applying firewall rules...\n")
if err := o.setup.Phase6bSetupFirewall(false); err != nil {
@ -333,6 +339,13 @@ func (o *Orchestrator) stopServices() error {
fmt.Printf("\n⏹ Stopping all services before upgrade...\n")
serviceController := production.NewSystemdController()
// First, stop all namespace services (debros-namespace-*@*.service)
fmt.Printf(" Stopping namespace services...\n")
if err := o.stopAllNamespaceServices(serviceController); err != nil {
fmt.Printf(" ⚠️ Warning: Failed to stop namespace services: %v\n", err)
}
// Stop services in reverse dependency order
services := []string{
"caddy.service", // Depends on node
@ -360,6 +373,82 @@ func (o *Orchestrator) stopServices() error {
return nil
}
// stopAllNamespaceServices stops all running namespace services via the
// provided systemd controller. Only units currently in the "running" state
// are considered; per-unit stop failures are printed as warnings and do not
// abort the sweep. An error is returned only when the unit listing itself
// fails.
func (o *Orchestrator) stopAllNamespaceServices(serviceController *production.SystemdController) error {
	// Find all running namespace services using systemctl list-units.
	// --plain keeps the unit name in the first column: without it, systemctl
	// may prefix a unit with a "●" state marker, which shifts the unit name
	// into fields[1] and causes that unit to be silently skipped below.
	cmd := exec.Command("systemctl", "list-units", "--type=service", "--state=running", "--plain", "--no-pager", "--no-legend", "debros-namespace-*@*.service")
	output, err := cmd.Output()
	if err != nil {
		return fmt.Errorf("failed to list namespace services: %w", err)
	}

	stoppedCount := 0
	for _, line := range strings.Split(string(output), "\n") {
		fields := strings.Fields(line)
		if len(fields) == 0 {
			continue
		}
		serviceName := fields[0]
		if !strings.HasPrefix(serviceName, "debros-namespace-") {
			continue
		}
		if err := serviceController.StopService(serviceName); err != nil {
			fmt.Printf("  ⚠️ Warning: Failed to stop %s: %v\n", serviceName, err)
		} else {
			stoppedCount++
		}
	}

	if stoppedCount > 0 {
		fmt.Printf("  ✓ Stopped %d namespace service(s)\n", stoppedCount)
	}
	return nil
}
// installNamespaceTemplates installs systemd template unit files for namespace
// services (RQLite, Olric, Gateway) from <oramaHome>/src/systemd into
// /etc/systemd/system, then reloads the systemd daemon so the templates
// become available.
//
// Per-template read/write failures are printed as warnings and skipped. An
// error is returned only when no template could be installed at all, or when
// the daemon-reload fails; callers treat the error as a non-fatal warning.
func (o *Orchestrator) installNamespaceTemplates() error {
	sourceDir := filepath.Join(o.oramaHome, "src", "systemd")
	systemdDir := "/etc/systemd/system"

	templates := []string{
		"debros-namespace-rqlite@.service",
		"debros-namespace-olric@.service",
		"debros-namespace-gateway@.service",
	}

	installedCount := 0
	for _, template := range templates {
		sourcePath := filepath.Join(sourceDir, template)
		destPath := filepath.Join(systemdDir, template)

		// Read template file
		data, err := os.ReadFile(sourcePath)
		if err != nil {
			fmt.Printf("  ⚠️ Warning: Failed to read %s: %v\n", template, err)
			continue
		}

		// Write to systemd directory (0644: unit files are world-readable by convention)
		if err := os.WriteFile(destPath, data, 0644); err != nil {
			fmt.Printf("  ⚠️ Warning: Failed to install %s: %v\n", template, err)
			continue
		}

		installedCount++
		fmt.Printf("  ✓ Installed %s\n", template)
	}

	// Surface total failure to the caller instead of silently succeeding:
	// previously zero installed templates still returned nil.
	if installedCount == 0 {
		return fmt.Errorf("no namespace templates could be installed from %s", sourceDir)
	}

	// Reload systemd daemon to pick up new templates
	if err := exec.Command("systemctl", "daemon-reload").Run(); err != nil {
		return fmt.Errorf("failed to reload systemd daemon: %w", err)
	}
	fmt.Printf("  ✓ Systemd daemon reloaded (%d templates installed)\n", installedCount)
	return nil
}
func (o *Orchestrator) extractPeers() []string {
nodeConfigPath := filepath.Join(o.oramaDir, "configs", "node.yaml")
var peers []string

View File

@ -5,10 +5,10 @@ import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/DeBrosOfficial/network/pkg/gateway"
namespacepkg "github.com/DeBrosOfficial/network/pkg/namespace"
"github.com/DeBrosOfficial/network/pkg/olric"
"github.com/DeBrosOfficial/network/pkg/rqlite"
"go.uber.org/zap"
@ -54,24 +54,17 @@ type SpawnResponse struct {
}
// SpawnHandler handles remote namespace instance spawn/stop requests.
// It tracks spawned RQLite instances locally so they can be stopped later.
// Now uses systemd for service management instead of direct process spawning.
type SpawnHandler struct {
rqliteSpawner *rqlite.InstanceSpawner
olricSpawner *olric.InstanceSpawner
gatewaySpawner *gateway.InstanceSpawner
logger *zap.Logger
rqliteInstances map[string]*rqlite.Instance // key: "namespace:nodeID"
rqliteInstanceMu sync.RWMutex
systemdSpawner *namespacepkg.SystemdSpawner
logger *zap.Logger
}
// NewSpawnHandler creates a new spawn handler
func NewSpawnHandler(rs *rqlite.InstanceSpawner, os *olric.InstanceSpawner, gs *gateway.InstanceSpawner, logger *zap.Logger) *SpawnHandler {
func NewSpawnHandler(systemdSpawner *namespacepkg.SystemdSpawner, logger *zap.Logger) *SpawnHandler {
return &SpawnHandler{
rqliteSpawner: rs,
olricSpawner: os,
gatewaySpawner: gs,
logger: logger.With(zap.String("component", "namespace-spawn-handler")),
rqliteInstances: make(map[string]*rqlite.Instance),
systemdSpawner: systemdSpawner,
logger: logger.With(zap.String("component", "namespace-spawn-handler")),
}
}
@ -121,18 +114,12 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
JoinAddresses: req.RQLiteJoinAddrs,
IsLeader: req.RQLiteIsLeader,
}
instance, err := h.rqliteSpawner.SpawnInstance(ctx, cfg)
if err != nil {
if err := h.systemdSpawner.SpawnRQLite(ctx, req.Namespace, req.NodeID, cfg); err != nil {
h.logger.Error("Failed to spawn RQLite instance", zap.Error(err))
writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()})
return
}
// Track the instance for later stop requests
key := fmt.Sprintf("%s:%s", req.Namespace, req.NodeID)
h.rqliteInstanceMu.Lock()
h.rqliteInstances[key] = instance
h.rqliteInstanceMu.Unlock()
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true, PID: instance.PID})
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
case "spawn-olric":
cfg := olric.InstanceConfig{
@ -144,27 +131,15 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
AdvertiseAddr: req.OlricAdvertiseAddr,
PeerAddresses: req.OlricPeerAddresses,
}
instance, err := h.olricSpawner.SpawnInstance(ctx, cfg)
if err != nil {
if err := h.systemdSpawner.SpawnOlric(ctx, req.Namespace, req.NodeID, cfg); err != nil {
h.logger.Error("Failed to spawn Olric instance", zap.Error(err))
writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()})
return
}
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true, PID: instance.PID})
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
case "stop-rqlite":
key := fmt.Sprintf("%s:%s", req.Namespace, req.NodeID)
h.rqliteInstanceMu.Lock()
instance, ok := h.rqliteInstances[key]
if ok {
delete(h.rqliteInstances, key)
}
h.rqliteInstanceMu.Unlock()
if !ok {
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) // Already stopped
return
}
if err := h.rqliteSpawner.StopInstance(ctx, instance); err != nil {
if err := h.systemdSpawner.StopRQLite(ctx, req.Namespace, req.NodeID); err != nil {
h.logger.Error("Failed to stop RQLite instance", zap.Error(err))
writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()})
return
@ -172,7 +147,7 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
case "stop-olric":
if err := h.olricSpawner.StopInstance(ctx, req.Namespace, req.NodeID); err != nil {
if err := h.systemdSpawner.StopOlric(ctx, req.Namespace, req.NodeID); err != nil {
h.logger.Error("Failed to stop Olric instance", zap.Error(err))
writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()})
return
@ -203,16 +178,15 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
IPFSTimeout: ipfsTimeout,
IPFSReplicationFactor: req.IPFSReplicationFactor,
}
instance, err := h.gatewaySpawner.SpawnInstance(ctx, cfg)
if err != nil {
if err := h.systemdSpawner.SpawnGateway(ctx, req.Namespace, req.NodeID, cfg); err != nil {
h.logger.Error("Failed to spawn Gateway instance", zap.Error(err))
writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()})
return
}
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true, PID: instance.PID})
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
case "stop-gateway":
if err := h.gatewaySpawner.StopInstance(ctx, req.Namespace, req.NodeID); err != nil {
if err := h.systemdSpawner.StopGateway(ctx, req.Namespace, req.NodeID); err != nil {
h.logger.Error("Failed to stop Gateway instance", zap.Error(err))
writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()})
return

View File

@ -18,8 +18,10 @@ import (
"github.com/DeBrosOfficial/network/pkg/gateway"
"github.com/DeBrosOfficial/network/pkg/olric"
"github.com/DeBrosOfficial/network/pkg/rqlite"
"github.com/DeBrosOfficial/network/pkg/systemd"
"github.com/google/uuid"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)
// ClusterManagerConfig contains configuration for the cluster manager
@ -36,12 +38,10 @@ type ClusterManagerConfig struct {
// ClusterManager orchestrates namespace cluster provisioning and lifecycle
type ClusterManager struct {
db rqlite.Client
portAllocator *NamespacePortAllocator
nodeSelector *ClusterNodeSelector
rqliteSpawner *rqlite.InstanceSpawner
olricSpawner *olric.InstanceSpawner
gatewaySpawner *gateway.InstanceSpawner
db rqlite.Client
portAllocator *NamespacePortAllocator
nodeSelector *ClusterNodeSelector
systemdSpawner *SystemdSpawner // NEW: Systemd-based spawner replaces old spawners
logger *zap.Logger
baseDomain string
baseDataDir string
@ -70,9 +70,7 @@ func NewClusterManager(
// Create internal components
portAllocator := NewNamespacePortAllocator(db, logger)
nodeSelector := NewClusterNodeSelector(db, portAllocator, logger)
rqliteSpawner := rqlite.NewInstanceSpawner(cfg.BaseDataDir, logger)
olricSpawner := olric.NewInstanceSpawner(cfg.BaseDataDir, logger)
gatewaySpawner := gateway.NewInstanceSpawner(cfg.BaseDataDir, logger)
systemdSpawner := NewSystemdSpawner(cfg.BaseDataDir, logger)
// Set IPFS defaults
ipfsClusterAPIURL := cfg.IPFSClusterAPIURL
@ -96,9 +94,7 @@ func NewClusterManager(
db: db,
portAllocator: portAllocator,
nodeSelector: nodeSelector,
rqliteSpawner: rqliteSpawner,
olricSpawner: olricSpawner,
gatewaySpawner: gatewaySpawner,
systemdSpawner: systemdSpawner,
baseDomain: cfg.BaseDomain,
baseDataDir: cfg.BaseDataDir,
globalRQLiteDSN: cfg.GlobalRQLiteDSN,
@ -116,9 +112,7 @@ func NewClusterManagerWithComponents(
db rqlite.Client,
portAllocator *NamespacePortAllocator,
nodeSelector *ClusterNodeSelector,
rqliteSpawner *rqlite.InstanceSpawner,
olricSpawner *olric.InstanceSpawner,
gatewaySpawner *gateway.InstanceSpawner,
systemdSpawner *SystemdSpawner,
cfg ClusterManagerConfig,
logger *zap.Logger,
) *ClusterManager {
@ -144,9 +138,7 @@ func NewClusterManagerWithComponents(
db: db,
portAllocator: portAllocator,
nodeSelector: nodeSelector,
rqliteSpawner: rqliteSpawner,
olricSpawner: olricSpawner,
gatewaySpawner: gatewaySpawner,
systemdSpawner: systemdSpawner,
baseDomain: cfg.BaseDomain,
baseDataDir: cfg.BaseDataDir,
globalRQLiteDSN: cfg.GlobalRQLiteDSN,
@ -165,6 +157,135 @@ func (cm *ClusterManager) SetLocalNodeID(id string) {
cm.logger.Info("Local node ID set for distributed provisioning", zap.String("local_node_id", id))
}
// spawnRQLiteWithSystemd generates config and spawns RQLite via systemd.
// Unlike the Olric and Gateway spawn paths, RQLite takes its configuration
// entirely from command-line arguments, so no config file is written here;
// the systemd spawner is responsible for generating the environment file and
// starting the templated unit.
func (cm *ClusterManager) spawnRQLiteWithSystemd(ctx context.Context, cfg rqlite.InstanceConfig) error {
	// RQLite uses command-line args, no config file needed
	// Just call systemd spawner which will generate env file and start service
	return cm.systemdSpawner.SpawnRQLite(ctx, cfg.Namespace, cfg.NodeID, cfg)
}
// spawnOlricWithSystemd generates config and spawns Olric via systemd.
// It writes a per-node YAML config file to
// <baseDataDir>/<namespace>/configs/olric-<nodeID>.yaml and then delegates
// process startup to the systemd spawner.
func (cm *ClusterManager) spawnOlricWithSystemd(ctx context.Context, cfg olric.InstanceConfig) error {
	// Local shapes used solely for marshaling the Olric YAML config file.
	type serverSection struct {
		BindAddr string `yaml:"bindAddr"`
		BindPort int    `yaml:"bindPort"`
	}
	type memberlistSection struct {
		Environment string   `yaml:"environment"`
		BindAddr    string   `yaml:"bindAddr"`
		BindPort    int      `yaml:"bindPort"`
		Peers       []string `yaml:"peers,omitempty"`
	}
	type olricFile struct {
		Server         serverSection     `yaml:"server"`
		Memberlist     memberlistSection `yaml:"memberlist"`
		PartitionCount uint64            `yaml:"partitionCount"`
	}

	// Ensure the per-namespace config directory exists.
	cfgDir := filepath.Join(cm.baseDataDir, cfg.Namespace, "configs")
	if err := os.MkdirAll(cfgDir, 0755); err != nil {
		return fmt.Errorf("failed to create config directory: %w", err)
	}
	cfgPath := filepath.Join(cfgDir, fmt.Sprintf("olric-%s.yaml", cfg.NodeID))

	doc := olricFile{
		Server: serverSection{
			BindAddr: cfg.BindAddr,
			BindPort: cfg.HTTPPort,
		},
		Memberlist: memberlistSection{
			Environment: "lan",
			BindAddr:    cfg.BindAddr,
			BindPort:    cfg.MemberlistPort,
			Peers:       cfg.PeerAddresses,
		},
		PartitionCount: 12, // Optimized for namespace clusters (vs 256 default)
	}

	raw, err := yaml.Marshal(doc)
	if err != nil {
		return fmt.Errorf("failed to marshal Olric config: %w", err)
	}
	if err := os.WriteFile(cfgPath, raw, 0644); err != nil {
		return fmt.Errorf("failed to write Olric config: %w", err)
	}

	// Start via systemd
	return cm.systemdSpawner.SpawnOlric(ctx, cfg.Namespace, cfg.NodeID, cfg)
}
// writePeersJSON writes RQLite's peers.json file for Raft cluster recovery.
// The file is placed at <dataDir>/raft/peers.json, which is the location
// RQLite inspects on startup to rebuild cluster membership.
//
// Returns an error if the raft directory cannot be created, the peer list
// cannot be marshaled, or the file cannot be written.
func (cm *ClusterManager) writePeersJSON(dataDir string, peers []rqlite.RaftPeer) error {
	raftDir := filepath.Join(dataDir, "raft")
	if err := os.MkdirAll(raftDir, 0755); err != nil {
		return fmt.Errorf("failed to create raft directory: %w", err)
	}
	peersFile := filepath.Join(raftDir, "peers.json")
	data, err := json.Marshal(peers)
	if err != nil {
		return fmt.Errorf("failed to marshal peers: %w", err)
	}
	// Wrap the write error for consistency with the other failure paths.
	if err := os.WriteFile(peersFile, data, 0644); err != nil {
		return fmt.Errorf("failed to write peers.json: %w", err)
	}
	return nil
}
// spawnGatewayWithSystemd generates config and spawns Gateway via systemd.
// It writes a per-node YAML config file to
// <baseDataDir>/<namespace>/configs/gateway-<nodeID>.yaml and then delegates
// process startup to the systemd spawner.
func (cm *ClusterManager) spawnGatewayWithSystemd(ctx context.Context, cfg gateway.InstanceConfig) error {
	// Generate Gateway config file (YAML)
	configDir := filepath.Join(cm.baseDataDir, cfg.Namespace, "configs")
	if err := os.MkdirAll(configDir, 0755); err != nil {
		return fmt.Errorf("failed to create config directory: %w", err)
	}
	configPath := filepath.Join(configDir, fmt.Sprintf("gateway-%s.yaml", cfg.NodeID))

	// Build Gateway YAML config
	// Local struct mirrors the gateway's expected on-disk config schema.
	type gatewayYAMLConfig struct {
		ListenAddr            string   `yaml:"listen_addr"`
		ClientNamespace       string   `yaml:"client_namespace"`
		RQLiteDSN             string   `yaml:"rqlite_dsn"`
		GlobalRQLiteDSN       string   `yaml:"global_rqlite_dsn,omitempty"`
		DomainName            string   `yaml:"domain_name"`
		OlricServers          []string `yaml:"olric_servers"`
		OlricTimeout          string   `yaml:"olric_timeout"`
		IPFSClusterAPIURL     string   `yaml:"ipfs_cluster_api_url"`
		IPFSAPIURL            string   `yaml:"ipfs_api_url"`
		IPFSTimeout           string   `yaml:"ipfs_timeout"`
		IPFSReplicationFactor int      `yaml:"ipfs_replication_factor"`
	}

	gatewayConfig := gatewayYAMLConfig{
		ListenAddr:      fmt.Sprintf(":%d", cfg.HTTPPort),
		ClientNamespace: cfg.Namespace,
		RQLiteDSN:       cfg.RQLiteDSN,
		GlobalRQLiteDSN: cfg.GlobalRQLiteDSN,
		DomainName:      cfg.BaseDomain,
		OlricServers:    cfg.OlricServers,
		// Timeouts are serialized as duration strings (e.g. "30s").
		OlricTimeout:          cfg.OlricTimeout.String(),
		IPFSClusterAPIURL:     cfg.IPFSClusterAPIURL,
		IPFSAPIURL:            cfg.IPFSAPIURL,
		IPFSTimeout:           cfg.IPFSTimeout.String(),
		IPFSReplicationFactor: cfg.IPFSReplicationFactor,
	}

	configBytes, err := yaml.Marshal(gatewayConfig)
	if err != nil {
		return fmt.Errorf("failed to marshal Gateway config: %w", err)
	}
	if err := os.WriteFile(configPath, configBytes, 0644); err != nil {
		return fmt.Errorf("failed to write Gateway config: %w", err)
	}

	// Start via systemd
	return cm.systemdSpawner.SpawnGateway(ctx, cfg.Namespace, cfg.NodeID, cfg)
}
// ProvisionCluster provisions a new 3-node cluster for a namespace
// This is an async operation - returns immediately with cluster ID for polling
func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, namespaceName, provisionedBy string) (*NamespaceCluster, error) {
@ -306,19 +427,23 @@ func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *Names
IsLeader: true,
}
var leaderInstance *rqlite.Instance
var err error
if nodes[0].NodeID == cm.localNodeID {
cm.logger.Info("Spawning RQLite leader locally", zap.String("node", nodes[0].NodeID))
leaderInstance, err = cm.rqliteSpawner.SpawnInstance(ctx, leaderCfg)
err = cm.spawnRQLiteWithSystemd(ctx, leaderCfg)
if err == nil {
// Create Instance object for consistency with existing code
instances[0] = &rqlite.Instance{
Config: leaderCfg,
}
}
} else {
cm.logger.Info("Spawning RQLite leader remotely", zap.String("node", nodes[0].NodeID), zap.String("ip", nodes[0].InternalIP))
leaderInstance, err = cm.spawnRQLiteRemote(ctx, nodes[0].InternalIP, leaderCfg)
instances[0], err = cm.spawnRQLiteRemote(ctx, nodes[0].InternalIP, leaderCfg)
}
if err != nil {
return nil, fmt.Errorf("failed to start RQLite leader: %w", err)
}
instances[0] = leaderInstance
cm.logEvent(ctx, cluster.ID, EventRQLiteStarted, nodes[0].NodeID, "RQLite leader started", nil)
cm.logEvent(ctx, cluster.ID, EventRQLiteLeaderElected, nodes[0].NodeID, "RQLite leader elected", nil)
@ -344,7 +469,12 @@ func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *Names
var followerInstance *rqlite.Instance
if nodes[i].NodeID == cm.localNodeID {
cm.logger.Info("Spawning RQLite follower locally", zap.String("node", nodes[i].NodeID))
followerInstance, err = cm.rqliteSpawner.SpawnInstance(ctx, followerCfg)
err = cm.spawnRQLiteWithSystemd(ctx, followerCfg)
if err == nil {
followerInstance = &rqlite.Instance{
Config: followerCfg,
}
}
} else {
cm.logger.Info("Spawning RQLite follower remotely", zap.String("node", nodes[i].NodeID), zap.String("ip", nodes[i].InternalIP))
followerInstance, err = cm.spawnRQLiteRemote(ctx, nodes[i].InternalIP, followerCfg)
@ -406,7 +536,20 @@ func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *Namesp
defer wg.Done()
if n.NodeID == cm.localNodeID {
cm.logger.Info("Spawning Olric locally", zap.String("node", n.NodeID))
instances[idx], errs[idx] = cm.olricSpawner.SpawnInstance(ctx, configs[idx])
errs[idx] = cm.spawnOlricWithSystemd(ctx, configs[idx])
if errs[idx] == nil {
instances[idx] = &olric.OlricInstance{
Namespace: configs[idx].Namespace,
NodeID: configs[idx].NodeID,
HTTPPort: configs[idx].HTTPPort,
MemberlistPort: configs[idx].MemberlistPort,
BindAddr: configs[idx].BindAddr,
AdvertiseAddr: configs[idx].AdvertiseAddr,
PeerAddresses: configs[idx].PeerAddresses,
Status: olric.InstanceStatusRunning,
StartedAt: time.Now(),
}
}
} else {
cm.logger.Info("Spawning Olric remotely", zap.String("node", n.NodeID), zap.String("ip", n.InternalIP))
instances[idx], errs[idx] = cm.spawnOlricRemote(ctx, n.InternalIP, configs[idx])
@ -496,7 +639,19 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name
var err error
if node.NodeID == cm.localNodeID {
cm.logger.Info("Spawning Gateway locally", zap.String("node", node.NodeID))
instance, err = cm.gatewaySpawner.SpawnInstance(ctx, cfg)
err = cm.spawnGatewayWithSystemd(ctx, cfg)
if err == nil {
instance = &gateway.GatewayInstance{
Namespace: cfg.Namespace,
NodeID: cfg.NodeID,
HTTPPort: cfg.HTTPPort,
BaseDomain: cfg.BaseDomain,
RQLiteDSN: cfg.RQLiteDSN,
OlricServers: cfg.OlricServers,
Status: gateway.InstanceStatusRunning,
StartedAt: time.Now(),
}
}
} else {
cm.logger.Info("Spawning Gateway remotely", zap.String("node", node.NodeID), zap.String("ip", node.InternalIP))
instance, err = cm.spawnGatewayRemote(ctx, node.InternalIP, cfg)
@ -646,9 +801,7 @@ func (cm *ClusterManager) sendSpawnRequest(ctx context.Context, nodeIP string, r
// stopRQLiteOnNode stops a RQLite instance on a node (local or remote)
func (cm *ClusterManager) stopRQLiteOnNode(ctx context.Context, nodeID, nodeIP, namespace string, inst *rqlite.Instance) {
if nodeID == cm.localNodeID {
if inst != nil {
cm.rqliteSpawner.StopInstance(ctx, inst)
}
cm.systemdSpawner.StopRQLite(ctx, namespace, nodeID)
} else {
cm.sendStopRequest(ctx, nodeIP, "stop-rqlite", namespace, nodeID)
}
@ -657,7 +810,7 @@ func (cm *ClusterManager) stopRQLiteOnNode(ctx context.Context, nodeID, nodeIP,
// stopOlricOnNode stops an Olric instance on a node (local or remote)
func (cm *ClusterManager) stopOlricOnNode(ctx context.Context, nodeID, nodeIP, namespace string) {
if nodeID == cm.localNodeID {
cm.olricSpawner.StopInstance(ctx, namespace, nodeID)
cm.systemdSpawner.StopOlric(ctx, namespace, nodeID)
} else {
cm.sendStopRequest(ctx, nodeIP, "stop-olric", namespace, nodeID)
}
@ -666,7 +819,7 @@ func (cm *ClusterManager) stopOlricOnNode(ctx context.Context, nodeID, nodeIP, n
// stopGatewayOnNode stops a Gateway instance on a node (local or remote)
func (cm *ClusterManager) stopGatewayOnNode(ctx context.Context, nodeID, nodeIP, namespace string) {
if nodeID == cm.localNodeID {
cm.gatewaySpawner.StopInstance(ctx, namespace, nodeID)
cm.systemdSpawner.StopGateway(ctx, namespace, nodeID)
} else {
cm.sendStopRequest(ctx, nodeIP, "stop-gateway", namespace, nodeID)
}
@ -763,8 +916,8 @@ func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *Namespa
func (cm *ClusterManager) rollbackProvisioning(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock, rqliteInstances []*rqlite.Instance, olricInstances []*olric.OlricInstance) {
cm.logger.Info("Rolling back failed provisioning", zap.String("cluster_id", cluster.ID))
// Stop Gateway instances (local only for now)
cm.gatewaySpawner.StopAllInstances(ctx, cluster.NamespaceName)
// Stop all namespace services (Gateway, Olric, RQLite) using systemd
cm.systemdSpawner.StopAll(ctx, cluster.NamespaceName)
// Stop Olric instances on each node
if olricInstances != nil && nodes != nil {
@ -808,10 +961,8 @@ func (cm *ClusterManager) DeprovisionCluster(ctx context.Context, namespaceID in
cm.logEvent(ctx, cluster.ID, EventDeprovisionStarted, "", "Cluster deprovisioning started", nil)
cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusDeprovisioning, "")
// Stop all services
cm.gatewaySpawner.StopAllInstances(ctx, cluster.NamespaceName)
cm.olricSpawner.StopAllInstances(ctx, cluster.NamespaceName)
// Note: RQLite instances need to be stopped individually based on stored PIDs
// Stop all services using systemd
cm.systemdSpawner.StopAll(ctx, cluster.NamespaceName)
// Deallocate all ports
cm.portAllocator.DeallocateAllPortBlocks(ctx, cluster.ID)
@ -1293,8 +1444,15 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n
}
// 1. Restore RQLite
if !cm.rqliteSpawner.IsInstanceRunning(pb.RQLiteHTTPPort) {
hasExistingData := cm.rqliteSpawner.HasExistingData(namespaceName, cm.localNodeID)
// Check if RQLite systemd service is already running
rqliteRunning, _ := cm.systemdSpawner.systemdMgr.IsServiceActive(namespaceName, systemd.ServiceTypeRQLite)
if !rqliteRunning {
// Check if RQLite data directory exists (has existing data)
dataDir := filepath.Join(cm.baseDataDir, namespaceName, "rqlite", cm.localNodeID)
hasExistingData := false
if _, err := os.Stat(filepath.Join(dataDir, "raft")); err == nil {
hasExistingData = true
}
if hasExistingData {
// Write peers.json for Raft cluster recovery (official RQLite mechanism).
@ -1309,8 +1467,7 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n
NonVoter: false,
})
}
dataDir := cm.rqliteSpawner.GetDataDir(namespaceName, cm.localNodeID)
if err := cm.rqliteSpawner.WritePeersJSON(dataDir, peers); err != nil {
if err := cm.writePeersJSON(dataDir, peers); err != nil {
cm.logger.Error("Failed to write peers.json", zap.String("namespace", namespaceName), zap.Error(err))
}
}
@ -1362,7 +1519,7 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n
IsLeader: isLeader,
}
if _, err := cm.rqliteSpawner.SpawnInstance(ctx, rqliteCfg); err != nil {
if err := cm.spawnRQLiteWithSystemd(ctx, rqliteCfg); err != nil {
cm.logger.Error("Failed to restore RQLite", zap.String("namespace", namespaceName), zap.Error(err))
} else {
cm.logger.Info("Restored RQLite instance", zap.String("namespace", namespaceName), zap.Int("port", pb.RQLiteHTTPPort))
@ -1397,7 +1554,7 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n
PeerAddresses: peers,
}
if _, err := cm.olricSpawner.SpawnInstance(ctx, olricCfg); err != nil {
if err := cm.spawnOlricWithSystemd(ctx, olricCfg); err != nil {
cm.logger.Error("Failed to restore Olric", zap.String("namespace", namespaceName), zap.Error(err))
} else {
cm.logger.Info("Restored Olric instance", zap.String("namespace", namespaceName), zap.Int("port", pb.OlricHTTPPort))
@ -1446,7 +1603,7 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n
IPFSReplicationFactor: cm.ipfsReplicationFactor,
}
if _, err := cm.gatewaySpawner.SpawnInstance(ctx, gwCfg); err != nil {
if err := cm.spawnGatewayWithSystemd(ctx, gwCfg); err != nil {
cm.logger.Error("Failed to restore Gateway", zap.String("namespace", namespaceName), zap.Error(err))
} else {
cm.logger.Info("Restored Gateway instance", zap.String("namespace", namespaceName), zap.Int("port", pb.GatewayHTTPPort))
@ -1596,8 +1753,15 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
localIP := state.LocalIP
// 1. Restore RQLite
if !cm.rqliteSpawner.IsInstanceRunning(pb.RQLiteHTTPPort) {
hasExistingData := cm.rqliteSpawner.HasExistingData(state.NamespaceName, cm.localNodeID)
// Check if RQLite systemd service is already running
rqliteRunning, _ := cm.systemdSpawner.systemdMgr.IsServiceActive(state.NamespaceName, systemd.ServiceTypeRQLite)
if !rqliteRunning {
// Check if RQLite data directory exists (has existing data)
dataDir := filepath.Join(cm.baseDataDir, state.NamespaceName, "rqlite", cm.localNodeID)
hasExistingData := false
if _, err := os.Stat(filepath.Join(dataDir, "raft")); err == nil {
hasExistingData = true
}
if hasExistingData {
var peers []rqlite.RaftPeer
@ -1605,8 +1769,7 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
raftAddr := fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort)
peers = append(peers, rqlite.RaftPeer{ID: raftAddr, Address: raftAddr, NonVoter: false})
}
dataDir := cm.rqliteSpawner.GetDataDir(state.NamespaceName, cm.localNodeID)
if err := cm.rqliteSpawner.WritePeersJSON(dataDir, peers); err != nil {
if err := cm.writePeersJSON(dataDir, peers); err != nil {
cm.logger.Error("Failed to write peers.json", zap.String("namespace", state.NamespaceName), zap.Error(err))
}
}
@ -1643,7 +1806,7 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
JoinAddresses: joinAddrs,
IsLeader: isLeader,
}
if _, err := cm.rqliteSpawner.SpawnInstance(ctx, rqliteCfg); err != nil {
if err := cm.spawnRQLiteWithSystemd(ctx, rqliteCfg); err != nil {
cm.logger.Error("Failed to restore RQLite from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
} else {
cm.logger.Info("Restored RQLite instance from state", zap.String("namespace", state.NamespaceName))
@ -1670,7 +1833,7 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
AdvertiseAddr: localIP,
PeerAddresses: peers,
}
if _, err := cm.olricSpawner.SpawnInstance(ctx, olricCfg); err != nil {
if err := cm.spawnOlricWithSystemd(ctx, olricCfg); err != nil {
cm.logger.Error("Failed to restore Olric from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
} else {
cm.logger.Info("Restored Olric instance from state", zap.String("namespace", state.NamespaceName))
@ -1702,7 +1865,7 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
IPFSTimeout: cm.ipfsTimeout,
IPFSReplicationFactor: cm.ipfsReplicationFactor,
}
if _, err := cm.gatewaySpawner.SpawnInstance(ctx, gwCfg); err != nil {
if err := cm.spawnGatewayWithSystemd(ctx, gwCfg); err != nil {
cm.logger.Error("Failed to restore Gateway from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
} else {
cm.logger.Info("Restored Gateway instance from state", zap.String("namespace", state.NamespaceName))

View File

@ -0,0 +1,195 @@
package namespace
import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/DeBrosOfficial/network/pkg/gateway"
	"github.com/DeBrosOfficial/network/pkg/olric"
	"github.com/DeBrosOfficial/network/pkg/rqlite"
	"github.com/DeBrosOfficial/network/pkg/systemd"

	"go.uber.org/zap"
)
// SystemdSpawner spawns namespace cluster processes (RQLite, Olric, Gateway)
// as instances of systemd template units rather than direct child processes.
type SystemdSpawner struct {
	systemdMgr    *systemd.Manager // issues systemctl commands and writes env files
	namespaceBase string           // base directory for per-namespace data and env files
	logger        *zap.Logger
}
// NewSystemdSpawner builds a spawner that manages namespace services through
// systemd template units rooted at namespaceBase.
func NewSystemdSpawner(namespaceBase string, logger *zap.Logger) *SystemdSpawner {
	spawner := &SystemdSpawner{
		namespaceBase: namespaceBase,
		systemdMgr:    systemd.NewManager(namespaceBase, logger),
	}
	spawner.logger = logger.With(zap.String("component", "systemd-spawner"))
	return spawner
}
// SpawnRQLite starts a namespace RQLite instance as a systemd service.
// It writes the environment file consumed by the debros-namespace-rqlite@
// template unit, starts the unit, and blocks until the service reports
// active or the 30s startup window elapses.
//
// NOTE(review): ctx is accepted for interface symmetry but is not currently
// consulted while waiting — cancellation does not abort the startup wait.
func (s *SystemdSpawner) SpawnRQLite(ctx context.Context, namespace, nodeID string, cfg rqlite.InstanceConfig) error {
	s.logger.Info("Spawning RQLite via systemd",
		zap.String("namespace", namespace),
		zap.String("node_id", nodeID))

	// Build the optional "-join addr1,addr2,..." argument string. Empty
	// means this node bootstraps (or rejoins from on-disk raft state).
	joinArgs := ""
	if len(cfg.JoinAddresses) > 0 {
		joinArgs = "-join " + strings.Join(cfg.JoinAddresses, ",")
	}

	// Variables referenced by the template unit's ExecStart line.
	envVars := map[string]string{
		"HTTP_ADDR":     fmt.Sprintf("0.0.0.0:%d", cfg.HTTPPort),
		"RAFT_ADDR":     fmt.Sprintf("0.0.0.0:%d", cfg.RaftPort),
		"HTTP_ADV_ADDR": cfg.HTTPAdvAddress,
		"RAFT_ADV_ADDR": cfg.RaftAdvAddress,
		"JOIN_ARGS":     joinArgs,
		"NODE_ID":       nodeID,
	}
	if err := s.systemdMgr.GenerateEnvFile(namespace, nodeID, systemd.ServiceTypeRQLite, envVars); err != nil {
		return fmt.Errorf("failed to generate RQLite env file: %w", err)
	}

	// Start the systemd service.
	if err := s.systemdMgr.StartService(namespace, systemd.ServiceTypeRQLite); err != nil {
		return fmt.Errorf("failed to start RQLite service: %w", err)
	}

	// Wait for the unit to report active.
	if err := s.waitForService(namespace, systemd.ServiceTypeRQLite, 30*time.Second); err != nil {
		return fmt.Errorf("RQLite service did not become active: %w", err)
	}

	s.logger.Info("RQLite spawned successfully via systemd",
		zap.String("namespace", namespace),
		zap.String("node_id", nodeID))
	return nil
}
// SpawnOlric starts a namespace Olric cache instance as a systemd service
// and waits up to 30s for it to report active.
func (s *SystemdSpawner) SpawnOlric(ctx context.Context, namespace, nodeID string, cfg olric.InstanceConfig) error {
	log := s.logger.With(
		zap.String("namespace", namespace),
		zap.String("node_id", nodeID))
	log.Info("Spawning Olric via systemd")

	// Olric picks up its YAML config path from the systemd unit itself, so
	// no extra variables are passed here (GenerateEnvFile still writes the
	// standard NODE_ID entry).
	if err := s.systemdMgr.GenerateEnvFile(namespace, nodeID, systemd.ServiceTypeOlric, map[string]string{}); err != nil {
		return fmt.Errorf("failed to generate Olric env file: %w", err)
	}

	if err := s.systemdMgr.StartService(namespace, systemd.ServiceTypeOlric); err != nil {
		return fmt.Errorf("failed to start Olric service: %w", err)
	}

	if err := s.waitForService(namespace, systemd.ServiceTypeOlric, 30*time.Second); err != nil {
		return fmt.Errorf("Olric service did not become active: %w", err)
	}

	log.Info("Olric spawned successfully via systemd")
	return nil
}
// SpawnGateway starts a namespace Gateway instance as a systemd service and
// waits up to 30s for it to report active.
func (s *SystemdSpawner) SpawnGateway(ctx context.Context, namespace, nodeID string, cfg gateway.InstanceConfig) error {
	log := s.logger.With(
		zap.String("namespace", namespace),
		zap.String("node_id", nodeID))
	log.Info("Spawning Gateway via systemd")

	// The gateway is configured entirely through its YAML config file; the
	// env file carries only the standard NODE_ID entry.
	if err := s.systemdMgr.GenerateEnvFile(namespace, nodeID, systemd.ServiceTypeGateway, map[string]string{}); err != nil {
		return fmt.Errorf("failed to generate Gateway env file: %w", err)
	}

	if err := s.systemdMgr.StartService(namespace, systemd.ServiceTypeGateway); err != nil {
		return fmt.Errorf("failed to start Gateway service: %w", err)
	}

	if err := s.waitForService(namespace, systemd.ServiceTypeGateway, 30*time.Second); err != nil {
		return fmt.Errorf("Gateway service did not become active: %w", err)
	}

	log.Info("Gateway spawned successfully via systemd")
	return nil
}
// StopRQLite stops the namespace's RQLite systemd service.
func (s *SystemdSpawner) StopRQLite(ctx context.Context, namespace, nodeID string) error {
	fields := []zap.Field{
		zap.String("namespace", namespace),
		zap.String("node_id", nodeID),
	}
	s.logger.Info("Stopping RQLite via systemd", fields...)
	return s.systemdMgr.StopService(namespace, systemd.ServiceTypeRQLite)
}
// StopOlric stops the namespace's Olric systemd service.
func (s *SystemdSpawner) StopOlric(ctx context.Context, namespace, nodeID string) error {
	fields := []zap.Field{
		zap.String("namespace", namespace),
		zap.String("node_id", nodeID),
	}
	s.logger.Info("Stopping Olric via systemd", fields...)
	return s.systemdMgr.StopService(namespace, systemd.ServiceTypeOlric)
}
// StopGateway stops the namespace's Gateway systemd service.
func (s *SystemdSpawner) StopGateway(ctx context.Context, namespace, nodeID string) error {
	fields := []zap.Field{
		zap.String("namespace", namespace),
		zap.String("node_id", nodeID),
	}
	s.logger.Info("Stopping Gateway via systemd", fields...)
	return s.systemdMgr.StopService(namespace, systemd.ServiceTypeGateway)
}
// StopAll stops every systemd-managed service belonging to the namespace.
func (s *SystemdSpawner) StopAll(ctx context.Context, namespace string) error {
	s.logger.Info("Stopping all namespace services via systemd", zap.String("namespace", namespace))
	return s.systemdMgr.StopAllNamespaceServices(namespace)
}
// waitForService polls systemd once per second until the unit reports
// active, returning an error on a status-check failure or on timeout.
func (s *SystemdSpawner) waitForService(namespace string, serviceType systemd.ServiceType, timeout time.Duration) error {
	start := time.Now()
	for time.Since(start) < timeout {
		switch active, err := s.systemdMgr.IsServiceActive(namespace, serviceType); {
		case err != nil:
			return fmt.Errorf("failed to check service status: %w", err)
		case active:
			return nil
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("service did not become active within %v", timeout)
}

View File

@ -13,8 +13,6 @@ import (
"github.com/DeBrosOfficial/network/pkg/ipfs"
"github.com/DeBrosOfficial/network/pkg/logging"
"github.com/DeBrosOfficial/network/pkg/namespace"
olricpkg "github.com/DeBrosOfficial/network/pkg/olric"
rqlitepkg "github.com/DeBrosOfficial/network/pkg/rqlite"
"go.uber.org/zap"
)
@ -81,10 +79,8 @@ func (n *Node) startHTTPGateway(ctx context.Context) error {
apiGateway.SetClusterProvisioner(clusterManager)
// Wire spawn handler for distributed namespace instance spawning
rqliteSpawner := rqlitepkg.NewInstanceSpawner(baseDataDir, n.logger.Logger)
olricSpawner := olricpkg.NewInstanceSpawner(baseDataDir, n.logger.Logger)
gatewaySpawner := gateway.NewInstanceSpawner(baseDataDir, n.logger.Logger)
spawnHandler := namespacehandlers.NewSpawnHandler(rqliteSpawner, olricSpawner, gatewaySpawner, n.logger.Logger)
systemdSpawner := namespace.NewSystemdSpawner(baseDataDir, n.logger.Logger)
spawnHandler := namespacehandlers.NewSpawnHandler(systemdSpawner, n.logger.Logger)
apiGateway.SetSpawnHandler(spawnHandler)
// Wire namespace delete handler

326
pkg/systemd/manager.go Normal file
View File

@ -0,0 +1,326 @@
package systemd
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"go.uber.org/zap"
)
// ServiceType represents the type of namespace service, used to derive the
// debros-namespace-<type>@<namespace>.service unit name.
type ServiceType string

const (
	ServiceTypeRQLite  ServiceType = "rqlite"  // per-namespace RQLite database
	ServiceTypeOlric   ServiceType = "olric"   // per-namespace Olric cache
	ServiceTypeGateway ServiceType = "gateway" // per-namespace HTTP gateway
)
// Manager manages systemd units for namespace services. It shells out to
// systemctl, so it must run with privileges sufficient to control units.
type Manager struct {
	logger        *zap.Logger
	systemdDir    string // where template unit files are installed (/etc/systemd/system)
	namespaceBase string // Base directory for namespace data; env files live under <base>/<namespace>
}
// NewManager creates a systemd Manager whose per-namespace environment files
// are written under namespaceBase.
func NewManager(namespaceBase string, logger *zap.Logger) *Manager {
	m := &Manager{
		systemdDir:    "/etc/systemd/system",
		namespaceBase: namespaceBase,
	}
	m.logger = logger.With(zap.String("component", "systemd-manager"))
	return m
}
// serviceName returns the fully-qualified systemd unit name for the given
// namespace and service type, e.g. "debros-namespace-rqlite@acme.service".
func (m *Manager) serviceName(namespace string, serviceType ServiceType) string {
	return "debros-namespace-" + string(serviceType) + "@" + namespace + ".service"
}
// StartService starts the namespace service via "systemctl start".
func (m *Manager) StartService(namespace string, serviceType ServiceType) error {
	svcName := m.serviceName(namespace, serviceType)
	m.logger.Info("Starting systemd service",
		zap.String("service", svcName),
		zap.String("namespace", namespace))

	output, err := exec.Command("systemctl", "start", svcName).CombinedOutput()
	if err != nil {
		return fmt.Errorf("failed to start %s: %w\nOutput: %s", svcName, err, string(output))
	}

	m.logger.Info("Service started successfully", zap.String("service", svcName))
	return nil
}
// StopService stops the namespace service. A unit that systemd reports as
// not loaded or already inactive is treated as success.
func (m *Manager) StopService(namespace string, serviceType ServiceType) error {
	svcName := m.serviceName(namespace, serviceType)
	m.logger.Info("Stopping systemd service",
		zap.String("service", svcName),
		zap.String("namespace", namespace))

	output, err := exec.Command("systemctl", "stop", svcName).CombinedOutput()
	if err != nil {
		text := string(output)
		// Tolerate stopping a unit systemd no longer tracks.
		if strings.Contains(text, "not loaded") || strings.Contains(text, "inactive") {
			m.logger.Debug("Service already stopped or not loaded", zap.String("service", svcName))
			return nil
		}
		return fmt.Errorf("failed to stop %s: %w\nOutput: %s", svcName, err, text)
	}

	m.logger.Info("Service stopped successfully", zap.String("service", svcName))
	return nil
}
// RestartService restarts the namespace service via "systemctl restart".
func (m *Manager) RestartService(namespace string, serviceType ServiceType) error {
	svcName := m.serviceName(namespace, serviceType)
	m.logger.Info("Restarting systemd service",
		zap.String("service", svcName),
		zap.String("namespace", namespace))

	output, err := exec.Command("systemctl", "restart", svcName).CombinedOutput()
	if err != nil {
		return fmt.Errorf("failed to restart %s: %w\nOutput: %s", svcName, err, string(output))
	}

	m.logger.Info("Service restarted successfully", zap.String("service", svcName))
	return nil
}
// EnableService enables the namespace service so it starts on boot.
func (m *Manager) EnableService(namespace string, serviceType ServiceType) error {
	svcName := m.serviceName(namespace, serviceType)
	m.logger.Info("Enabling systemd service",
		zap.String("service", svcName),
		zap.String("namespace", namespace))

	output, err := exec.Command("systemctl", "enable", svcName).CombinedOutput()
	if err != nil {
		return fmt.Errorf("failed to enable %s: %w\nOutput: %s", svcName, err, string(output))
	}

	m.logger.Info("Service enabled successfully", zap.String("service", svcName))
	return nil
}
// DisableService disables the namespace service so it no longer starts on
// boot. A unit systemd does not know about is treated as success.
func (m *Manager) DisableService(namespace string, serviceType ServiceType) error {
	svcName := m.serviceName(namespace, serviceType)
	m.logger.Info("Disabling systemd service",
		zap.String("service", svcName),
		zap.String("namespace", namespace))

	output, err := exec.Command("systemctl", "disable", svcName).CombinedOutput()
	if err != nil {
		// Nothing to disable; treat as a no-op.
		if strings.Contains(string(output), "not loaded") {
			m.logger.Debug("Service not loaded", zap.String("service", svcName))
			return nil
		}
		return fmt.Errorf("failed to disable %s: %w\nOutput: %s", svcName, err, string(output))
	}

	m.logger.Info("Service disabled successfully", zap.String("service", svcName))
	return nil
}
// IsServiceActive reports whether the namespace service is currently active.
//
// "systemctl is-active" prints the unit state and exits non-zero for ANY
// state other than "active" (inactive, failed, activating, deactivating,
// unknown, ...). The previous implementation only recognized "inactive" and
// "failed" in the output and returned an error for transitional states such
// as "activating", which broke callers that poll during service startup.
// Any *exec.ExitError is therefore treated as "not active" rather than a
// failure; a genuine execution error (e.g. systemctl missing) still surfaces.
func (m *Manager) IsServiceActive(namespace string, serviceType ServiceType) (bool, error) {
	svcName := m.serviceName(namespace, serviceType)
	cmd := exec.Command("systemctl", "is-active", svcName)
	output, err := cmd.CombinedOutput()
	state := strings.TrimSpace(string(output))
	if err != nil {
		if _, ok := err.(*exec.ExitError); ok {
			// The command ran; a non-zero exit just means "not active".
			return false, nil
		}
		return false, fmt.Errorf("failed to check service status: %w\nOutput: %s", err, state)
	}
	return state == "active", nil
}
// ReloadDaemon reloads systemd's unit configuration ("systemctl
// daemon-reload"), required after installing or changing unit files.
func (m *Manager) ReloadDaemon() error {
	m.logger.Info("Reloading systemd daemon")
	output, err := exec.Command("systemctl", "daemon-reload").CombinedOutput()
	if err != nil {
		return fmt.Errorf("failed to reload systemd daemon: %w\nOutput: %s", err, string(output))
	}
	return nil
}
// StopAllNamespaceServices stops every service belonging to the namespace in
// reverse dependency order (Gateway → Olric → RQLite). Individual failures
// are logged and skipped so the remaining services still get stopped; the
// method itself always returns nil.
func (m *Manager) StopAllNamespaceServices(namespace string) error {
	m.logger.Info("Stopping all namespace services", zap.String("namespace", namespace))
	for _, svcType := range []ServiceType{ServiceTypeGateway, ServiceTypeOlric, ServiceTypeRQLite} {
		err := m.StopService(namespace, svcType)
		if err == nil {
			continue
		}
		// Best-effort: keep going even if one service fails to stop.
		m.logger.Warn("Failed to stop service",
			zap.String("namespace", namespace),
			zap.String("service_type", string(svcType)),
			zap.Error(err))
	}
	return nil
}
// StartAllNamespaceServices starts every service belonging to the namespace
// in dependency order (RQLite → Olric → Gateway), aborting on first failure.
func (m *Manager) StartAllNamespaceServices(namespace string) error {
	m.logger.Info("Starting all namespace services", zap.String("namespace", namespace))
	order := []ServiceType{ServiceTypeRQLite, ServiceTypeOlric, ServiceTypeGateway}
	for _, svcType := range order {
		if err := m.StartService(namespace, svcType); err != nil {
			return fmt.Errorf("failed to start %s service: %w", svcType, err)
		}
	}
	return nil
}
// ListNamespaceServices returns all debros namespace service units currently
// known to systemd (active or not).
//
// "--plain" is required: without it, "list-units --all" prefixes failed or
// otherwise flagged units with a "●" marker, making fields[0] the marker
// glyph instead of the unit name. As a second line of defense the parser
// picks the first whitespace-separated field ending in ".service" rather
// than blindly taking fields[0].
func (m *Manager) ListNamespaceServices() ([]string, error) {
	cmd := exec.Command("systemctl", "list-units", "--all", "--plain", "--no-legend", "debros-namespace-*@*.service")
	output, err := cmd.CombinedOutput()
	if err != nil {
		return nil, fmt.Errorf("failed to list namespace services: %w\nOutput: %s", err, string(output))
	}
	var services []string
	for _, line := range strings.Split(strings.TrimSpace(string(output)), "\n") {
		if line == "" {
			continue
		}
		for _, field := range strings.Fields(line) {
			if strings.HasSuffix(field, ".service") {
				services = append(services, field)
				break
			}
		}
	}
	return services, nil
}
// StopAllNamespaceServicesGlobally stops every namespace service unit on
// this node (for upgrade/maintenance). Per-unit failures are logged but do
// not abort the sweep.
func (m *Manager) StopAllNamespaceServicesGlobally() error {
	m.logger.Info("Stopping all namespace services globally")
	services, err := m.ListNamespaceServices()
	if err != nil {
		return fmt.Errorf("failed to list services: %w", err)
	}
	for _, svc := range services {
		m.logger.Info("Stopping service", zap.String("service", svc))
		output, stopErr := exec.Command("systemctl", "stop", svc).CombinedOutput()
		if stopErr == nil {
			continue
		}
		// Best-effort: continue with the remaining units.
		m.logger.Warn("Failed to stop service",
			zap.String("service", svc),
			zap.Error(stopErr),
			zap.String("output", string(output)))
	}
	return nil
}
// CleanupOrphanedProcesses finds and kills any orphaned namespace processes not managed by systemd
// This is for cleaning up after migration from old exec.Command approach
func (m *Manager) CleanupOrphanedProcesses() error {
	m.logger.Info("Cleaning up orphaned namespace processes")
	// Find processes listening on namespace ports (10000-10999 range)
	// This is a safety measure during migration
	// NOTE(review): relies on bash and lsof being installed; the trailing
	// "|| true" makes the pipeline best-effort, so a missing tool or absent
	// processes are silently ignored — confirm lsof is a deploy prerequisite.
	cmd := exec.Command("bash", "-c", "lsof -ti:10000-10999 2>/dev/null | xargs -r kill -TERM 2>/dev/null || true")
	if output, err := cmd.CombinedOutput(); err != nil {
		// Deliberately non-fatal: log at debug and report success regardless.
		m.logger.Debug("Orphaned process cleanup completed",
			zap.Error(err),
			zap.String("output", string(output)))
	}
	return nil
}
// GenerateEnvFile creates the environment file consumed by the namespace
// service's systemd template unit (EnvironmentFile=.../<serviceType>.env).
//
// NODE_ID is always written exactly once: callers that also pass NODE_ID in
// envVars previously produced a duplicate line (harmless to systemd, which
// keeps the last value, but confusing to read), so the loop skips that key.
func (m *Manager) GenerateEnvFile(namespace, nodeID string, serviceType ServiceType, envVars map[string]string) error {
	envDir := filepath.Join(m.namespaceBase, namespace)
	if err := os.MkdirAll(envDir, 0755); err != nil {
		return fmt.Errorf("failed to create env directory: %w", err)
	}
	envFile := filepath.Join(envDir, fmt.Sprintf("%s.env", serviceType))

	var content strings.Builder
	content.WriteString("# Auto-generated environment file for namespace service\n")
	content.WriteString(fmt.Sprintf("# Namespace: %s\n", namespace))
	content.WriteString(fmt.Sprintf("# Node ID: %s\n", nodeID))
	content.WriteString(fmt.Sprintf("# Service: %s\n\n", serviceType))

	// Always include NODE_ID — the template units reference ${NODE_ID}.
	content.WriteString(fmt.Sprintf("NODE_ID=%s\n", nodeID))
	// NOTE(review): map iteration order is random, so the variable order
	// varies between runs; systemd does not care, but file diffs will churn.
	for key, value := range envVars {
		if key == "NODE_ID" {
			continue // already written above; avoid a duplicate line
		}
		content.WriteString(fmt.Sprintf("%s=%s\n", key, value))
	}

	if err := os.WriteFile(envFile, []byte(content.String()), 0644); err != nil {
		return fmt.Errorf("failed to write env file: %w", err)
	}
	m.logger.Info("Generated environment file",
		zap.String("file", envFile),
		zap.String("namespace", namespace),
		zap.String("service_type", string(serviceType)))
	return nil
}
// InstallTemplateUnits copies the three namespace template unit files from
// sourceDir into the systemd unit directory and reloads the daemon so the
// new templates become visible.
func (m *Manager) InstallTemplateUnits(sourceDir string) error {
	m.logger.Info("Installing systemd template units", zap.String("source", sourceDir))
	for _, template := range []string{
		"debros-namespace-rqlite@.service",
		"debros-namespace-olric@.service",
		"debros-namespace-gateway@.service",
	} {
		data, err := os.ReadFile(filepath.Join(sourceDir, template))
		if err != nil {
			return fmt.Errorf("failed to read template %s: %w", template, err)
		}
		if err := os.WriteFile(filepath.Join(m.systemdDir, template), data, 0644); err != nil {
			return fmt.Errorf("failed to write template %s: %w", template, err)
		}
		m.logger.Info("Installed template unit", zap.String("template", template))
	}
	// New unit files are only picked up after a daemon reload.
	if err := m.ReloadDaemon(); err != nil {
		return fmt.Errorf("failed to reload systemd daemon: %w", err)
	}
	m.logger.Info("All template units installed successfully")
	return nil
}

View File

@ -0,0 +1,32 @@
[Unit]
Description=DeBros Namespace Gateway (%i)
Documentation=https://github.com/DeBrosOfficial/network
# The gateway depends on this namespace's RQLite and Olric instances.
After=network.target debros-namespace-rqlite@%i.service debros-namespace-olric@%i.service
Requires=debros-namespace-rqlite@%i.service debros-namespace-olric@%i.service
# Stopping the main node service tears down this instance as well.
PartOf=debros-node.service

[Service]
Type=simple
User=debros
Group=debros
WorkingDirectory=/home/debros
# Per-instance variables (NODE_ID) generated by the node at spawn time.
EnvironmentFile=/home/debros/.orama/data/namespaces/%i/gateway.env
# %i is the namespace; ${NODE_ID} is expanded by systemd from the env file.
ExecStart=/home/debros/bin/gateway --config /home/debros/.orama/data/namespaces/%i/configs/gateway-${NODE_ID}.yaml
# Graceful shutdown: SIGTERM to the main process, up to 30s before SIGKILL.
TimeoutStopSec=30s
KillMode=mixed
KillSignal=SIGTERM
Restart=on-failure
RestartSec=5s
StandardOutput=journal
StandardError=journal
SyslogIdentifier=debros-gateway-%i
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,34 @@
[Unit]
Description=DeBros Namespace Olric Cache (%i)
Documentation=https://github.com/DeBrosOfficial/network
After=network.target debros-namespace-rqlite@%i.service
Requires=debros-namespace-rqlite@%i.service
PartOf=debros-node.service

[Service]
Type=simple
User=debros
Group=debros
WorkingDirectory=/home/debros
# Per-instance variables (NODE_ID) generated by the node at spawn time.
EnvironmentFile=/home/debros/.orama/data/namespaces/%i/olric.env
# Olric reads its config path from OLRIC_SERVER_CONFIG. systemd performs NO
# variable expansion inside Environment= values, so the previous
#   Environment="OLRIC_SERVER_CONFIG=...olric-${NODE_ID}.yaml"
# passed the literal string "${NODE_ID}" through to the process. ExecStart=
# lines ARE expanded by systemd, so the variable is injected via env(1) here.
ExecStart=/usr/bin/env OLRIC_SERVER_CONFIG=/home/debros/.orama/data/namespaces/%i/configs/olric-${NODE_ID}.yaml /usr/local/bin/olric-server
TimeoutStopSec=30s
KillMode=mixed
KillSignal=SIGTERM
Restart=on-failure
RestartSec=5s
StandardOutput=journal
StandardError=journal
SyslogIdentifier=debros-olric-%i
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,44 @@
[Unit]
Description=DeBros Namespace RQLite (%i)
Documentation=https://github.com/DeBrosOfficial/network
After=network.target
PartOf=debros-node.service
StopWhenUnneeded=false

[Service]
Type=simple
User=debros
Group=debros
WorkingDirectory=/home/debros
# Environment file contains namespace-specific config (HTTP_ADDR, RAFT_ADDR,
# advertise addresses, JOIN_ARGS, NODE_ID), generated at spawn time.
EnvironmentFile=/home/debros/.orama/data/namespaces/%i/rqlite.env
# Start rqlited with args from the environment.
# NOTE: JOIN_ARGS must use the unbraced "$JOIN_ARGS" form. systemd splits
# "$VAR" into separate words ("-join host:port" becomes two argv entries and
# an empty value disappears entirely), whereas "${JOIN_ARGS}" is inserted
# verbatim as a single argument — rqlited would then receive
# "-join host:port" as one malformed flag (or an empty-string argument).
ExecStart=/usr/local/bin/rqlited \
    -http-addr ${HTTP_ADDR} \
    -raft-addr ${RAFT_ADDR} \
    -http-adv-addr ${HTTP_ADV_ADDR} \
    -raft-adv-addr ${RAFT_ADV_ADDR} \
    $JOIN_ARGS \
    /home/debros/.orama/data/namespaces/%i/rqlite/${NODE_ID}
# Graceful shutdown
TimeoutStopSec=30s
KillMode=mixed
KillSignal=SIGTERM
# Restart policy
Restart=on-failure
RestartSec=5s
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=debros-rqlite-%i
# Resource limits
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target