orama/core/pkg/namespace/cluster_manager.go
anonpenguin23 2b184f0398 fix(namespace): make WebRTC config survive slow/cold node restarts (#130)
Root cause of the recurring "turn.credentials → namespace_not_configured" on a
distant node: at converge the gateway resolves its TURN secret from the
namespace rqlite, and on a slow/just-restarted node that read fails ONCE, so
the gateway is written with TURN disabled. Removing the node is not a fix — the
software must tolerate a slow read.

Two-part fix (complements e7ed718's "don't blank a warm config"):
  - RETRY the secret read (5×2s) at converge so a node whose rqlite is still
    syncing waits for it to land instead of writing an empty block once. A
    genuine decrypt failure still exhausts the retries → unresolved → the
    running config is preserved.
  - CACHE the resolved secret into the node's own cluster-state.json
    (applyResolvedWebRTCToState), so the NEXT cold start reads it from disk —
    chooseRestoreWebRTC is state-first and short-circuits before the DB. The
    state struct already had TURNSharedSecret "for cold start" but nothing
    populated it; now it's filled on every successful resolve (only rewritten
    on change). Each node self-heals its own cache; nothing new is sent
    cross-node.

cluster-state.json now carries the TURN secret, so both writers (local
saveLocalState and the remote SaveClusterState) are tightened to 0600 + chmod.
Stale-secret self-heals: disable/enable webrtc re-pushes every node's config
and the next converge re-caches the new value.

Dual-reviewed: code-quality APPROVED; security SECURE after the remote-write
0600 fix. Tests: cache populate + short-circuit, no-change, turn-only node.
2026-06-13 08:12:48 +03:00

2351 lines
91 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package namespace
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/DeBrosOfficial/network/pkg/gateway"
"github.com/DeBrosOfficial/network/pkg/olric"
"github.com/DeBrosOfficial/network/pkg/rqlite"
"github.com/DeBrosOfficial/network/pkg/sfu"
"github.com/DeBrosOfficial/network/pkg/systemd"
"github.com/google/uuid"
"go.uber.org/zap"
)
// ClusterManagerConfig contains configuration for the cluster manager.
// NewClusterManager and NewClusterManagerWithComponents replace any IPFS
// field left at its zero value with the default noted on that field.
type ClusterManagerConfig struct {
	BaseDomain string // Base domain for namespace gateways (e.g., "orama-devnet.network")
	BaseDataDir string // Base directory for namespace data (e.g., "~/.orama/data/namespaces")
	GlobalRQLiteDSN string // Global RQLite DSN for API key validation (e.g., "http://localhost:4001")
	// IPFS configuration for namespace gateways (defaults used if not set)
	IPFSClusterAPIURL string // IPFS Cluster API URL (default: "http://localhost:9094")
	IPFSAPIURL string // IPFS API URL (default: "http://localhost:4501")
	IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s)
	IPFSReplicationFactor int // IPFS replication factor (default: 3)
	// TurnEncryptionKey is a 32-byte AES-256 key for encrypting TURN shared secrets
	// in RQLite. Derived from cluster secret via HKDF(clusterSecret, "turn-encryption").
	// If nil, TURN secrets are stored in plaintext (backward compatibility).
	TurnEncryptionKey []byte
	// ClusterSecretPath is the host's cluster-secret file path. Forwarded
	// to spawned namespace gateways via YAML so they can derive the
	// cluster-wide JWT signing key (bug #215 fix). Empty string disables
	// cross-node JWT verification within namespace clusters.
	// The constructors hand this value to NewSystemdSpawner.
	ClusterSecretPath string
	// SecretsEncryptionKey is the host's serverless secrets encryption key
	// (AES-256, hex-encoded), read once from secrets/secrets-encryption-key.
	// Forwarded to spawned namespace gateways so `function secrets ...`
	// works there (bugboard #837 follow-up). Empty leaves namespace-gateway
	// secrets management disabled (fail-loud).
	SecretsEncryptionKey string
}
// ClusterManager orchestrates namespace cluster provisioning and lifecycle.
// It holds the database client, the allocators/selectors used to place a
// cluster, the systemd spawner used for local services, and the settings
// that are forwarded to every namespace gateway it starts.
type ClusterManager struct {
	db rqlite.Client
	portAllocator *NamespacePortAllocator
	webrtcPortAllocator *WebRTCPortAllocator
	nodeSelector *ClusterNodeSelector
	systemdSpawner *SystemdSpawner // Systemd-based spawner used to start/stop services on the local node
	dnsManager *DNSRecordManager
	logger *zap.Logger
	baseDomain string
	baseDataDir string
	globalRQLiteDSN string // Global RQLite DSN for namespace gateway auth
	// IPFS configuration for namespace gateways
	ipfsClusterAPIURL string
	ipfsAPIURL string
	ipfsTimeout time.Duration
	ipfsReplicationFactor int
	// Local node identity for distributed spawning. Node IDs are compared
	// against this value to decide between a local systemd call and a
	// remote spawn/stop request. Set via SetLocalNodeID.
	localNodeID string
	// AES-256 key for encrypting TURN secrets in RQLite (nil = plaintext)
	turnEncryptionKey []byte
	// Host's serverless secrets encryption key, forwarded to spawned
	// namespace gateways (bugboard #837 follow-up). Empty = disabled.
	secretsEncryptionKey string
	// Track provisioning operations. provisioningMu guards the provisioning map.
	provisioningMu sync.RWMutex
	provisioning map[string]bool // namespace -> in progress
}
// NewClusterManager builds a ClusterManager on top of db. It constructs the
// namespace and WebRTC port allocators, the node selector, the systemd
// spawner and the DNS record manager itself, and falls back to built-in
// defaults for any IPFS setting that cfg leaves at its zero value.
func NewClusterManager(
	db rqlite.Client,
	cfg ClusterManagerConfig,
	logger *zap.Logger,
) *ClusterManager {
	// Internal collaborators, built in dependency order (the node selector
	// needs the namespace port allocator).
	nsPorts := NewNamespacePortAllocator(db, logger)
	rtcPorts := NewWebRTCPortAllocator(db, logger)
	selector := NewClusterNodeSelector(db, nsPorts, logger)
	spawner := NewSystemdSpawner(cfg.BaseDataDir, cfg.ClusterSecretPath, logger)
	dns := NewDNSRecordManager(db, cfg.BaseDomain, logger)

	// cfg is a by-value copy, so filling in defaults here never touches the
	// caller's struct.
	if cfg.IPFSClusterAPIURL == "" {
		cfg.IPFSClusterAPIURL = "http://localhost:9094"
	}
	if cfg.IPFSAPIURL == "" {
		cfg.IPFSAPIURL = "http://localhost:4501"
	}
	if cfg.IPFSTimeout == 0 {
		cfg.IPFSTimeout = 60 * time.Second
	}
	if cfg.IPFSReplicationFactor == 0 {
		cfg.IPFSReplicationFactor = 3
	}

	manager := &ClusterManager{
		db:                    db,
		portAllocator:         nsPorts,
		webrtcPortAllocator:   rtcPorts,
		nodeSelector:          selector,
		systemdSpawner:        spawner,
		dnsManager:            dns,
		baseDomain:            cfg.BaseDomain,
		baseDataDir:           cfg.BaseDataDir,
		globalRQLiteDSN:       cfg.GlobalRQLiteDSN,
		ipfsClusterAPIURL:     cfg.IPFSClusterAPIURL,
		ipfsAPIURL:            cfg.IPFSAPIURL,
		ipfsTimeout:           cfg.IPFSTimeout,
		ipfsReplicationFactor: cfg.IPFSReplicationFactor,
		turnEncryptionKey:     cfg.TurnEncryptionKey,
		secretsEncryptionKey:  cfg.SecretsEncryptionKey,
		logger:                logger.With(zap.String("component", "cluster-manager")),
		provisioning:          make(map[string]bool),
	}
	return manager
}
// NewClusterManagerWithComponents builds a ClusterManager from caller-supplied
// port allocator, node selector and systemd spawner (useful for testing).
// The WebRTC port allocator and DNS record manager are still created here,
// and unset IPFS settings receive the same defaults as in NewClusterManager.
func NewClusterManagerWithComponents(
	db rqlite.Client,
	portAllocator *NamespacePortAllocator,
	nodeSelector *ClusterNodeSelector,
	systemdSpawner *SystemdSpawner,
	cfg ClusterManagerConfig,
	logger *zap.Logger,
) *ClusterManager {
	// Resolve IPFS defaults on the local copy of cfg.
	if cfg.IPFSClusterAPIURL == "" {
		cfg.IPFSClusterAPIURL = "http://localhost:9094"
	}
	if cfg.IPFSAPIURL == "" {
		cfg.IPFSAPIURL = "http://localhost:4501"
	}
	if cfg.IPFSTimeout == 0 {
		cfg.IPFSTimeout = 60 * time.Second
	}
	if cfg.IPFSReplicationFactor == 0 {
		cfg.IPFSReplicationFactor = 3
	}

	manager := &ClusterManager{
		db:                    db,
		portAllocator:         portAllocator,
		webrtcPortAllocator:   NewWebRTCPortAllocator(db, logger),
		nodeSelector:          nodeSelector,
		systemdSpawner:        systemdSpawner,
		dnsManager:            NewDNSRecordManager(db, cfg.BaseDomain, logger),
		baseDomain:            cfg.BaseDomain,
		baseDataDir:           cfg.BaseDataDir,
		globalRQLiteDSN:       cfg.GlobalRQLiteDSN,
		ipfsClusterAPIURL:     cfg.IPFSClusterAPIURL,
		ipfsAPIURL:            cfg.IPFSAPIURL,
		ipfsTimeout:           cfg.IPFSTimeout,
		ipfsReplicationFactor: cfg.IPFSReplicationFactor,
		turnEncryptionKey:     cfg.TurnEncryptionKey,
		secretsEncryptionKey:  cfg.SecretsEncryptionKey,
		logger:                logger.With(zap.String("component", "cluster-manager")),
		provisioning:          make(map[string]bool),
	}
	return manager
}
// SetLocalNodeID stores this node's peer ID. Spawn and stop helpers compare
// node IDs against it to choose between a local systemd call and a remote
// request.
func (cm *ClusterManager) SetLocalNodeID(id string) {
	cm.localNodeID = id
	cm.logger.Info(
		"Local node ID set for distributed provisioning",
		zap.String("local_node_id", id),
	)
}
// spawnRQLiteWithSystemd starts an RQLite instance on the local node by
// delegating to the systemd spawner. No config file is written here; the
// namespace and node ID are taken from cfg.
func (cm *ClusterManager) spawnRQLiteWithSystemd(ctx context.Context, cfg rqlite.InstanceConfig) error {
	namespace, nodeID := cfg.Namespace, cfg.NodeID
	return cm.systemdSpawner.SpawnRQLite(ctx, namespace, nodeID, cfg)
}
// spawnOlricWithSystemd starts an Olric instance on the local node by
// delegating to the systemd spawner, which also takes care of the config
// file. The namespace and node ID are taken from cfg.
func (cm *ClusterManager) spawnOlricWithSystemd(ctx context.Context, cfg olric.InstanceConfig) error {
	namespace, nodeID := cfg.Namespace, cfg.NodeID
	return cm.systemdSpawner.SpawnOlric(ctx, namespace, nodeID, cfg)
}
// writePeersJSON serialises peers to <dataDir>/raft/peers.json (used for
// RQLite Raft cluster recovery), creating the raft directory if needed.
// The directory is created with mode 0755 and the file with mode 0644.
func (cm *ClusterManager) writePeersJSON(dataDir string, peers []rqlite.RaftPeer) error {
	raftDir := filepath.Join(dataDir, "raft")
	if mkdirErr := os.MkdirAll(raftDir, 0755); mkdirErr != nil {
		return fmt.Errorf("failed to create raft directory: %w", mkdirErr)
	}

	encoded, marshalErr := json.Marshal(peers)
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal peers: %w", marshalErr)
	}

	target := filepath.Join(raftDir, "peers.json")
	return os.WriteFile(target, encoded, 0644)
}
// spawnGatewayWithSystemd starts a namespace Gateway on the local node by
// delegating to the systemd spawner, which also takes care of the config
// file. The namespace and node ID are taken from cfg.
func (cm *ClusterManager) spawnGatewayWithSystemd(ctx context.Context, cfg gateway.InstanceConfig) error {
	namespace, nodeID := cfg.Namespace, cfg.NodeID
	return cm.systemdSpawner.SpawnGateway(ctx, namespace, nodeID, cfg)
}
// ProvisionCluster provisions a new 3-node cluster for a namespace.
//
// The body runs every step inline before returning: it inserts the cluster
// record, selects nodes, allocates one port block per node, starts the
// RQLite, Olric and Gateway instances, creates DNS records, marks the
// cluster ready and saves cluster-state.json on all nodes.
// NOTE(review): earlier documentation described this as an async operation
// that "returns immediately"; nothing in this function is asynchronous, so
// any such behavior must come from the caller — confirm.
//
// Only one provisioning run per namespace name may be in flight on this
// manager; a second concurrent call for the same name returns an error.
// On a node-selection, port-allocation or service-start failure the cluster
// is marked failed (directly or via rollbackProvisioning) and a wrapped
// error is returned. A missing gateway binary and DNS failures are logged
// but do not fail provisioning.
func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, namespaceName, provisionedBy string) (*NamespaceCluster, error) {
	// Check if already provisioning
	cm.provisioningMu.Lock()
	if cm.provisioning[namespaceName] {
		cm.provisioningMu.Unlock()
		return nil, fmt.Errorf("namespace %s is already being provisioned", namespaceName)
	}
	cm.provisioning[namespaceName] = true
	cm.provisioningMu.Unlock()
	// Clear the in-progress marker on every exit path.
	defer func() {
		cm.provisioningMu.Lock()
		delete(cm.provisioning, namespaceName)
		cm.provisioningMu.Unlock()
	}()
	cm.logger.Info("Starting cluster provisioning",
		zap.String("namespace", namespaceName),
		zap.Int("namespace_id", namespaceID),
		zap.String("provisioned_by", provisionedBy),
	)
	// Create cluster record
	cluster := &NamespaceCluster{
		ID: uuid.New().String(),
		NamespaceID: namespaceID,
		NamespaceName: namespaceName,
		Status: ClusterStatusProvisioning,
		RQLiteNodeCount: 3,
		OlricNodeCount: 3,
		GatewayNodeCount: 3,
		ProvisionedBy: provisionedBy,
		ProvisionedAt: time.Now(),
	}
	// Insert cluster record
	if err := cm.insertCluster(ctx, cluster); err != nil {
		return nil, fmt.Errorf("failed to insert cluster record: %w", err)
	}
	// Log event
	cm.logEvent(ctx, cluster.ID, EventProvisioningStarted, "", "Cluster provisioning started", nil)
	// Select 3 nodes for the cluster
	nodes, err := cm.nodeSelector.SelectNodesForCluster(ctx, 3)
	if err != nil {
		cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusFailed, err.Error())
		return nil, fmt.Errorf("failed to select nodes: %w", err)
	}
	nodeIDs := make([]string, len(nodes))
	for i, n := range nodes {
		nodeIDs[i] = n.NodeID
	}
	cm.logEvent(ctx, cluster.ID, EventNodesSelected, "", "Selected nodes for cluster", map[string]interface{}{"nodes": nodeIDs})
	// Allocate ports on each node; portBlocks[i] belongs to nodes[i].
	portBlocks := make([]*PortBlock, len(nodes))
	for i, node := range nodes {
		block, err := cm.portAllocator.AllocatePortBlock(ctx, node.NodeID, cluster.ID)
		if err != nil {
			// Rollback previous allocations
			for j := 0; j < i; j++ {
				cm.portAllocator.DeallocatePortBlock(ctx, cluster.ID, nodes[j].NodeID)
			}
			cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusFailed, err.Error())
			return nil, fmt.Errorf("failed to allocate ports on node %s: %w", node.NodeID, err)
		}
		portBlocks[i] = block
		cm.logEvent(ctx, cluster.ID, EventPortsAllocated, node.NodeID,
			fmt.Sprintf("Allocated ports %d-%d", block.PortStart, block.PortEnd), nil)
	}
	// Start RQLite instances (leader first, then followers)
	rqliteInstances, err := cm.startRQLiteCluster(ctx, cluster, nodes, portBlocks)
	if err != nil {
		cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, nil, nil)
		return nil, fmt.Errorf("failed to start RQLite cluster: %w", err)
	}
	// Start Olric instances
	olricInstances, err := cm.startOlricCluster(ctx, cluster, nodes, portBlocks)
	if err != nil {
		cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, rqliteInstances, nil)
		return nil, fmt.Errorf("failed to start Olric cluster: %w", err)
	}
	// Start Gateway instances (optional - may not be available in dev mode)
	_, err = cm.startGatewayCluster(ctx, cluster, nodes, portBlocks, rqliteInstances, olricInstances)
	if err != nil {
		// Check if this is a "binary not found" error - if so, continue without gateways.
		// The match is on the error text, so it depends on the exact wording
		// produced by the spawner (local or remote).
		if strings.Contains(err.Error(), "gateway binary not found") {
			cm.logger.Warn("Skipping namespace gateway spawning (binary not available)",
				zap.String("namespace", cluster.NamespaceName),
				zap.Error(err),
			)
			cm.logEvent(ctx, cluster.ID, "gateway_skipped", "", "Gateway binary not available, cluster will use main gateway", nil)
		} else {
			cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, rqliteInstances, olricInstances)
			return nil, fmt.Errorf("failed to start Gateway cluster: %w", err)
		}
	}
	// Create DNS records for namespace gateway
	if err := cm.createDNSRecords(ctx, cluster, nodes, portBlocks); err != nil {
		cm.logger.Warn("Failed to create DNS records", zap.Error(err))
		// Don't fail provisioning for DNS errors
	}
	// Update cluster status to ready (both on the returned struct and in the database)
	now := time.Now()
	cluster.Status = ClusterStatusReady
	cluster.ReadyAt = &now
	cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusReady, "")
	cm.logEvent(ctx, cluster.ID, EventClusterReady, "", "Cluster is ready", nil)
	// Save cluster-state.json on all nodes (local + remote) for disk-based restore on restart
	cm.saveClusterStateToAllNodes(ctx, cluster, nodes, portBlocks)
	cm.logger.Info("Cluster provisioning completed",
		zap.String("cluster_id", cluster.ID),
		zap.String("namespace", namespaceName),
	)
	return cluster, nil
}
// startRQLiteCluster starts one RQLite instance per node, locally via systemd
// or remotely via a spawn request depending on whether the node is this one.
//
// nodes[0] is started first as the leader; every remaining node is started as
// a follower that joins the leader's advertised Raft address. portBlocks[i]
// must hold the ports allocated for nodes[i].
//
// It returns one instance per node on success. It returns an error if nodes
// is empty, if there are fewer port blocks than nodes, or if any spawn fails;
// when a follower fails, the instances started before it are stopped first.
// Failures to record a cluster node in the database are only logged.
func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) ([]*rqlite.Instance, error) {
	// Guard the index accesses below: nodes[0] and portBlocks[i] would
	// otherwise panic on an empty or mismatched input.
	if len(nodes) == 0 {
		return nil, fmt.Errorf("no nodes provided for RQLite cluster")
	}
	if len(portBlocks) < len(nodes) {
		return nil, fmt.Errorf("got %d port blocks for %d RQLite nodes", len(portBlocks), len(nodes))
	}

	instances := make([]*rqlite.Instance, len(nodes))

	// Start leader first (node 0)
	leaderCfg := rqlite.InstanceConfig{
		Namespace:      cluster.NamespaceName,
		NodeID:         nodes[0].NodeID,
		HTTPPort:       portBlocks[0].RQLiteHTTPPort,
		RaftPort:       portBlocks[0].RQLiteRaftPort,
		HTTPAdvAddress: fmt.Sprintf("%s:%d", nodes[0].InternalIP, portBlocks[0].RQLiteHTTPPort),
		RaftAdvAddress: fmt.Sprintf("%s:%d", nodes[0].InternalIP, portBlocks[0].RQLiteRaftPort),
		IsLeader:       true,
	}
	var err error
	if nodes[0].NodeID == cm.localNodeID {
		cm.logger.Info("Spawning RQLite leader locally", zap.String("node", nodes[0].NodeID))
		err = cm.spawnRQLiteWithSystemd(ctx, leaderCfg)
		if err == nil {
			// Create Instance object for consistency with existing code
			instances[0] = &rqlite.Instance{
				Config: leaderCfg,
			}
		}
	} else {
		cm.logger.Info("Spawning RQLite leader remotely", zap.String("node", nodes[0].NodeID), zap.String("ip", nodes[0].InternalIP))
		instances[0], err = cm.spawnRQLiteRemote(ctx, nodes[0].InternalIP, leaderCfg)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to start RQLite leader: %w", err)
	}
	cm.logEvent(ctx, cluster.ID, EventRQLiteStarted, nodes[0].NodeID, "RQLite leader started", nil)
	cm.logEvent(ctx, cluster.ID, EventRQLiteLeaderElected, nodes[0].NodeID, "RQLite leader elected", nil)
	if err := cm.insertClusterNode(ctx, cluster.ID, nodes[0].NodeID, NodeRoleRQLiteLeader, portBlocks[0]); err != nil {
		cm.logger.Warn("Failed to record cluster node", zap.Error(err))
	}

	// Start followers; each joins via the leader's advertised Raft address.
	leaderRaftAddr := leaderCfg.RaftAdvAddress
	for i := 1; i < len(nodes); i++ {
		followerCfg := rqlite.InstanceConfig{
			Namespace:      cluster.NamespaceName,
			NodeID:         nodes[i].NodeID,
			HTTPPort:       portBlocks[i].RQLiteHTTPPort,
			RaftPort:       portBlocks[i].RQLiteRaftPort,
			HTTPAdvAddress: fmt.Sprintf("%s:%d", nodes[i].InternalIP, portBlocks[i].RQLiteHTTPPort),
			RaftAdvAddress: fmt.Sprintf("%s:%d", nodes[i].InternalIP, portBlocks[i].RQLiteRaftPort),
			JoinAddresses:  []string{leaderRaftAddr},
			IsLeader:       false,
		}
		var followerInstance *rqlite.Instance
		if nodes[i].NodeID == cm.localNodeID {
			cm.logger.Info("Spawning RQLite follower locally", zap.String("node", nodes[i].NodeID))
			err = cm.spawnRQLiteWithSystemd(ctx, followerCfg)
			if err == nil {
				followerInstance = &rqlite.Instance{
					Config: followerCfg,
				}
			}
		} else {
			cm.logger.Info("Spawning RQLite follower remotely", zap.String("node", nodes[i].NodeID), zap.String("ip", nodes[i].InternalIP))
			followerInstance, err = cm.spawnRQLiteRemote(ctx, nodes[i].InternalIP, followerCfg)
		}
		if err != nil {
			// Stop previously started instances
			for j := 0; j < i; j++ {
				cm.stopRQLiteOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName, instances[j])
			}
			return nil, fmt.Errorf("failed to start RQLite follower on node %s: %w", nodes[i].NodeID, err)
		}
		instances[i] = followerInstance
		cm.logEvent(ctx, cluster.ID, EventRQLiteStarted, nodes[i].NodeID, "RQLite follower started", nil)
		cm.logEvent(ctx, cluster.ID, EventRQLiteJoined, nodes[i].NodeID, "RQLite follower joined cluster", nil)
		if err := cm.insertClusterNode(ctx, cluster.ID, nodes[i].NodeID, NodeRoleRQLiteFollower, portBlocks[i]); err != nil {
			cm.logger.Warn("Failed to record cluster node", zap.Error(err))
		}
	}
	return instances, nil
}
// startOlricCluster starts Olric instances on all nodes concurrently.
// Olric uses memberlist for peer discovery — all peers must be reachable at roughly
// the same time. Sequential spawning fails because early instances exhaust their
// retry budget before later instances start. By spawning all concurrently, all
// memberlist ports open within seconds of each other, allowing discovery to succeed.
//
// portBlocks[i] must hold the ports allocated for nodes[i]. Each instance is
// configured with every other node's memberlist address as a peer.
//
// It returns one instance per node on success. If any spawn fails, the
// instances that did start are stopped and the first failure is returned.
// After all spawns succeed the function waits 5 seconds for memberlist to
// converge; if ctx is cancelled during that wait, the instances are stopped
// and an error wrapping ctx.Err() is returned. An unhealthy local instance
// after the wait is only logged.
func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) ([]*olric.OlricInstance, error) {
	instances := make([]*olric.OlricInstance, len(nodes))
	errs := make([]error, len(nodes))

	// Build configs for all nodes upfront
	configs := make([]olric.InstanceConfig, len(nodes))
	for i, node := range nodes {
		var peers []string
		for j, peerNode := range nodes {
			if j != i {
				peers = append(peers, fmt.Sprintf("%s:%d", peerNode.InternalIP, portBlocks[j].OlricMemberlistPort))
			}
		}
		configs[i] = olric.InstanceConfig{
			Namespace:      cluster.NamespaceName,
			NodeID:         node.NodeID,
			HTTPPort:       portBlocks[i].OlricHTTPPort,
			MemberlistPort: portBlocks[i].OlricMemberlistPort,
			BindAddr:       node.InternalIP, // Bind to WG IP directly (0.0.0.0 resolves to IPv6 on some hosts)
			AdvertiseAddr:  node.InternalIP, // Advertise WG IP to peers
			PeerAddresses:  peers,
		}
	}

	// Spawn all instances concurrently. Each goroutine writes only to its own
	// index of instances/errs, so no extra locking is needed.
	var wg sync.WaitGroup
	for i, node := range nodes {
		wg.Add(1)
		go func(idx int, n NodeCapacity) {
			defer wg.Done()
			if n.NodeID == cm.localNodeID {
				cm.logger.Info("Spawning Olric locally", zap.String("node", n.NodeID))
				errs[idx] = cm.spawnOlricWithSystemd(ctx, configs[idx])
				if errs[idx] == nil {
					instances[idx] = &olric.OlricInstance{
						Namespace:      configs[idx].Namespace,
						NodeID:         configs[idx].NodeID,
						HTTPPort:       configs[idx].HTTPPort,
						MemberlistPort: configs[idx].MemberlistPort,
						BindAddr:       configs[idx].BindAddr,
						AdvertiseAddr:  configs[idx].AdvertiseAddr,
						PeerAddresses:  configs[idx].PeerAddresses,
						Status:         olric.InstanceStatusRunning,
						StartedAt:      time.Now(),
					}
				}
			} else {
				cm.logger.Info("Spawning Olric remotely", zap.String("node", n.NodeID), zap.String("ip", n.InternalIP))
				instances[idx], errs[idx] = cm.spawnOlricRemote(ctx, n.InternalIP, configs[idx])
			}
		}(i, node)
	}
	wg.Wait()

	// Check for errors — if any failed, stop all and return
	for i, err := range errs {
		if err != nil {
			cm.logger.Error("Olric spawn failed", zap.String("node", nodes[i].NodeID), zap.Error(err))
			// Stop any that succeeded
			for j := range nodes {
				if errs[j] == nil {
					cm.stopOlricOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName)
				}
			}
			return nil, fmt.Errorf("failed to start Olric on node %s: %w", nodes[i].NodeID, err)
		}
	}

	// All instances started — give memberlist time to converge.
	// Olric's memberlist retries peer joins every ~1s for ~10 attempts.
	// Since all instances are now up, they should discover each other quickly.
	cm.logger.Info("All Olric instances started, waiting for memberlist convergence",
		zap.Int("node_count", len(nodes)),
	)
	// Wait on a timer rather than time.Sleep so a cancelled context aborts
	// the wait instead of blocking the caller for the full 5 seconds.
	convergenceTimer := time.NewTimer(5 * time.Second)
	defer convergenceTimer.Stop()
	select {
	case <-convergenceTimer.C:
	case <-ctx.Done():
		// Best-effort cleanup, mirroring the spawn-failure path above.
		for _, node := range nodes {
			cm.stopOlricOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName)
		}
		return nil, fmt.Errorf("waiting for Olric memberlist convergence: %w", ctx.Err())
	}

	// Log events and record cluster nodes
	for i, node := range nodes {
		cm.logEvent(ctx, cluster.ID, EventOlricStarted, node.NodeID, "Olric instance started", nil)
		cm.logEvent(ctx, cluster.ID, EventOlricJoined, node.NodeID, "Olric instance joined memberlist", nil)
		if err := cm.insertClusterNode(ctx, cluster.ID, node.NodeID, NodeRoleOlric, portBlocks[i]); err != nil {
			cm.logger.Warn("Failed to record cluster node", zap.Error(err))
		}
	}

	// Verify at least the local instance is still healthy after convergence
	for i, node := range nodes {
		if node.NodeID == cm.localNodeID && instances[i] != nil {
			healthy, err := instances[i].IsHealthy(ctx)
			if !healthy {
				cm.logger.Warn("Local Olric instance unhealthy after convergence wait", zap.Error(err))
			} else {
				cm.logger.Info("Local Olric instance healthy after convergence")
			}
		}
	}
	return instances, nil
}
// startGatewayCluster brings up one namespace Gateway per node, one node at
// a time, using systemd for the local node and a spawn request for the rest.
//
// Each gateway talks to the RQLite instance on its own node over localhost
// and to all Olric instances via their advertised DSNs. portBlocks[i] must
// hold the ports allocated for nodes[i].
//
// On the first spawn failure the gateways started so far are stopped and the
// error is returned. Failures to record a cluster node are only logged.
func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock, rqliteInstances []*rqlite.Instance, olricInstances []*olric.OlricInstance) ([]*gateway.GatewayInstance, error) {
	started := make([]*gateway.GatewayInstance, len(nodes))

	// Olric binds to the WireGuard interface, so always use the advertised
	// (WireGuard) address of every instance.
	olricServers := make([]string, 0, len(olricInstances))
	for _, olricInst := range olricInstances {
		olricServers = append(olricServers, olricInst.AdvertisedDSN())
	}

	for idx := range nodes {
		node := nodes[idx]
		block := portBlocks[idx]

		gwCfg := gateway.InstanceConfig{
			Namespace:  cluster.NamespaceName,
			NodeID:     node.NodeID,
			HTTPPort:   block.GatewayHTTPPort,
			BaseDomain: cm.baseDomain,
			// Every gateway connects to the RQLite instance on its own node.
			RQLiteDSN:             fmt.Sprintf("http://localhost:%d", block.RQLiteHTTPPort),
			GlobalRQLiteDSN:       cm.globalRQLiteDSN,
			OlricServers:          olricServers,
			OlricTimeout:          30 * time.Second,
			IPFSClusterAPIURL:     cm.ipfsClusterAPIURL,
			IPFSAPIURL:            cm.ipfsAPIURL,
			IPFSTimeout:           cm.ipfsTimeout,
			IPFSReplicationFactor: cm.ipfsReplicationFactor,
			SecretsEncryptionKey:  cm.secretsEncryptionKey,
		}

		var (
			gw       *gateway.GatewayInstance
			spawnErr error
		)
		if node.NodeID != cm.localNodeID {
			cm.logger.Info("Spawning Gateway remotely", zap.String("node", node.NodeID), zap.String("ip", node.InternalIP))
			gw, spawnErr = cm.spawnGatewayRemote(ctx, node.InternalIP, gwCfg)
		} else {
			cm.logger.Info("Spawning Gateway locally", zap.String("node", node.NodeID))
			if spawnErr = cm.spawnGatewayWithSystemd(ctx, gwCfg); spawnErr == nil {
				gw = &gateway.GatewayInstance{
					Namespace:    gwCfg.Namespace,
					NodeID:       gwCfg.NodeID,
					HTTPPort:     gwCfg.HTTPPort,
					BaseDomain:   gwCfg.BaseDomain,
					RQLiteDSN:    gwCfg.RQLiteDSN,
					OlricServers: gwCfg.OlricServers,
					Status:       gateway.InstanceStatusRunning,
					StartedAt:    time.Now(),
				}
			}
		}

		if spawnErr != nil {
			// Undo the gateways that were already started on earlier nodes.
			for _, prev := range nodes[:idx] {
				cm.stopGatewayOnNode(ctx, prev.NodeID, prev.InternalIP, cluster.NamespaceName)
			}
			return nil, fmt.Errorf("failed to start Gateway on node %s: %w", node.NodeID, spawnErr)
		}

		started[idx] = gw
		cm.logEvent(ctx, cluster.ID, EventGatewayStarted, node.NodeID, "Gateway instance started", nil)
		if recordErr := cm.insertClusterNode(ctx, cluster.ID, node.NodeID, NodeRoleGateway, block); recordErr != nil {
			cm.logger.Warn("Failed to record cluster node", zap.Error(recordErr))
		}
	}
	return started, nil
}
// spawnRQLiteRemote asks the node at nodeIP to start an RQLite instance
// described by cfg by sending a "spawn-rqlite" request.
//
// On success it returns an instance carrying the PID reported by the remote
// node together with cfg, so remote instances expose the same Config that
// locally spawned ones do. Any error from the spawn request is returned as is.
func (cm *ClusterManager) spawnRQLiteRemote(ctx context.Context, nodeIP string, cfg rqlite.InstanceConfig) (*rqlite.Instance, error) {
	resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{
		"action":               "spawn-rqlite",
		"namespace":            cfg.Namespace,
		"node_id":              cfg.NodeID,
		"rqlite_http_port":     cfg.HTTPPort,
		"rqlite_raft_port":     cfg.RaftPort,
		"rqlite_http_adv_addr": cfg.HTTPAdvAddress,
		"rqlite_raft_adv_addr": cfg.RaftAdvAddress,
		"rqlite_join_addrs":    cfg.JoinAddresses,
		"rqlite_is_leader":     cfg.IsLeader,
	})
	if err != nil {
		return nil, err
	}
	// Keep the config alongside the PID; the local spawn path populates
	// Config too, and without it a remote instance carries no identity.
	return &rqlite.Instance{
		Config: cfg,
		PID:    resp.PID,
	}, nil
}
// spawnOlricRemote asks the node at nodeIP to start an Olric instance
// described by cfg by sending a "spawn-olric" request.
//
// On success it returns an instance carrying the PID reported by the remote
// node plus the identity, ports, addresses and peer list from cfg. Any error
// from the spawn request is returned as is.
func (cm *ClusterManager) spawnOlricRemote(ctx context.Context, nodeIP string, cfg olric.InstanceConfig) (*olric.OlricInstance, error) {
	resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{
		"action":                "spawn-olric",
		"namespace":             cfg.Namespace,
		"node_id":               cfg.NodeID,
		"olric_http_port":       cfg.HTTPPort,
		"olric_memberlist_port": cfg.MemberlistPort,
		"olric_bind_addr":       cfg.BindAddr,
		"olric_advertise_addr":  cfg.AdvertiseAddr,
		"olric_peer_addresses":  cfg.PeerAddresses,
	})
	if err != nil {
		return nil, err
	}
	// Namespace, NodeID and PeerAddresses are filled in so a remote instance
	// carries the same identifying fields as a locally spawned one.
	// Status/StartedAt are left unset: only the PID is known from the reply.
	return &olric.OlricInstance{
		Namespace:      cfg.Namespace,
		NodeID:         cfg.NodeID,
		PID:            resp.PID,
		HTTPPort:       cfg.HTTPPort,
		MemberlistPort: cfg.MemberlistPort,
		BindAddr:       cfg.BindAddr,
		AdvertiseAddr:  cfg.AdvertiseAddr,
		PeerAddresses:  cfg.PeerAddresses,
	}, nil
}
// spawnGatewayRemote asks the node at nodeIP to start a namespace Gateway
// described by cfg by sending a "spawn-gateway" request.
//
// Timeouts are transmitted in time.Duration string form, or as an empty
// string when they are not positive. On success the returned instance
// carries the PID reported by the remote node plus the identifying fields
// from cfg.
func (cm *ClusterManager) spawnGatewayRemote(ctx context.Context, nodeIP string, cfg gateway.InstanceConfig) (*gateway.GatewayInstance, error) {
	var ipfsTimeout, olricTimeout string
	if cfg.IPFSTimeout > 0 {
		ipfsTimeout = cfg.IPFSTimeout.String()
	}
	if cfg.OlricTimeout > 0 {
		olricTimeout = cfg.OlricTimeout.String()
	}

	payload := map[string]interface{}{
		"action":                      "spawn-gateway",
		"namespace":                   cfg.Namespace,
		"node_id":                     cfg.NodeID,
		"gateway_http_port":           cfg.HTTPPort,
		"gateway_base_domain":         cfg.BaseDomain,
		"gateway_rqlite_dsn":          cfg.RQLiteDSN,
		"gateway_global_rqlite_dsn":   cfg.GlobalRQLiteDSN,
		"gateway_olric_servers":       cfg.OlricServers,
		"gateway_olric_timeout":       olricTimeout,
		"ipfs_cluster_api_url":        cfg.IPFSClusterAPIURL,
		"ipfs_api_url":                cfg.IPFSAPIURL,
		"ipfs_timeout":                ipfsTimeout,
		"ipfs_replication_factor":     cfg.IPFSReplicationFactor,
		"gateway_webrtc_enabled":      cfg.WebRTCEnabled,
		"gateway_sfu_port":            cfg.SFUPort,
		"gateway_turn_domain":         cfg.TURNDomain,
		"gateway_turn_secret":         cfg.TURNSecret,
		"gateway_turn_stealth_domain": cfg.TURNStealthDomain,
		// Bugboard #837 follow-up: the host secrets encryption key travels
		// with the request so the remote namespace gateway can manage secrets.
		"gateway_secrets_encryption_key": cfg.SecretsEncryptionKey,
	}

	reply, err := cm.sendSpawnRequest(ctx, nodeIP, payload)
	if err != nil {
		return nil, err
	}

	remote := &gateway.GatewayInstance{
		Namespace:    cfg.Namespace,
		NodeID:       cfg.NodeID,
		HTTPPort:     cfg.HTTPPort,
		BaseDomain:   cfg.BaseDomain,
		RQLiteDSN:    cfg.RQLiteDSN,
		OlricServers: cfg.OlricServers,
		PID:          reply.PID,
	}
	return remote, nil
}
// spawnResponse represents the JSON response from a spawn request.
// sendSpawnRequest treats a response with Success == false as a failure and
// reports Error to the caller.
type spawnResponse struct {
	Success bool `json:"success"` // false makes sendSpawnRequest return an error
	Error string `json:"error,omitempty"` // failure description supplied by the remote node
	PID int `json:"pid,omitempty"` // copied into the instance structs returned by the spawn*Remote helpers
}
// sendSpawnRequest POSTs req as JSON to the namespace spawn endpoint on
// nodeIP (port 6001) and decodes the reply into a spawnResponse. The same
// endpoint serves both spawn and stop actions; the "action" key of req
// selects which one.
//
// It returns an error when the request cannot be built or sent, when the
// reply cannot be read or decoded, or when the remote reports
// success == false. The remote's error text is passed through unchanged so
// callers that inspect it keep working; the HTTP status is added when the
// reply is not valid JSON or carries no error text.
func (cm *ClusterManager) sendSpawnRequest(ctx context.Context, nodeIP string, req map[string]interface{}) (*spawnResponse, error) {
	// Upper bound on the reply size. The expected reply is a tiny JSON
	// object, so anything larger is treated as malformed rather than buffered.
	const maxResponseBytes = 1 << 20

	// JoinHostPort brackets IPv6 literals; plain "%s:6001" formatting would
	// yield an invalid URL for them. IPv4 addresses are formatted as before.
	endpoint := "http://" + net.JoinHostPort(nodeIP, "6001") + "/v1/internal/namespace/spawn"

	body, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal spawn request: %w", err)
	}
	httpReq, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")
	httpReq.Header.Set("X-Orama-Internal-Auth", "namespace-coordination")

	client := &http.Client{Timeout: 60 * time.Second}
	resp, err := client.Do(httpReq)
	if err != nil {
		return nil, fmt.Errorf("failed to send spawn request to %s: %w", nodeIP, err)
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
	if err != nil {
		return nil, fmt.Errorf("failed to read response from %s: %w", nodeIP, err)
	}

	var spawnResp spawnResponse
	if err := json.Unmarshal(respBody, &spawnResp); err != nil {
		// A non-JSON body usually means the request never reached the spawn
		// handler; the status code is the only useful hint in that case.
		return nil, fmt.Errorf("failed to decode response from %s (HTTP %d): %w", nodeIP, resp.StatusCode, err)
	}
	if !spawnResp.Success {
		reason := spawnResp.Error
		if reason == "" {
			reason = fmt.Sprintf("HTTP %d", resp.StatusCode)
		}
		return nil, fmt.Errorf("spawn request failed on %s: %s", nodeIP, reason)
	}
	return &spawnResp, nil
}
// stopRQLiteOnNode stops the namespace's RQLite instance on the given node:
// through the systemd spawner when the node is this one, otherwise through a
// "stop-rqlite" request to nodeIP. inst is accepted but not used.
func (cm *ClusterManager) stopRQLiteOnNode(ctx context.Context, nodeID, nodeIP, namespace string, inst *rqlite.Instance) {
	if nodeID != cm.localNodeID {
		cm.sendStopRequest(ctx, nodeIP, "stop-rqlite", namespace, nodeID)
		return
	}
	cm.systemdSpawner.StopRQLite(ctx, namespace, nodeID)
}
// stopOlricOnNode stops the namespace's Olric instance on the given node:
// through the systemd spawner when the node is this one, otherwise through a
// "stop-olric" request to nodeIP.
func (cm *ClusterManager) stopOlricOnNode(ctx context.Context, nodeID, nodeIP, namespace string) {
	if nodeID != cm.localNodeID {
		cm.sendStopRequest(ctx, nodeIP, "stop-olric", namespace, nodeID)
		return
	}
	cm.systemdSpawner.StopOlric(ctx, namespace, nodeID)
}
// stopGatewayOnNode stops the namespace's Gateway instance on the given node:
// through the systemd spawner when the node is this one, otherwise through a
// "stop-gateway" request to nodeIP.
func (cm *ClusterManager) stopGatewayOnNode(ctx context.Context, nodeID, nodeIP, namespace string) {
	if nodeID != cm.localNodeID {
		cm.sendStopRequest(ctx, nodeIP, "stop-gateway", namespace, nodeID)
		return
	}
	cm.systemdSpawner.StopGateway(ctx, namespace, nodeID)
}
// sendStopRequest asks the node at nodeIP to perform the given stop action
// for a namespace service. It is best-effort: a failure is logged as a
// warning and not returned.
func (cm *ClusterManager) sendStopRequest(ctx context.Context, nodeIP, action, namespace, nodeID string) {
	payload := map[string]interface{}{
		"action":    action,
		"namespace": namespace,
		"node_id":   nodeID,
	}
	if _, err := cm.sendSpawnRequest(ctx, nodeIP, payload); err != nil {
		cm.logger.Warn("Failed to send stop request to remote node",
			zap.String("node_ip", nodeIP),
			zap.String("action", action),
			zap.Error(err),
		)
	}
}
// createDNSRecords registers DNS records for the namespace gateway, pointing
// at the public IP of every selected node that has one.
//
// It returns an error when no node has a public IP or when the DNS manager
// fails; on success a DNS-created event is logged. portBlocks is accepted
// but not used.
func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error {
	// Gather the public IPs, skipping nodes without one.
	var gatewayIPs []string
	for idx := range nodes {
		if ip := nodes[idx].IPAddress; ip != "" {
			gatewayIPs = append(gatewayIPs, ip)
		}
	}

	if len(gatewayIPs) == 0 {
		cm.logger.Error("No valid node IPs found for DNS records",
			zap.String("namespace", cluster.NamespaceName),
			zap.Int("node_count", len(nodes)),
		)
		return fmt.Errorf("no valid node IPs found for DNS records")
	}

	err := cm.dnsManager.CreateNamespaceRecords(ctx, cluster.NamespaceName, gatewayIPs)
	if err != nil {
		return err
	}

	fqdn := fmt.Sprintf("ns-%s.%s.", cluster.NamespaceName, cm.baseDomain)
	summary := fmt.Sprintf("DNS records created for %s (%d gateway node records)", fqdn, len(gatewayIPs)*2)
	cm.logEvent(ctx, cluster.ID, EventDNSCreated, "", summary, nil)
	return nil
}
// rollbackProvisioning undoes a failed provisioning attempt: it stops the
// namespace's services through the systemd spawner, stops Olric and RQLite on
// the individual nodes when the corresponding instance slices were passed in,
// releases every port block of the cluster and marks the cluster as failed.
// portBlocks is accepted but not used.
func (cm *ClusterManager) rollbackProvisioning(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock, rqliteInstances []*rqlite.Instance, olricInstances []*olric.OlricInstance) {
	cm.logger.Info("Rolling back failed provisioning", zap.String("cluster_id", cluster.ID))

	// Stop all namespace services (Gateway, Olric, RQLite) using systemd
	cm.systemdSpawner.StopAll(ctx, cluster.NamespaceName)

	haveNodes := nodes != nil

	// Olric: stop on every node, but only if Olric had been started at all.
	if haveNodes && olricInstances != nil {
		for idx := range nodes {
			cm.stopOlricOnNode(ctx, nodes[idx].NodeID, nodes[idx].InternalIP, cluster.NamespaceName)
		}
	}

	// RQLite: stop each instance that exists and has a matching node entry.
	if haveNodes && rqliteInstances != nil {
		for idx, inst := range rqliteInstances {
			if idx >= len(nodes) {
				break
			}
			if inst == nil {
				continue
			}
			cm.stopRQLiteOnNode(ctx, nodes[idx].NodeID, nodes[idx].InternalIP, cluster.NamespaceName, inst)
		}
	}

	// Release the ports and record the failure.
	cm.portAllocator.DeallocateAllPortBlocks(ctx, cluster.ID)
	cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusFailed, "Provisioning failed and rolled back")
}
// DeprovisionCluster tears down a namespace cluster on all nodes.
// Stops namespace infrastructure (Gateway, Olric, RQLite) on every cluster node,
// deletes cluster-state.json, deallocates ports, removes DNS records, and cleans up DB.
//
// Returns nil when the namespace has no cluster. Returns an error when the
// cluster lookup fails or when the cluster record itself cannot be deleted
// (the cluster then remains in the database so the call can be retried).
// Failures while deleting child-table rows are logged and do not abort the
// teardown.
func (cm *ClusterManager) DeprovisionCluster(ctx context.Context, namespaceID int64) error {
	cluster, err := cm.GetClusterByNamespaceID(ctx, namespaceID)
	if err != nil {
		return fmt.Errorf("failed to get cluster: %w", err)
	}
	if cluster == nil {
		return nil // No cluster to deprovision
	}

	cm.logger.Info("Starting cluster deprovisioning",
		zap.String("cluster_id", cluster.ID),
		zap.String("namespace", cluster.NamespaceName),
	)

	cm.logEvent(ctx, cluster.ID, EventDeprovisionStarted, "", "Cluster deprovisioning started", nil)
	if err := cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusDeprovisioning, ""); err != nil {
		// Not fatal: the teardown below does not depend on the status column.
		cm.logger.Warn("Failed to mark cluster as deprovisioning",
			zap.String("cluster_id", cluster.ID),
			zap.Error(err),
		)
	}

	// 1. Get cluster nodes WITH IPs (must happen before any DB deletion)
	type deprovisionNodeInfo struct {
		NodeID     string `db:"node_id"`
		InternalIP string `db:"internal_ip"`
	}
	var clusterNodes []deprovisionNodeInfo
	nodeQuery := `
		SELECT ncn.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip
		FROM namespace_cluster_nodes ncn
		JOIN dns_nodes dn ON ncn.node_id = dn.id
		WHERE ncn.namespace_cluster_id = ?
	`
	if err := cm.db.Query(ctx, &clusterNodes, nodeQuery, cluster.ID); err != nil {
		cm.logger.Warn("Failed to query cluster nodes for deprovisioning, falling back to local-only stop", zap.Error(err))
		// Fall back to local-only stop (individual methods, NOT StopAll which uses dangerous glob)
		// Stop WebRTC services first (SFU → TURN), then core services (Gateway → Olric → RQLite)
		cm.systemdSpawner.StopSFU(ctx, cluster.NamespaceName, cm.localNodeID)
		cm.systemdSpawner.StopTURN(ctx, cluster.NamespaceName, cm.localNodeID)
		cm.systemdSpawner.StopGateway(ctx, cluster.NamespaceName, cm.localNodeID)
		cm.systemdSpawner.StopOlric(ctx, cluster.NamespaceName, cm.localNodeID)
		cm.systemdSpawner.StopRQLite(ctx, cluster.NamespaceName, cm.localNodeID)
		cm.systemdSpawner.DeleteClusterState(cluster.NamespaceName)
	} else {
		// 2. Stop WebRTC services first (SFU → TURN), then core infra (Gateway → Olric → RQLite).
		// Each service type is stopped on every node before moving to the next type.
		for _, node := range clusterNodes {
			cm.stopSFUOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName)
		}
		for _, node := range clusterNodes {
			cm.stopTURNOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName)
		}
		for _, node := range clusterNodes {
			cm.stopGatewayOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName)
		}
		for _, node := range clusterNodes {
			cm.stopOlricOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName)
		}
		for _, node := range clusterNodes {
			cm.stopRQLiteOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName, nil)
		}

		// 3. Delete cluster-state.json on all nodes
		for _, node := range clusterNodes {
			if node.NodeID == cm.localNodeID {
				cm.systemdSpawner.DeleteClusterState(cluster.NamespaceName)
			} else {
				cm.sendStopRequest(ctx, node.InternalIP, "delete-cluster-state", cluster.NamespaceName, node.NodeID)
			}
		}
	}

	// 4. Deallocate all ports (core + WebRTC)
	cm.portAllocator.DeallocateAllPortBlocks(ctx, cluster.ID)
	cm.webrtcPortAllocator.DeallocateAll(ctx, cluster.ID)

	// 5. Delete namespace DNS records (gateway + TURN)
	cm.dnsManager.DeleteNamespaceRecords(ctx, cluster.NamespaceName)
	cm.dnsManager.DeleteTURNRecords(ctx, cluster.NamespaceName)

	// 6. Explicitly delete child tables (FK cascades disabled in rqlite).
	// A failure here leaves orphan rows but must not stop the teardown, so it
	// is only logged.
	childDeletes := []string{
		`DELETE FROM namespace_cluster_events WHERE namespace_cluster_id = ?`,
		`DELETE FROM namespace_cluster_nodes WHERE namespace_cluster_id = ?`,
		`DELETE FROM namespace_port_allocations WHERE namespace_cluster_id = ?`,
		`DELETE FROM webrtc_port_allocations WHERE namespace_cluster_id = ?`,
		`DELETE FROM webrtc_rooms WHERE namespace_cluster_id = ?`,
		`DELETE FROM namespace_webrtc_config WHERE namespace_cluster_id = ?`,
	}
	for _, stmt := range childDeletes {
		if _, err := cm.db.Exec(ctx, stmt, cluster.ID); err != nil {
			cm.logger.Warn("Failed to delete cluster child rows",
				zap.String("cluster_id", cluster.ID),
				zap.String("statement", stmt),
				zap.Error(err),
			)
		}
	}

	// 7. Delete cluster record. If this fails the cluster still exists in the
	// database, so report it instead of claiming success.
	if _, err := cm.db.Exec(ctx, `DELETE FROM namespace_clusters WHERE id = ?`, cluster.ID); err != nil {
		cm.logger.Error("Failed to delete cluster record",
			zap.String("cluster_id", cluster.ID),
			zap.Error(err),
		)
		return fmt.Errorf("failed to delete cluster record: %w", err)
	}

	// NOTE(review): this event row is inserted after the cluster's events were
	// purged in step 6, so it outlives the cluster record — confirm that is the
	// intended audit trail.
	cm.logEvent(ctx, cluster.ID, EventDeprovisioned, "", "Cluster deprovisioned", nil)

	cm.logger.Info("Cluster deprovisioning completed", zap.String("cluster_id", cluster.ID))

	return nil
}
// GetClusterStatus reports the provisioning status of a namespace cluster.
//
// It returns an error when the cluster lookup fails or no cluster has the
// given ID. The per-service readiness flags are derived from the cluster's
// node rows: a service counts as ready only when every node is running and at
// least one node has a port assigned for that service. DNSReady mirrors the
// "every node is running" condition. If the node rows cannot be loaded, the
// node list and readiness flags are simply left at their zero values.
func (cm *ClusterManager) GetClusterStatus(ctx context.Context, clusterID string) (*ClusterProvisioningStatus, error) {
	cluster, err := cm.GetCluster(ctx, clusterID)
	switch {
	case err != nil:
		return nil, err
	case cluster == nil:
		return nil, fmt.Errorf("cluster not found")
	}

	result := &ClusterProvisioningStatus{
		Status:    cluster.Status,
		ClusterID: cluster.ID,
	}

	// Node inspection is best-effort; a lookup failure is not reported.
	if members, nodesErr := cm.getClusterNodes(ctx, clusterID); nodesErr == nil {
		var (
			running                            int
			rqliteSeen, olricSeen, gatewaySeen bool
		)
		for i := range members {
			member := &members[i]
			result.Nodes = append(result.Nodes, member.NodeID)
			if member.Status == NodeStatusRunning {
				running++
			}
			rqliteSeen = rqliteSeen || member.RQLiteHTTPPort > 0
			olricSeen = olricSeen || member.OlricHTTPPort > 0
			gatewaySeen = gatewaySeen || member.GatewayHTTPPort > 0
		}

		everyNodeRunning := len(members) > 0 && running == len(members)
		result.RQLiteReady = everyNodeRunning && rqliteSeen
		result.OlricReady = everyNodeRunning && olricSeen
		result.GatewayReady = everyNodeRunning && gatewaySeen
		result.DNSReady = everyNodeRunning
	}

	if msg := cluster.ErrorMessage; msg != "" {
		result.Error = msg
	}
	return result, nil
}
// GetCluster looks up a namespace cluster by its cluster ID.
// It returns (nil, nil) when no row matches, and the query error when the
// lookup itself fails.
func (cm *ClusterManager) GetCluster(ctx context.Context, clusterID string) (*NamespaceCluster, error) {
	const lookupByID = `SELECT * FROM namespace_clusters WHERE id = ?`

	var matches []NamespaceCluster
	err := cm.db.Query(ctx, &matches, lookupByID, clusterID)
	switch {
	case err != nil:
		return nil, err
	case len(matches) == 0:
		return nil, nil
	default:
		return &matches[0], nil
	}
}
// GetClusterByNamespaceID looks up the cluster that belongs to a namespace ID.
// It returns (nil, nil) when the namespace has no cluster, and the query error
// when the lookup itself fails.
func (cm *ClusterManager) GetClusterByNamespaceID(ctx context.Context, namespaceID int64) (*NamespaceCluster, error) {
	const lookupByNamespaceID = `SELECT * FROM namespace_clusters WHERE namespace_id = ?`

	var matches []NamespaceCluster
	err := cm.db.Query(ctx, &matches, lookupByNamespaceID, namespaceID)
	switch {
	case err != nil:
		return nil, err
	case len(matches) == 0:
		return nil, nil
	default:
		return &matches[0], nil
	}
}
// GetClusterByNamespace looks up the cluster that belongs to a namespace name.
// It returns (nil, nil) when the namespace has no cluster, and the query error
// when the lookup itself fails.
func (cm *ClusterManager) GetClusterByNamespace(ctx context.Context, namespaceName string) (*NamespaceCluster, error) {
	const lookupByName = `SELECT * FROM namespace_clusters WHERE namespace_name = ?`

	var matches []NamespaceCluster
	err := cm.db.Query(ctx, &matches, lookupByName, namespaceName)
	switch {
	case err != nil:
		return nil, err
	case len(matches) == 0:
		return nil, nil
	default:
		return &matches[0], nil
	}
}
// Database helper methods
// insertCluster writes a new row for the given cluster into namespace_clusters
// and returns whatever error the database reports.
func (cm *ClusterManager) insertCluster(ctx context.Context, cluster *NamespaceCluster) error {
	const insertSQL = `
		INSERT INTO namespace_clusters (
			id, namespace_id, namespace_name, status,
			rqlite_node_count, olric_node_count, gateway_node_count,
			provisioned_by, provisioned_at
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
	`

	// One value per placeholder, in column order.
	params := []interface{}{
		cluster.ID,
		cluster.NamespaceID,
		cluster.NamespaceName,
		cluster.Status,
		cluster.RQLiteNodeCount,
		cluster.OlricNodeCount,
		cluster.GatewayNodeCount,
		cluster.ProvisionedBy,
		cluster.ProvisionedAt,
	}

	if _, err := cm.db.Exec(ctx, insertSQL, params...); err != nil {
		return err
	}
	return nil
}
// updateClusterStatus persists a new status for the cluster.
// Moving to ClusterStatusReady stamps ready_at with the current time and
// clears error_message (errorMsg is ignored in that case); every other status
// stores errorMsg as given.
func (cm *ClusterManager) updateClusterStatus(ctx context.Context, clusterID string, status ClusterStatus, errorMsg string) error {
	// Default: plain status + error message update.
	stmt := `UPDATE namespace_clusters SET status = ?, error_message = ? WHERE id = ?`
	params := []interface{}{status, errorMsg, clusterID}

	if status == ClusterStatusReady {
		stmt = `UPDATE namespace_clusters SET status = ?, ready_at = ?, error_message = '' WHERE id = ?`
		params = []interface{}{status, time.Now(), clusterID}
	}

	_, err := cm.db.Exec(ctx, stmt, params...)
	return err
}
// insertClusterNode records a node's membership in a cluster together with the
// ports taken from portBlock. The row gets a freshly generated ID, is inserted
// with status 'running', and uses the same timestamp for created_at and
// updated_at. portBlock must not be nil.
func (cm *ClusterManager) insertClusterNode(ctx context.Context, clusterID, nodeID string, role NodeRole, portBlock *PortBlock) error {
	const insertNode = `
		INSERT INTO namespace_cluster_nodes (
			id, namespace_cluster_id, node_id, role, status,
			rqlite_http_port, rqlite_raft_port,
			olric_http_port, olric_memberlist_port,
			gateway_http_port, created_at, updated_at
		) VALUES (?, ?, ?, ?, 'running', ?, ?, ?, ?, ?, ?, ?)
	`

	stamp := time.Now()
	rowID := uuid.New().String()

	_, err := cm.db.Exec(ctx, insertNode,
		rowID, clusterID, nodeID, role,
		portBlock.RQLiteHTTPPort, portBlock.RQLiteRaftPort,
		portBlock.OlricHTTPPort, portBlock.OlricMemberlistPort,
		portBlock.GatewayHTTPPort, stamp, stamp,
	)
	return err
}
// getClusterNodes loads every namespace_cluster_nodes row that belongs to the
// given cluster. On a query failure it returns a nil slice and the error.
func (cm *ClusterManager) getClusterNodes(ctx context.Context, clusterID string) ([]ClusterNode, error) {
	const nodesByCluster = `SELECT * FROM namespace_cluster_nodes WHERE namespace_cluster_id = ?`

	var members []ClusterNode
	err := cm.db.Query(ctx, &members, nodesByCluster, clusterID)
	if err != nil {
		return nil, err
	}
	return members, nil
}
// logEvent appends a row to namespace_cluster_events for the given cluster.
// metadata, when non-nil, is stored as JSON; if it cannot be encoded the
// metadata column is left empty. The insert is best-effort: a database failure
// is logged as a warning and not returned.
func (cm *ClusterManager) logEvent(ctx context.Context, clusterID string, eventType EventType, nodeID, message string, metadata map[string]interface{}) {
	encodedMeta := ""
	if metadata != nil {
		raw, marshalErr := json.Marshal(metadata)
		if marshalErr == nil {
			encodedMeta = string(raw)
		}
	}

	const insertEvent = `
		INSERT INTO namespace_cluster_events (id, namespace_cluster_id, event_type, node_id, message, metadata, created_at)
		VALUES (?, ?, ?, ?, ?, ?, ?)
	`

	if _, err := cm.db.Exec(ctx, insertEvent,
		uuid.New().String(), clusterID, eventType, nodeID, message, encodedMeta, time.Now(),
	); err != nil {
		cm.logger.Warn("Failed to log cluster event", zap.Error(err))
	}
}
// ClusterProvisioner interface implementation
// CheckNamespaceCluster checks if a namespace has a cluster and returns its status.
// Returns: (clusterID, status, needsProvisioning, error)
//   - If the namespace is "default" or empty, returns ("", "default", false, nil) as it uses the global cluster
//   - If a cluster exists and has not failed, returns (clusterID, status, false, nil)
//   - If no cluster exists, returns ("", "", true, nil) to indicate provisioning is needed
//   - If the cluster failed, its record and port allocations are removed and
//     ("", "", true, nil) is returned so the caller re-provisions
//   - If the lookup fails, or the failed cluster's record cannot be deleted,
//     returns ("", "", false, err)
func (cm *ClusterManager) CheckNamespaceCluster(ctx context.Context, namespaceName string) (string, string, bool, error) {
	// Default namespace uses the global cluster, no per-namespace cluster needed
	if namespaceName == "default" || namespaceName == "" {
		return "", "default", false, nil
	}

	cluster, err := cm.GetClusterByNamespace(ctx, namespaceName)
	if err != nil {
		return "", "", false, err
	}

	if cluster == nil {
		// No cluster exists, provisioning is needed
		return "", "", true, nil
	}

	// If the cluster failed, delete the old record and trigger re-provisioning
	if cluster.Status == ClusterStatusFailed {
		cm.logger.Info("Found failed cluster, will re-provision",
			zap.String("namespace", namespaceName),
			zap.String("cluster_id", cluster.ID),
		)

		// Delete the failed cluster record. If the stale row cannot be removed,
		// re-provisioning would insert a second row for the same namespace
		// while lookups by namespace keep returning the failed one, so surface
		// the error instead of asking for provisioning.
		query := `DELETE FROM namespace_clusters WHERE id = ?`
		if _, err := cm.db.Exec(ctx, query, cluster.ID); err != nil {
			return "", "", false, fmt.Errorf("failed to delete failed cluster record %s: %w", cluster.ID, err)
		}

		// Also clean up any port allocations
		cm.portAllocator.DeallocateAllPortBlocks(ctx, cluster.ID)

		return "", "", true, nil
	}

	// Return current status
	return cluster.ID, string(cluster.Status), false, nil
}
// ProvisionNamespaceCluster kicks off provisioning of a cluster for a namespace.
// Returns: (clusterID, pollURL, error)
//
// The cluster record is inserted synchronously so an ID can be returned right
// away; the actual provisioning then runs in a background goroutine and the
// caller polls pollURL for progress. If provisioning for the namespace is
// already in flight, the existing cluster's ID and poll URL are returned when
// the cluster can be found, otherwise an error.
func (cm *ClusterManager) ProvisionNamespaceCluster(ctx context.Context, namespaceID int, namespaceName, wallet string) (string, string, error) {
	// Claim the namespace, or detect that someone else already did.
	cm.provisioningMu.Lock()
	inFlight := cm.provisioning[namespaceName]
	if !inFlight {
		cm.provisioning[namespaceName] = true
	}
	cm.provisioningMu.Unlock()

	if inFlight {
		// Hand back the cluster that is already being provisioned, if visible.
		existing, _ := cm.GetClusterByNamespace(ctx, namespaceName)
		if existing != nil {
			return existing.ID, "/v1/namespace/status?id=" + existing.ID, nil
		}
		return "", "", fmt.Errorf("namespace %s is already being provisioned", namespaceName)
	}

	// Build and persist the record up front so the ID exists before we return.
	cluster := &NamespaceCluster{
		ID:               uuid.New().String(),
		NamespaceID:      namespaceID,
		NamespaceName:    namespaceName,
		Status:           ClusterStatusProvisioning,
		RQLiteNodeCount:  3,
		OlricNodeCount:   3,
		GatewayNodeCount: 3,
		ProvisionedBy:    wallet,
		ProvisionedAt:    time.Now(),
	}

	if err := cm.insertCluster(ctx, cluster); err != nil {
		// Release the claim so a later call can try again.
		cm.provisioningMu.Lock()
		delete(cm.provisioning, namespaceName)
		cm.provisioningMu.Unlock()
		return "", "", fmt.Errorf("failed to insert cluster record: %w", err)
	}

	cm.logEvent(ctx, cluster.ID, EventProvisioningStarted, "", "Cluster provisioning started", nil)

	// The heavy lifting happens in the background; the goroutine releases the
	// claim when it finishes.
	go cm.provisionClusterAsync(cluster, namespaceID, namespaceName, wallet)

	return cluster.ID, "/v1/namespace/status?id=" + cluster.ID, nil
}
// provisionClusterAsync performs the actual cluster provisioning in the background.
//
// Steps: select 3 nodes, allocate a port block on each, start RQLite, Olric and
// (when the binary is available) Gateway, create DNS records, then mark the
// cluster ready. Any failure before DNS marks the cluster failed; failures
// after services were started also roll them back. DNS failures are only
// logged. The namespace's in-flight marker is always cleared on exit.
//
// Provisioning itself runs under a 5-minute deadline. Failure bookkeeping and
// rollback deliberately use a fresh context: if the failure was caused by that
// deadline expiring, writes made with the expired context would fail too and
// leave the cluster stuck in "provisioning".
func (cm *ClusterManager) provisionClusterAsync(cluster *NamespaceCluster, namespaceID int, namespaceName, provisionedBy string) {
	defer func() {
		// Recover from panics (e.g., gorqlite index-out-of-range) so the
		// goroutine doesn't die silently leaving status stuck at "provisioning".
		if r := recover(); r != nil {
			cm.logger.Error("Provisioning panicked",
				zap.String("namespace", namespaceName),
				zap.Any("panic", r),
			)
			bgCtx := context.Background()
			cm.updateClusterStatus(bgCtx, cluster.ID, ClusterStatusFailed,
				fmt.Sprintf("provisioning panicked: %v", r))
		}
		cm.provisioningMu.Lock()
		delete(cm.provisioning, namespaceName)
		cm.provisioningMu.Unlock()
	}()

	// Overall timeout — prevents the goroutine from hanging indefinitely
	// if a remote spawn request or RQLite write blocks.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	// newCleanupCtx returns a bounded context that is independent of ctx, for
	// work that must still succeed after ctx has expired.
	newCleanupCtx := func() (context.Context, context.CancelFunc) {
		return context.WithTimeout(context.Background(), 2*time.Minute)
	}

	// markFailed records the failure reason on the cluster row.
	markFailed := func(reason string) {
		cleanupCtx, cleanupCancel := newCleanupCtx()
		defer cleanupCancel()
		if err := cm.updateClusterStatus(cleanupCtx, cluster.ID, ClusterStatusFailed, reason); err != nil {
			cm.logger.Error("Failed to record provisioning failure",
				zap.String("cluster_id", cluster.ID),
				zap.Error(err),
			)
		}
	}

	cm.logger.Info("Starting async cluster provisioning",
		zap.String("cluster_id", cluster.ID),
		zap.String("namespace", namespaceName),
		zap.Int("namespace_id", namespaceID),
		zap.String("provisioned_by", provisionedBy),
	)

	// Select 3 nodes for the cluster
	nodes, err := cm.nodeSelector.SelectNodesForCluster(ctx, 3)
	if err != nil {
		markFailed(err.Error())
		cm.logger.Error("Failed to select nodes for cluster", zap.Error(err))
		return
	}

	nodeIDs := make([]string, len(nodes))
	for i, n := range nodes {
		nodeIDs[i] = n.NodeID
	}
	cm.logEvent(ctx, cluster.ID, EventNodesSelected, "", "Selected nodes for cluster", map[string]interface{}{"nodes": nodeIDs})

	// Allocate ports on each node
	portBlocks := make([]*PortBlock, len(nodes))
	for i, node := range nodes {
		block, err := cm.portAllocator.AllocatePortBlock(ctx, node.NodeID, cluster.ID)
		if err != nil {
			// Rollback previous allocations
			cleanupCtx, cleanupCancel := newCleanupCtx()
			for j := 0; j < i; j++ {
				cm.portAllocator.DeallocatePortBlock(cleanupCtx, cluster.ID, nodes[j].NodeID)
			}
			cleanupCancel()
			markFailed(err.Error())
			cm.logger.Error("Failed to allocate ports", zap.Error(err))
			return
		}
		portBlocks[i] = block
		cm.logEvent(ctx, cluster.ID, EventPortsAllocated, node.NodeID,
			fmt.Sprintf("Allocated ports %d-%d", block.PortStart, block.PortEnd), nil)
	}

	// rollback tears down whatever was started so far; rollbackProvisioning
	// also marks the cluster as failed.
	rollback := func(startedRQLite []*rqlite.Instance, startedOlric []*olric.OlricInstance) {
		cleanupCtx, cleanupCancel := newCleanupCtx()
		defer cleanupCancel()
		cm.rollbackProvisioning(cleanupCtx, cluster, nodes, portBlocks, startedRQLite, startedOlric)
	}

	// Start RQLite instances (leader first, then followers)
	rqliteInstances, err := cm.startRQLiteCluster(ctx, cluster, nodes, portBlocks)
	if err != nil {
		rollback(nil, nil)
		cm.logger.Error("Failed to start RQLite cluster", zap.Error(err))
		return
	}

	// Start Olric instances
	olricInstances, err := cm.startOlricCluster(ctx, cluster, nodes, portBlocks)
	if err != nil {
		rollback(rqliteInstances, nil)
		cm.logger.Error("Failed to start Olric cluster", zap.Error(err))
		return
	}

	// Start Gateway instances (optional - may not be available in dev mode)
	_, err = cm.startGatewayCluster(ctx, cluster, nodes, portBlocks, rqliteInstances, olricInstances)
	if err != nil {
		// Check if this is a "binary not found" error - if so, continue without gateways
		if strings.Contains(err.Error(), "gateway binary not found") {
			cm.logger.Warn("Skipping namespace gateway spawning (binary not available)",
				zap.String("namespace", cluster.NamespaceName),
				zap.Error(err),
			)
			cm.logEvent(ctx, cluster.ID, "gateway_skipped", "", "Gateway binary not available, cluster will use main gateway", nil)
		} else {
			rollback(rqliteInstances, olricInstances)
			cm.logger.Error("Failed to start Gateway cluster", zap.Error(err))
			return
		}
	}

	// Create DNS records for namespace gateway
	if err := cm.createDNSRecords(ctx, cluster, nodes, portBlocks); err != nil {
		cm.logger.Warn("Failed to create DNS records", zap.Error(err))
		// Don't fail provisioning for DNS errors
	}

	// Update cluster status to ready. Only report readiness once the database
	// actually reflects it.
	if err := cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusReady, ""); err != nil {
		cm.logger.Error("Failed to mark cluster ready",
			zap.String("cluster_id", cluster.ID),
			zap.String("namespace", namespaceName),
			zap.Error(err),
		)
		return
	}
	now := time.Now()
	cluster.Status = ClusterStatusReady
	cluster.ReadyAt = &now

	cm.logEvent(ctx, cluster.ID, EventClusterReady, "", "Cluster is ready", nil)

	cm.logger.Info("Cluster provisioning completed",
		zap.String("cluster_id", cluster.ID),
		zap.String("namespace", namespaceName),
	)
}
// RestoreLocalClusters restores namespace cluster processes that should be running on this node.
// Called on node startup to re-spawn RQLite, Olric, and Gateway processes for clusters
// that were previously provisioned and assigned to this node.
//
// It returns an error when the local node ID is unset, when the assignment
// query fails, or when the local node's IP cannot be determined. A failure to
// restore an individual cluster is logged and does not stop the others.
func (cm *ClusterManager) RestoreLocalClusters(ctx context.Context) error {
	if cm.localNodeID == "" {
		return fmt.Errorf("local node ID not set")
	}

	cm.logger.Info("Checking for namespace clusters to restore", zap.String("local_node_id", cm.localNodeID))

	// Find all ready clusters that have this node assigned
	type clusterNodeInfo struct {
		ClusterID     string `db:"namespace_cluster_id"`
		NamespaceName string `db:"namespace_name"`
		NodeID        string `db:"node_id"`
		Role          string `db:"role"`
	}
	var assignments []clusterNodeInfo
	query := `
		SELECT DISTINCT cn.namespace_cluster_id, c.namespace_name, cn.node_id, cn.role
		FROM namespace_cluster_nodes cn
		JOIN namespace_clusters c ON cn.namespace_cluster_id = c.id
		WHERE cn.node_id = ? AND c.status = 'ready'
	`
	if err := cm.db.Query(ctx, &assignments, query, cm.localNodeID); err != nil {
		return fmt.Errorf("failed to query local cluster assignments: %w", err)
	}

	if len(assignments) == 0 {
		cm.logger.Info("No namespace clusters to restore on this node")
		return nil
	}

	// Group by cluster: a node can appear once per role, but each cluster is
	// restored only once.
	clusterNamespaces := make(map[string]string) // clusterID -> namespaceName
	for _, a := range assignments {
		clusterNamespaces[a.ClusterID] = a.NamespaceName
	}

	cm.logger.Info("Found namespace clusters to restore",
		zap.Int("count", len(clusterNamespaces)),
		zap.String("local_node_id", cm.localNodeID),
	)

	// Get local node's WireGuard IP
	type nodeIPInfo struct {
		InternalIP string `db:"internal_ip"`
	}
	var localNodeInfo []nodeIPInfo
	ipQuery := `SELECT COALESCE(internal_ip, ip_address) as internal_ip FROM dns_nodes WHERE id = ? LIMIT 1`
	if err := cm.db.Query(ctx, &localNodeInfo, ipQuery, cm.localNodeID); err != nil {
		cm.logger.Warn("Could not determine local node IP, skipping restore", zap.Error(err))
		return fmt.Errorf("failed to get local node IP: %w", err)
	}
	// A successful query with no rows has no error to wrap; report it on its
	// own so the message does not end in a formatted nil.
	if len(localNodeInfo) == 0 {
		cm.logger.Warn("Could not determine local node IP, skipping restore",
			zap.String("local_node_id", cm.localNodeID),
		)
		return fmt.Errorf("failed to get local node IP: node %s not found in dns_nodes", cm.localNodeID)
	}
	localIP := localNodeInfo[0].InternalIP

	for clusterID, namespaceName := range clusterNamespaces {
		if err := cm.restoreClusterOnNode(ctx, clusterID, namespaceName, localIP); err != nil {
			cm.logger.Error("Failed to restore namespace cluster",
				zap.String("namespace", namespaceName),
				zap.String("cluster_id", clusterID),
				zap.Error(err),
			)
			// Continue restoring other clusters
		}
	}

	return nil
}
// restoreClusterOnNode restores all processes for a single cluster on this node.
//
// It loads this node's port allocation and the addresses/ports of every node
// in the cluster, then for RQLite, Olric and (if the cluster has a gateway
// role) Gateway checks whether the service is already up and spawns it via
// systemd if not. Finally it saves the cluster's local state to disk.
//
// An error is returned only when the required database lookups fail or return
// nothing. Failures to spawn an individual service or to save the local state
// are logged and do not fail the restore.
func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, namespaceName, localIP string) error {
	cm.logger.Info("Restoring namespace cluster processes",
		zap.String("namespace", namespaceName),
		zap.String("cluster_id", clusterID),
	)

	// Get port allocation for this node
	var portBlocks []PortBlock
	portQuery := `SELECT * FROM namespace_port_allocations WHERE namespace_cluster_id = ? AND node_id = ?`
	if err := cm.db.Query(ctx, &portBlocks, portQuery, clusterID, cm.localNodeID); err != nil {
		return fmt.Errorf("failed to query port allocation for cluster %s on node %s: %w", clusterID, cm.localNodeID, err)
	}
	if len(portBlocks) == 0 {
		return fmt.Errorf("no port allocation found for cluster %s on node %s", clusterID, cm.localNodeID)
	}
	pb := &portBlocks[0]

	// Get all nodes in this cluster (for join addresses and peer addresses)
	allNodes, err := cm.getClusterNodes(ctx, clusterID)
	if err != nil {
		return fmt.Errorf("failed to get cluster nodes: %w", err)
	}

	// Get all nodes' IPs and port allocations
	type nodePortInfo struct {
		NodeID              string `db:"node_id"`
		InternalIP          string `db:"internal_ip"`
		RQLiteHTTPPort      int    `db:"rqlite_http_port"`
		RQLiteRaftPort      int    `db:"rqlite_raft_port"`
		OlricHTTPPort       int    `db:"olric_http_port"`
		OlricMemberlistPort int    `db:"olric_memberlist_port"`
	}
	var allNodePorts []nodePortInfo
	allPortsQuery := `
		SELECT pa.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip,
			pa.rqlite_http_port, pa.rqlite_raft_port, pa.olric_http_port, pa.olric_memberlist_port
		FROM namespace_port_allocations pa
		JOIN dns_nodes dn ON pa.node_id = dn.id
		WHERE pa.namespace_cluster_id = ?
	`
	if err := cm.db.Query(ctx, &allNodePorts, allPortsQuery, clusterID); err != nil {
		return fmt.Errorf("failed to get all node ports: %w", err)
	}
	// Everything below (peers.json, leader election, peer and server lists)
	// is derived from this list. With no rows the leader election would index
	// an empty slice, so stop here.
	if len(allNodePorts) == 0 {
		return fmt.Errorf("no node port allocations found for cluster %s", clusterID)
	}

	// 1. Restore RQLite
	// Check if RQLite systemd service is already running
	rqliteRunning, _ := cm.systemdSpawner.systemdMgr.IsServiceActive(namespaceName, systemd.ServiceTypeRQLite)
	if !rqliteRunning {
		// Check if RQLite data directory exists (has existing data)
		dataDir := filepath.Join(cm.baseDataDir, namespaceName, "rqlite", cm.localNodeID)
		hasExistingData := false
		if _, err := os.Stat(filepath.Join(dataDir, "raft")); err == nil {
			hasExistingData = true
		}

		if hasExistingData {
			// Write peers.json for Raft cluster recovery (official RQLite mechanism).
			// When all nodes restart simultaneously, Raft can't form quorum from stale state.
			// peers.json tells rqlited the correct voter list so it can hold a fresh election.
			var peers []rqlite.RaftPeer
			for _, np := range allNodePorts {
				raftAddr := fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort)
				peers = append(peers, rqlite.RaftPeer{
					ID:       raftAddr,
					Address:  raftAddr,
					NonVoter: false,
				})
			}
			if err := cm.writePeersJSON(dataDir, peers); err != nil {
				cm.logger.Error("Failed to write peers.json", zap.String("namespace", namespaceName), zap.Error(err))
			}
		}

		// Build join addresses for first-time joins (no existing data)
		var joinAddrs []string
		isLeader := false
		if !hasExistingData {
			// Deterministic leader selection: sort all node IDs and pick the first one.
			// Every node independently computes the same result — no coordination needed.
			// The elected leader bootstraps the cluster; followers use -join with retries
			// to wait for the leader to become ready (up to 5 minutes).
			sortedNodeIDs := make([]string, 0, len(allNodePorts))
			for _, np := range allNodePorts {
				sortedNodeIDs = append(sortedNodeIDs, np.NodeID)
			}
			sort.Strings(sortedNodeIDs)
			electedLeaderID := sortedNodeIDs[0]

			if cm.localNodeID == electedLeaderID {
				isLeader = true
				cm.logger.Info("Deterministic leader election: this node is the leader",
					zap.String("namespace", namespaceName),
					zap.String("node_id", cm.localNodeID))
			} else {
				// Follower: join the elected leader's raft address
				for _, np := range allNodePorts {
					if np.NodeID == electedLeaderID {
						joinAddrs = append(joinAddrs, fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort))
						break
					}
				}
				cm.logger.Info("Deterministic leader election: this node is a follower",
					zap.String("namespace", namespaceName),
					zap.String("node_id", cm.localNodeID),
					zap.String("leader_id", electedLeaderID),
					zap.Strings("join_addrs", joinAddrs))
			}
		}

		rqliteCfg := rqlite.InstanceConfig{
			Namespace:      namespaceName,
			NodeID:         cm.localNodeID,
			HTTPPort:       pb.RQLiteHTTPPort,
			RaftPort:       pb.RQLiteRaftPort,
			HTTPAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteHTTPPort),
			RaftAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteRaftPort),
			JoinAddresses:  joinAddrs,
			IsLeader:       isLeader,
		}

		if err := cm.spawnRQLiteWithSystemd(ctx, rqliteCfg); err != nil {
			cm.logger.Error("Failed to restore RQLite", zap.String("namespace", namespaceName), zap.Error(err))
		} else {
			cm.logger.Info("Restored RQLite instance", zap.String("namespace", namespaceName), zap.Int("port", pb.RQLiteHTTPPort))
		}
	} else {
		cm.logger.Info("RQLite already running", zap.String("namespace", namespaceName), zap.Int("port", pb.RQLiteHTTPPort))
	}

	// 2. Restore Olric
	// Olric counts as running when its memberlist port accepts a TCP connection.
	olricRunning := false
	conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", pb.OlricMemberlistPort), 2*time.Second)
	if err == nil {
		conn.Close()
		olricRunning = true
	}

	if !olricRunning {
		var peers []string
		for _, np := range allNodePorts {
			if np.NodeID != cm.localNodeID {
				peers = append(peers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricMemberlistPort))
			}
		}

		olricCfg := olric.InstanceConfig{
			Namespace:      namespaceName,
			NodeID:         cm.localNodeID,
			HTTPPort:       pb.OlricHTTPPort,
			MemberlistPort: pb.OlricMemberlistPort,
			BindAddr:       localIP,
			AdvertiseAddr:  localIP,
			PeerAddresses:  peers,
		}

		if err := cm.spawnOlricWithSystemd(ctx, olricCfg); err != nil {
			cm.logger.Error("Failed to restore Olric", zap.String("namespace", namespaceName), zap.Error(err))
		} else {
			cm.logger.Info("Restored Olric instance", zap.String("namespace", namespaceName), zap.Int("port", pb.OlricHTTPPort))
		}
	} else {
		cm.logger.Info("Olric already running", zap.String("namespace", namespaceName), zap.Int("port", pb.OlricMemberlistPort))
	}

	// 3. Restore Gateway
	// Check if any cluster node has the gateway role (gateway may have been skipped during provisioning)
	hasGateway := false
	for _, node := range allNodes {
		if node.Role == NodeRoleGateway {
			hasGateway = true
			break
		}
	}

	if hasGateway {
		// The gateway counts as running when its health endpoint answers at
		// all. The probe is bounded so a port that accepts connections but
		// never responds cannot stall the restore.
		gwRunning := false
		healthClient := &http.Client{Timeout: 2 * time.Second}
		resp, err := healthClient.Get(fmt.Sprintf("http://localhost:%d/v1/health", pb.GatewayHTTPPort))
		if err == nil {
			resp.Body.Close()
			gwRunning = true
		}

		if !gwRunning {
			// Build olric server addresses — always use WireGuard IPs (Olric binds to WireGuard interface)
			var olricServers []string
			for _, np := range allNodePorts {
				olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort))
			}

			gwCfg := gateway.InstanceConfig{
				Namespace:             namespaceName,
				NodeID:                cm.localNodeID,
				HTTPPort:              pb.GatewayHTTPPort,
				BaseDomain:            cm.baseDomain,
				RQLiteDSN:             fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
				GlobalRQLiteDSN:       cm.globalRQLiteDSN,
				OlricServers:          olricServers,
				OlricTimeout:          30 * time.Second,
				IPFSClusterAPIURL:     cm.ipfsClusterAPIURL,
				IPFSAPIURL:            cm.ipfsAPIURL,
				IPFSTimeout:           cm.ipfsTimeout,
				IPFSReplicationFactor: cm.ipfsReplicationFactor,
				SecretsEncryptionKey:  cm.secretsEncryptionKey,
			}

			// Add WebRTC config if enabled for this namespace
			if webrtcCfg, err := cm.GetWebRTCConfig(ctx, namespaceName); err == nil && webrtcCfg != nil {
				if sfuBlock, err := cm.webrtcPortAllocator.GetSFUPorts(ctx, clusterID, cm.localNodeID); err == nil && sfuBlock != nil {
					gwCfg.WebRTCEnabled = true
					gwCfg.SFUPort = sfuBlock.SFUSignalingPort
					gwCfg.TURNDomain = fmt.Sprintf("turn.ns-%s.%s", namespaceName, cm.baseDomain)
					gwCfg.TURNSecret = webrtcCfg.TURNSharedSecret
					gwCfg.TURNStealthDomain = cm.stealthDomainFor(namespaceName, webrtcCfg)
				}
			}

			if err := cm.spawnGatewayWithSystemd(ctx, gwCfg); err != nil {
				cm.logger.Error("Failed to restore Gateway", zap.String("namespace", namespaceName), zap.Error(err))
			} else {
				cm.logger.Info("Restored Gateway instance", zap.String("namespace", namespaceName), zap.Int("port", pb.GatewayHTTPPort))
			}
		} else {
			cm.logger.Info("Gateway already running", zap.String("namespace", namespaceName), zap.Int("port", pb.GatewayHTTPPort))
		}
	}

	// Save local state to disk for future restarts without DB dependency
	var stateNodes []ClusterLocalStateNode
	for _, np := range allNodePorts {
		stateNodes = append(stateNodes, ClusterLocalStateNode{
			NodeID:              np.NodeID,
			InternalIP:          np.InternalIP,
			RQLiteHTTPPort:      np.RQLiteHTTPPort,
			RQLiteRaftPort:      np.RQLiteRaftPort,
			OlricHTTPPort:       np.OlricHTTPPort,
			OlricMemberlistPort: np.OlricMemberlistPort,
		})
	}
	localState := &ClusterLocalState{
		ClusterID:     clusterID,
		NamespaceName: namespaceName,
		LocalNodeID:   cm.localNodeID,
		LocalIP:       localIP,
		LocalPorts: ClusterLocalStatePorts{
			RQLiteHTTPPort:      pb.RQLiteHTTPPort,
			RQLiteRaftPort:      pb.RQLiteRaftPort,
			OlricHTTPPort:       pb.OlricHTTPPort,
			OlricMemberlistPort: pb.OlricMemberlistPort,
			GatewayHTTPPort:     pb.GatewayHTTPPort,
		},
		AllNodes:   stateNodes,
		HasGateway: hasGateway,
		BaseDomain: cm.baseDomain,
		SavedAt:    time.Now(),
	}
	if err := cm.saveLocalState(localState); err != nil {
		cm.logger.Warn("Failed to save cluster local state", zap.String("namespace", namespaceName), zap.Error(err))
	}

	return nil
}
// ClusterLocalState is the per-node snapshot of a namespace cluster that is
// written to disk, so the namespace's processes can be brought back without
// querying the main RQLite cluster (which may not have a leader yet on cold
// start).
type ClusterLocalState struct {
	// Identity of the cluster and of the node this file belongs to.
	ClusterID     string `json:"cluster_id"`
	NamespaceName string `json:"namespace_name"`
	LocalNodeID   string `json:"local_node_id"`
	LocalIP       string `json:"local_ip"`

	// Ports used by this node, followed by the address/port list of all nodes.
	LocalPorts ClusterLocalStatePorts  `json:"local_ports"`
	AllNodes   []ClusterLocalStateNode `json:"all_nodes"`

	HasGateway bool      `json:"has_gateway"`
	BaseDomain string    `json:"base_domain"`
	SavedAt    time.Time `json:"saved_at"`

	// WebRTC fields. All are omitted from the JSON when zero, so files written
	// without WebRTC enabled stay backward compatible.
	HasSFU  bool `json:"has_sfu,omitempty"`
	HasTURN bool `json:"has_turn,omitempty"`
	// TURNSharedSecret lets the gateway generate TURN credentials on cold start.
	TURNSharedSecret string `json:"turn_shared_secret,omitempty"`
	// TURNDomain is the TURN server domain used in the gateway config.
	TURNDomain string `json:"turn_domain,omitempty"`
	// TURNStealthDomain is the stealth TURNS:443 host (feat-124); empty when
	// stealth is disabled.
	TURNStealthDomain  string `json:"turn_stealth_domain,omitempty"`
	TURNCredentialTTL  int    `json:"turn_credential_ttl,omitempty"`
	SFUSignalingPort   int    `json:"sfu_signaling_port,omitempty"`
	SFUMediaPortStart  int    `json:"sfu_media_port_start,omitempty"`
	SFUMediaPortEnd    int    `json:"sfu_media_port_end,omitempty"`
	TURNListenPort     int    `json:"turn_listen_port,omitempty"`
	TURNTLSPort        int    `json:"turn_tls_port,omitempty"`
	TURNRelayPortStart int    `json:"turn_relay_port_start,omitempty"`
	TURNRelayPortEnd   int    `json:"turn_relay_port_end,omitempty"`
}
// ClusterLocalStatePorts lists the ports the local node uses for a namespace
// cluster's RQLite, Olric and Gateway services, as stored in the on-disk
// cluster state.
type ClusterLocalStatePorts struct {
	// RQLite HTTP and Raft ports.
	RQLiteHTTPPort int `json:"rqlite_http_port"`
	RQLiteRaftPort int `json:"rqlite_raft_port"`
	// Olric HTTP and memberlist ports.
	OlricHTTPPort       int `json:"olric_http_port"`
	OlricMemberlistPort int `json:"olric_memberlist_port"`
	// Gateway HTTP port.
	GatewayHTTPPort int `json:"gateway_http_port"`
}
// ClusterLocalStateNode describes one node of a namespace cluster inside the
// on-disk cluster state: its ID, internal IP, and its RQLite and Olric ports.
type ClusterLocalStateNode struct {
	NodeID     string `json:"node_id"`
	InternalIP string `json:"internal_ip"`
	// RQLite HTTP and Raft ports on that node.
	RQLiteHTTPPort int `json:"rqlite_http_port"`
	RQLiteRaftPort int `json:"rqlite_raft_port"`
	// Olric HTTP and memberlist ports on that node.
	OlricHTTPPort       int `json:"olric_http_port"`
	OlricMemberlistPort int `json:"olric_memberlist_port"`
}
// saveClusterStateToAllNodes builds a cluster state for every node of the
// cluster and stores it on that node: written directly for the local node,
// sent as a "save-cluster-state" request to every other node.
//
// All states share the same AllNodes list; LocalNodeID, LocalIP and LocalPorts
// are specific to the receiving node. portBlocks[i] must hold the ports of
// nodes[i]. Failures are logged as warnings and do not stop the remaining
// nodes from being processed.
func (cm *ClusterManager) saveClusterStateToAllNodes(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) {
	// The membership list is identical in every node's state file.
	members := make([]ClusterLocalStateNode, 0, len(nodes))
	for idx, n := range nodes {
		ports := portBlocks[idx]
		members = append(members, ClusterLocalStateNode{
			NodeID:              n.NodeID,
			InternalIP:          n.InternalIP,
			RQLiteHTTPPort:      ports.RQLiteHTTPPort,
			RQLiteRaftPort:      ports.RQLiteRaftPort,
			OlricHTTPPort:       ports.OlricHTTPPort,
			OlricMemberlistPort: ports.OlricMemberlistPort,
		})
	}

	for idx, n := range nodes {
		ports := portBlocks[idx]
		nodeState := &ClusterLocalState{
			ClusterID:     cluster.ID,
			NamespaceName: cluster.NamespaceName,
			LocalNodeID:   n.NodeID,
			LocalIP:       n.InternalIP,
			LocalPorts: ClusterLocalStatePorts{
				RQLiteHTTPPort:      ports.RQLiteHTTPPort,
				RQLiteRaftPort:      ports.RQLiteRaftPort,
				OlricHTTPPort:       ports.OlricHTTPPort,
				OlricMemberlistPort: ports.OlricMemberlistPort,
				GatewayHTTPPort:     ports.GatewayHTTPPort,
			},
			AllNodes:   members,
			HasGateway: true,
			BaseDomain: cm.baseDomain,
			SavedAt:    time.Now(),
		}

		// This node: write the file directly.
		if n.NodeID == cm.localNodeID {
			if err := cm.saveLocalState(nodeState); err != nil {
				cm.logger.Warn("Failed to save local cluster state", zap.String("namespace", cluster.NamespaceName), zap.Error(err))
			}
			continue
		}

		// Any other node: ship the serialized state and let it save the file.
		encoded, err := json.MarshalIndent(nodeState, "", " ")
		if err != nil {
			cm.logger.Warn("Failed to marshal cluster state for remote node", zap.String("node", n.NodeID), zap.Error(err))
			continue
		}
		request := map[string]interface{}{
			"action":        "save-cluster-state",
			"namespace":     cluster.NamespaceName,
			"node_id":       n.NodeID,
			"cluster_state": json.RawMessage(encoded),
		}
		if _, err := cm.sendSpawnRequest(ctx, n.InternalIP, request); err != nil {
			cm.logger.Warn("Failed to send cluster state to remote node",
				zap.String("node", n.NodeID),
				zap.String("ip", n.InternalIP),
				zap.Error(err))
		}
	}
}
// saveLocalState writes cluster state to disk for fast recovery without DB queries.
//
// The file is <baseDataDir>/<namespace>/cluster-state.json. It carries the
// namespace TURN shared secret for cold-start resilience (bugboard #130), so it
// must never be world/group readable.
//
// The state is written to a 0600 temp file in the same directory and then
// renamed over the target. Compared with truncating the existing file in
// place this:
//   - never exposes the secret through a file left at 0644 by an older release
//     (an in-place rewrite would put the secret in that file before the chmod
//     ran), and
//   - never leaves a half-written cluster-state.json behind if the process
//     dies mid-write — readers see either the old or the new content.
//
// Returns a wrapped error if the directory cannot be created, the state cannot
// be marshalled, or any step of the write/rename fails.
func (cm *ClusterManager) saveLocalState(state *ClusterLocalState) error {
	dir := filepath.Join(cm.baseDataDir, state.NamespaceName)
	if err := os.MkdirAll(dir, 0755); err != nil {
		return fmt.Errorf("failed to create state dir: %w", err)
	}
	data, err := json.MarshalIndent(state, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal state: %w", err)
	}
	path := filepath.Join(dir, "cluster-state.json")

	// The temp name can never match the "*/cluster-state.json" glob used by
	// RestoreLocalClustersFromDisk, so a leftover temp file is harmless.
	tmp, err := os.CreateTemp(dir, ".cluster-state-*.tmp")
	if err != nil {
		return fmt.Errorf("failed to write state file: %w", err)
	}
	tmpPath := tmp.Name()
	renamed := false
	defer func() {
		if !renamed {
			// Best effort: don't leave a secret-bearing temp file behind.
			os.Remove(tmpPath)
		}
	}()

	// CreateTemp already creates the file 0600; set it explicitly so the
	// guarantee does not depend on that default.
	if err := tmp.Chmod(0600); err != nil {
		tmp.Close()
		return fmt.Errorf("failed to set state file permissions: %w", err)
	}
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return fmt.Errorf("failed to write state file: %w", err)
	}
	// Flush to stable storage before the rename makes the file visible.
	if err := tmp.Sync(); err != nil {
		tmp.Close()
		return fmt.Errorf("failed to write state file: %w", err)
	}
	if err := tmp.Close(); err != nil {
		return fmt.Errorf("failed to write state file: %w", err)
	}
	if err := os.Rename(tmpPath, path); err != nil {
		return fmt.Errorf("failed to write state file: %w", err)
	}
	renamed = true

	cm.logger.Info("Saved cluster local state", zap.String("namespace", state.NamespaceName), zap.String("path", path))
	return nil
}
// loadLocalState reads the cluster state file at path and decodes it.
//
// A read failure is returned unchanged (so callers can still inspect it, e.g.
// for "file does not exist"); a JSON decode failure is wrapped with context.
func loadLocalState(path string) (*ClusterLocalState, error) {
	raw, readErr := os.ReadFile(path)
	if readErr != nil {
		return nil, readErr
	}

	decoded := new(ClusterLocalState)
	if parseErr := json.Unmarshal(raw, decoded); parseErr != nil {
		return nil, fmt.Errorf("failed to parse state file: %w", parseErr)
	}
	return decoded, nil
}
// RestoreLocalClustersFromDisk restores namespace processes from the
// cluster-state.json files found under the base data directory (one per
// namespace sub-directory), so recovery does not have to wait for the main
// RQLite cluster.
//
// It returns the number of namespaces restored, or -1 when no state files
// exist (or the glob itself fails, in which case the error is also returned).
// A state file that cannot be loaded or restored is logged and skipped; it
// does not abort the remaining namespaces.
func (cm *ClusterManager) RestoreLocalClustersFromDisk(ctx context.Context) (int, error) {
	stateFiles, globErr := filepath.Glob(filepath.Join(cm.baseDataDir, "*", "cluster-state.json"))
	if globErr != nil {
		return -1, fmt.Errorf("failed to glob state files: %w", globErr)
	}
	if len(stateFiles) == 0 {
		return -1, nil
	}
	cm.logger.Info("Found local cluster state files", zap.Int("count", len(stateFiles)))

	var restored int
	for _, stateFile := range stateFiles {
		state, loadErr := loadLocalState(stateFile)
		if loadErr != nil {
			cm.logger.Error("Failed to load cluster state file", zap.String("path", stateFile), zap.Error(loadErr))
			continue
		}

		restoreErr := cm.restoreClusterFromState(ctx, state)
		if restoreErr == nil {
			restored++
			continue
		}
		cm.logger.Error("Failed to restore cluster from state", zap.String("namespace", state.NamespaceName), zap.Error(restoreErr))
	}
	return restored, nil
}
// Retry budget used when resolving a restored gateway's WebRTC config (the
// restoreWebRTC value) from the namespace rqlite at converge time.
const (
// webrtcResolveRetries / webrtcResolveRetryDelay bound how long the converge
// waits for a slow/just-restarted node's namespace rqlite to become readable
// before giving up on the WebRTC secret. A distant node (high WG RTT) can
// take a few seconds to sync; without this it reads empty once and comes up
// with TURN disabled (bugboard #130). Up to 5 attempts with a 2s pause
// between them: the pause only happens between attempts, so the cold path
// waits at most 4 × 2s = 8s plus the time taken by the reads themselves.
webrtcResolveRetries = 5
webrtcResolveRetryDelay = 2 * time.Second
)
// applyResolvedWebRTCToState mirrors a freshly-resolved WebRTC config into the
// local cluster state, so that a later cold start can take the TURN secret
// from disk rather than from the (possibly-slow) namespace rqlite
// (bugboard #130).
//
// HasTURN is derived from "a TURN secret is present" and HasSFU from "an SFU
// signaling port is present". The return value is true only when at least one
// state field actually changed, which lets the caller skip rewriting the
// on-disk file when nothing needs persisting. The function has no side effects
// beyond mutating state, so it can be unit-tested without a live cluster.
func applyResolvedWebRTCToState(state *ClusterLocalState, wr restoreWebRTC) bool {
	turnPresent := wr.turnSecret != ""
	sfuPresent := wr.sfuPort > 0

	changed := state.TURNSharedSecret != wr.turnSecret ||
		state.TURNDomain != wr.turnDomain ||
		state.TURNStealthDomain != wr.stealthDomain ||
		state.SFUSignalingPort != wr.sfuPort ||
		state.HasTURN != turnPresent ||
		state.HasSFU != sfuPresent
	if !changed {
		return false
	}

	state.TURNSharedSecret = wr.turnSecret
	state.TURNDomain = wr.turnDomain
	state.TURNStealthDomain = wr.stealthDomain
	state.SFUSignalingPort = wr.sfuPort
	state.HasTURN = turnPresent
	state.HasSFU = sfuPresent
	return true
}
// restoreWebRTC is the resolved WebRTC gateway config for a restored
// namespace gateway, as produced by chooseRestoreWebRTC.
type restoreWebRTC struct {
// enabled is true when the lookup resolved AND either a TURN secret or an
// SFU port is present, i.e. the caller should write a webrtc block.
enabled bool
// sfuPort is this node's SFU signaling port; 0 when the node runs no local SFU.
sfuPort int
turnDomain string
turnSecret string
stealthDomain string // feat-124: empty when webrtc stealth is disabled
// unresolved is true when the state file had no TURN secret AND the DB
// fallback ERRORED (vs. resolved-but-not-enabled). The caller must NOT
// write a WebRTC-disabled gateway config off an unresolved lookup — that
// silently kills turn.credentials on a node that should serve TURN
// (bugboard #130: a decrypt failure after cluster-secret rotation was
// swallowed into "disabled"). enabled is always false when unresolved.
unresolved bool
}
// chooseRestoreWebRTC works out the WebRTC config for a gateway being
// restored. Two aspects are resolved independently (bugboard #25 decouple):
//
//   - TURN (turnSecret + turnDomain) is NAMESPACE-WIDE. Any gateway holding
//     the namespace TURN secret can mint /v1/webrtc/turn/credentials (the
//     credentials are an HMAC; the TURN servers themselves are remote), so a
//     gateway node with NO local SFU still receives the TURN secret.
//   - SFU (sfuPort) is PER-NODE — non-zero only when this node runs a local
//     SFU (for /v1/webrtc/signal + /rooms proxying).
//
// Precedence: values from the local state file win. The DB (source of truth)
// is consulted only when the state file carries no TURN secret, which is the
// namespace-wide "webrtc is enabled" marker; dbFetch is lazy and is not
// called otherwise. DB values only fill fields the state file left empty.
//
// The result's enabled flag is true when EITHER a TURN secret OR an SFU port
// is present (and the lookup resolved), telling the caller to write a webrtc
// block. A non-SFU gateway ends up with {sfuPort:0, turnSecret:set}: the
// credentials route registers, signal/rooms do not.
//
// When dbFetch reports resolved=false the result is marked unresolved and
// enabled is forced to false, so the caller can keep the running config
// instead of writing a TURN-disabled one (bugboard #130).
//
// Kept as a pure function so the precedence rules can be unit-tested without
// the full restore path (systemd spawner + DB + port store).
func chooseRestoreWebRTC(
	stateHasSFU bool, stateSFUPort int, stateTURNDomain, stateTURNSecret, stateStealthDomain string,
	dbFetch func() (turnSecret, turnDomain, stealthDomain string, sfuPort int, resolved bool),
) restoreWebRTC {
	// Start from what the state file knows.
	out := restoreWebRTC{
		turnSecret:    stateTURNSecret,
		turnDomain:    stateTURNDomain,
		stealthDomain: stateStealthDomain,
	}
	if stateHasSFU && stateSFUPort > 0 {
		out.sfuPort = stateSFUPort
	}

	// No TURN secret in the state file: ask the DB. The state file is not
	// updated by EnableWebRTC, so a namespace enabled after the file was
	// written arrives here with an empty secret. (Stealth toggles DO rewrite
	// cluster state on every node, so the state-first read stays fresh for
	// stealthDomain too.)
	if out.turnSecret == "" {
		fetchedSecret, fetchedDomain, fetchedStealth, fetchedSFU, ok := dbFetch()
		if !ok {
			// The DB/decrypt lookup ERRORED — whether WebRTC is enabled is
			// unknown. That is DISTINCT from resolved-but-empty (genuinely
			// disabled), so flag it for the caller (bugboard #130).
			out.unresolved = true
		} else if fetchedSecret != "" {
			out.turnSecret = fetchedSecret
			if out.turnDomain == "" {
				out.turnDomain = fetchedDomain
			}
			if out.stealthDomain == "" {
				out.stealthDomain = fetchedStealth
			}
			if out.sfuPort == 0 {
				out.sfuPort = fetchedSFU
			}
		}
	}

	hasAnyWebRTC := out.turnSecret != "" || out.sfuPort > 0
	out.enabled = hasAnyWebRTC && !out.unresolved
	return out
}
// restoreClusterFromState restores all processes for a cluster (RQLite, Olric,
// gateway, and — when recorded in the state — TURN and SFU) from the local
// state file.
//
// The topology and ports come from the state file, but the function is not
// DB-free: when cm.db is set it verifies that this node is still a member of
// the cluster, and the WebRTC/TURN/SFU steps read the WebRTC config through
// GetWebRTCConfig when the state file does not already hold the TURN secret.
//
// Individual service failures are logged and do not abort the remaining
// steps. An error is returned only when the state file is unusable (it lists
// no nodes, so a fresh RQLite cannot pick a bootstrap leader). If the node was
// replaced during downtime, its services are stopped, the stale state file is
// removed, and nil is returned.
func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *ClusterLocalState) error {
	// Upper bound for the local gateway health probe. Without it a gateway
	// that accepts the connection but never answers would block the restore
	// forever (the default HTTP client has no timeout).
	const gatewayHealthTimeout = 5 * time.Second

	cm.logger.Info("Restoring namespace cluster from local state",
		zap.String("namespace", state.NamespaceName),
		zap.String("cluster_id", state.ClusterID),
	)

	// Self-check: verify this node is still assigned to this cluster in the DB.
	// If we were replaced during downtime, do NOT restore — stop services instead.
	// A failed or empty query is treated as "cannot tell" and the restore proceeds.
	if cm.db != nil {
		type countResult struct {
			Count int `db:"count"`
		}
		var results []countResult
		verifyQuery := `SELECT COUNT(*) as count FROM namespace_cluster_nodes WHERE namespace_cluster_id = ? AND node_id = ?`
		if err := cm.db.Query(ctx, &results, verifyQuery, state.ClusterID, cm.localNodeID); err == nil && len(results) > 0 {
			if results[0].Count == 0 {
				cm.logger.Warn("Node was replaced during downtime, stopping orphaned services instead of restoring",
					zap.String("namespace", state.NamespaceName),
					zap.String("cluster_id", state.ClusterID))
				cm.systemdSpawner.StopAll(ctx, state.NamespaceName)
				// Delete the stale cluster-state.json
				stateFilePath := filepath.Join(cm.baseDataDir, state.NamespaceName, "cluster-state.json")
				if err := os.Remove(stateFilePath); err != nil && !os.IsNotExist(err) {
					// Not fatal, but a surviving file would be picked up again
					// on the next boot, so make the failure visible.
					cm.logger.Warn("Failed to remove stale cluster state file",
						zap.String("path", stateFilePath), zap.Error(err))
				}
				return nil
			}
		}
	}

	pb := &state.LocalPorts
	localIP := state.LocalIP

	// 1. Restore RQLite
	// Check if RQLite systemd service is already running. The check's error is
	// discarded; only the boolean decides whether a spawn is attempted.
	rqliteRunning, _ := cm.systemdSpawner.systemdMgr.IsServiceActive(state.NamespaceName, systemd.ServiceTypeRQLite)
	if !rqliteRunning {
		// Check if RQLite data directory exists (has existing data)
		dataDir := filepath.Join(cm.baseDataDir, state.NamespaceName, "rqlite", cm.localNodeID)
		hasExistingData := false
		if _, err := os.Stat(filepath.Join(dataDir, "raft")); err == nil {
			hasExistingData = true
		}

		if hasExistingData {
			// Existing raft data: rewrite peers.json from the recorded topology.
			var peers []rqlite.RaftPeer
			for _, np := range state.AllNodes {
				raftAddr := fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort)
				peers = append(peers, rqlite.RaftPeer{ID: raftAddr, Address: raftAddr, NonVoter: false})
			}
			if err := cm.writePeersJSON(dataDir, peers); err != nil {
				cm.logger.Error("Failed to write peers.json", zap.String("namespace", state.NamespaceName), zap.Error(err))
			}
		}

		var joinAddrs []string
		isLeader := false
		if !hasExistingData {
			// Fresh start: every node picks the same bootstrap leader by taking
			// the lexicographically smallest node ID. That needs at least one
			// node; a state file without nodes (e.g. a bare "{}") would
			// otherwise panic on the index below.
			if len(state.AllNodes) == 0 {
				return fmt.Errorf("cluster state for namespace %q lists no nodes; cannot bootstrap RQLite", state.NamespaceName)
			}
			sortedNodeIDs := make([]string, 0, len(state.AllNodes))
			for _, np := range state.AllNodes {
				sortedNodeIDs = append(sortedNodeIDs, np.NodeID)
			}
			sort.Strings(sortedNodeIDs)
			electedLeaderID := sortedNodeIDs[0]

			if cm.localNodeID == electedLeaderID {
				isLeader = true
			} else {
				for _, np := range state.AllNodes {
					if np.NodeID == electedLeaderID {
						joinAddrs = append(joinAddrs, fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort))
						break
					}
				}
			}
		}

		rqliteCfg := rqlite.InstanceConfig{
			Namespace:      state.NamespaceName,
			NodeID:         cm.localNodeID,
			HTTPPort:       pb.RQLiteHTTPPort,
			RaftPort:       pb.RQLiteRaftPort,
			HTTPAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteHTTPPort),
			RaftAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteRaftPort),
			JoinAddresses:  joinAddrs,
			IsLeader:       isLeader,
		}
		if err := cm.spawnRQLiteWithSystemd(ctx, rqliteCfg); err != nil {
			cm.logger.Error("Failed to restore RQLite from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
		} else {
			cm.logger.Info("Restored RQLite instance from state", zap.String("namespace", state.NamespaceName))
		}
	}

	// 2. Restore Olric
	// A successful TCP connect to the memberlist port means Olric is already up.
	conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", pb.OlricMemberlistPort), 2*time.Second)
	if err == nil {
		conn.Close()
	} else {
		var peers []string
		for _, np := range state.AllNodes {
			if np.NodeID != cm.localNodeID {
				peers = append(peers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricMemberlistPort))
			}
		}
		olricCfg := olric.InstanceConfig{
			Namespace:      state.NamespaceName,
			NodeID:         cm.localNodeID,
			HTTPPort:       pb.OlricHTTPPort,
			MemberlistPort: pb.OlricMemberlistPort,
			BindAddr:       localIP,
			AdvertiseAddr:  localIP,
			PeerAddresses:  peers,
		}
		if err := cm.spawnOlricWithSystemd(ctx, olricCfg); err != nil {
			cm.logger.Error("Failed to restore Olric from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
		} else {
			cm.logger.Info("Restored Olric instance from state", zap.String("namespace", state.NamespaceName))
		}
	}

	// 3. Restore Gateway
	if state.HasGateway {
		// Build the desired gateway config up front (incl. WebRTC resolved
		// from state→DB) so it drives BOTH the cold-spawn (gateway down)
		// and the warm-reconcile (gateway up but config drifted) paths.
		var olricServers []string // WireGuard IPs (Olric binds to the WG interface)
		for _, np := range state.AllNodes {
			olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort))
		}
		gwCfg := gateway.InstanceConfig{
			Namespace:             state.NamespaceName,
			NodeID:                cm.localNodeID,
			HTTPPort:              pb.GatewayHTTPPort,
			BaseDomain:            state.BaseDomain,
			RQLiteDSN:             fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
			GlobalRQLiteDSN:       cm.globalRQLiteDSN,
			OlricServers:          olricServers,
			OlricTimeout:          30 * time.Second,
			IPFSClusterAPIURL:     cm.ipfsClusterAPIURL,
			IPFSAPIURL:            cm.ipfsAPIURL,
			IPFSTimeout:           cm.ipfsTimeout,
			IPFSReplicationFactor: cm.ipfsReplicationFactor,
			SecretsEncryptionKey:  cm.secretsEncryptionKey,
		}

		// Remember whether the state file itself lacked the TURN secret.
		// applyResolvedWebRTCToState below rewrites the state's WebRTC fields,
		// so "did this config come from the DB?" must be captured first.
		stateLackedTURNSecret := state.TURNSharedSecret == ""

		// Resolve WebRTC config. Prefer the local state file; fall back to
		// the DB (source of truth) to self-heal stale state. Bugboard #25 —
		// the state file is NOT updated by EnableWebRTC, so a namespace
		// enabled AFTER its state file was written carries no SFU/TURN
		// fields here. The lazy dbFetch only hits the DB when the state
		// file is incomplete.
		wr := chooseRestoreWebRTC(
			state.HasSFU, state.SFUSignalingPort, state.TURNDomain, state.TURNSharedSecret, state.TURNStealthDomain,
			func() (turnSecret, turnDomain, stealthDomain string, sfuPort int, resolved bool) {
				// Retry the read on a transient error. A distant/slow node's
				// namespace rqlite may not be synced/readable yet at cold-start
				// converge time — without the retry the read fails once and the
				// gateway is written with TURN disabled (bugboard #130). The
				// secret IS in the DB; we just need the read to land once the
				// follower catches up (typically a few seconds). A genuine
				// decrypt failure (stale key) also errors here and will exhaust
				// the retries → unresolved → the caller preserves the running
				// config rather than blanking it.
				var webrtcCfg *WebRTCConfig
				var err error
				attempts := 0
				for attempts < webrtcResolveRetries {
					attempts++
					webrtcCfg, err = cm.GetWebRTCConfig(ctx, state.NamespaceName)
					if err == nil || attempts == webrtcResolveRetries {
						break // success (webrtcCfg may be nil: genuinely disabled) or out of attempts
					}
					// Pause before the next attempt, but stop waiting as soon
					// as the context is cancelled (e.g. shutdown) rather than
					// sleeping through the whole retry budget.
					select {
					case <-ctx.Done():
					case <-time.After(webrtcResolveRetryDelay):
					}
					if ctx.Err() != nil {
						break
					}
				}
				if err != nil {
					// Persistent error after retries (slow read that never
					// landed, or a decrypt failure). Do NOT swallow into
					// "disabled" — surface loudly and signal unresolved so the
					// caller preserves the running config (bugboard #130).
					cm.logger.Error("WebRTC TURN secret unresolvable on this node after retries — refusing to silently disable TURN; preserving existing gateway config. If this is a cluster-secret rotation, regenerate with `orama namespace disable webrtc` then `orama namespace enable webrtc`.",
						zap.String("namespace", state.NamespaceName),
						zap.String("node_id", cm.localNodeID),
						zap.Int("attempts", attempts),
						zap.Error(err))
					return "", "", "", 0, false
				}
				if webrtcCfg == nil {
					// Resolved cleanly: the namespace genuinely has no WebRTC.
					return "", "", "", 0, true
				}
				// TURN is namespace-wide; SFU port is per-node and may be
				// absent on a gateway-only (non-SFU) node — that's fine,
				// the gateway still serves TURN credentials.
				sfu := 0
				if sfuBlock, serr := cm.webrtcPortAllocator.GetSFUPorts(ctx, state.ClusterID, cm.localNodeID); serr == nil && sfuBlock != nil {
					sfu = sfuBlock.SFUSignalingPort
				}
				return webrtcCfg.TURNSharedSecret,
					fmt.Sprintf("turn.ns-%s.%s", state.NamespaceName, cm.baseDomain),
					cm.stealthDomainFor(state.NamespaceName, webrtcCfg),
					sfu, true
			},
		)
		if wr.enabled {
			// WebRTCEnabled is the legacy flag (ignored by the route gate
			// now — bugboard #25/#411); set it to SFU presence for
			// config-shape consistency with how EnableWebRTC writes nodes.
			gwCfg.WebRTCEnabled = wr.sfuPort > 0
			gwCfg.SFUPort = wr.sfuPort
			gwCfg.TURNDomain = wr.turnDomain
			gwCfg.TURNSecret = wr.turnSecret
			gwCfg.TURNStealthDomain = wr.stealthDomain

			// Cache the resolved secret into THIS node's local state so the
			// NEXT cold start reads it from disk (state-first in
			// chooseRestoreWebRTC short-circuits before the DB) instead of
			// depending on a live, possibly-slow namespace-rqlite read — which
			// is exactly what left a distant/slow node's gateway with TURN
			// disabled on restart (bugboard #130). Each node self-heals its own
			// cache on a successful resolve; nothing is sent cross-node.
			if applyResolvedWebRTCToState(state, wr) {
				if err := cm.saveLocalState(state); err != nil {
					cm.logger.Warn("Failed to cache resolved WebRTC config to local state (cold start may fall back to the DB read next boot)",
						zap.String("namespace", state.NamespaceName), zap.Error(err))
				} else {
					cm.logger.Info("Cached resolved WebRTC config to local state for cold-start resilience (bugboard #130)",
						zap.String("namespace", state.NamespaceName))
				}
			}
		}

		// Probe the local gateway. Any HTTP response counts as "up"; an error
		// (including the probe timing out) is treated as "down".
		healthClient := &http.Client{Timeout: gatewayHealthTimeout}
		resp, err := healthClient.Get(fmt.Sprintf("http://localhost:%d/v1/health", pb.GatewayHTTPPort))
		if err == nil {
			resp.Body.Close()
			switch {
			case wr.unresolved:
				// Bugboard #130 guard: the WebRTC secret could not be resolved
				// (DB/decrypt error, logged above). The gateway is already up
				// and may be serving TURN from a valid on-disk secret — do NOT
				// reconcile it to the empty/disabled block we'd otherwise
				// build, which would kill turn.credentials on this node. Leave
				// the running config untouched; the operator regenerates the
				// secret.
				//
				// Note: this also defers ReconcileGateway's #837
				// secrets-encryption-key reconcile for this one converge pass.
				// That is acceptable — the operator action that fixes the
				// unresolved TURN secret (regenerate + restart) re-runs the
				// full reconcile, and pre-fix this path would have corrupted
				// the WebRTC block anyway.
				cm.logger.Error("Gateway up but WebRTC secret unresolved — skipping reconcile to avoid disabling TURN on the running config (bugboard #130)",
					zap.String("namespace", state.NamespaceName))
			default:
				// Gateway is already up. Reconcile config drift (bugboard #25 —
				// the WARM case): if the running gateway's on-disk config has a
				// WebRTC block that differs from the desired (e.g. it lost the
				// block on a prior restart where it stayed healthy and the
				// cold-spawn path below never ran), rewrite the config +
				// restart. ReconcileGateway is a no-op when the on-disk block
				// already matches, so this does NOT cause a restart loop.
				if rerr := cm.systemdSpawner.ReconcileGateway(ctx, state.NamespaceName, cm.localNodeID, gwCfg); rerr != nil {
					cm.logger.Warn("Gateway WebRTC reconcile failed (leaving running config as-is)",
						zap.String("namespace", state.NamespaceName), zap.Error(rerr))
				}
			}
		} else {
			// Gateway is down → cold spawn. We must bring a gateway up
			// regardless (the namespace needs one); but if the WebRTC secret
			// was unresolved we can't write a working TURN block, so warn
			// loudly that TURN is degraded on this node until the secret is
			// regenerated (bugboard #130).
			switch {
			case wr.unresolved:
				cm.logger.Error("Cold-spawning gateway with TURN UNAVAILABLE — WebRTC secret unresolved on this node; turn.credentials will return namespace_not_configured until it is regenerated (`orama namespace disable webrtc` then `orama namespace enable webrtc`)",
					zap.String("namespace", state.NamespaceName))
			case wr.enabled && stateLackedTURNSecret && wr.turnSecret != "":
				// Only report "from DB" when the secret really came from the
				// DB fallback; a secret served from the on-disk cache is not a
				// stale state file.
				cm.logger.Info("Re-materialized WebRTC gateway config from DB (state file was stale)",
					zap.String("namespace", state.NamespaceName),
					zap.Int("sfu_port", wr.sfuPort))
			}
			if err := cm.spawnGatewayWithSystemd(ctx, gwCfg); err != nil {
				cm.logger.Error("Failed to restore Gateway from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
			} else {
				cm.logger.Info("Restored Gateway instance from state", zap.String("namespace", state.NamespaceName))
			}
		}
	}

	// 4. Restore TURN (if enabled)
	if state.HasTURN && state.TURNRelayPortStart > 0 {
		turnRunning, _ := cm.systemdSpawner.systemdMgr.IsServiceActive(state.NamespaceName, systemd.ServiceTypeTURN)
		if !turnRunning {
			// The TURN server config is built from the DB-backed WebRTC config
			// (shared secret + stealth settings). If that read fails or WebRTC
			// is not configured, skip TURN restore for this pass.
			webrtcCfg, err := cm.GetWebRTCConfig(ctx, state.NamespaceName)
			if err == nil && webrtcCfg != nil {
				turnCfg := TURNInstanceConfig{
					Namespace:       state.NamespaceName,
					NodeID:          cm.localNodeID,
					ListenAddr:      fmt.Sprintf("0.0.0.0:%d", state.TURNListenPort),
					TURNSListenAddr: fmt.Sprintf("0.0.0.0:%d", state.TURNTLSPort),
					PublicIP:        "", // Will be resolved by spawner or from node info
					Realm:           cm.baseDomain,
					AuthSecret:      webrtcCfg.TURNSharedSecret,
					RelayPortStart:  state.TURNRelayPortStart,
					RelayPortEnd:    state.TURNRelayPortEnd,
					TURNDomain:      fmt.Sprintf("turn.ns-%s.%s", state.NamespaceName, cm.baseDomain),
					StealthDomain:   cm.stealthDomainFor(state.NamespaceName, webrtcCfg),
				}
				if err := cm.systemdSpawner.SpawnTURN(ctx, state.NamespaceName, cm.localNodeID, turnCfg); err != nil {
					cm.logger.Error("Failed to restore TURN from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
				} else {
					cm.logger.Info("Restored TURN instance from state", zap.String("namespace", state.NamespaceName))
				}
			} else {
				cm.logger.Warn("Skipping TURN restore: WebRTC config not available from DB",
					zap.String("namespace", state.NamespaceName))
			}
		}
	}

	// 5. Restore SFU (if enabled)
	// NOTE(review): state.HasSFU and state.SFUSignalingPort may have just been
	// rewritten by applyResolvedWebRTCToState in step 3, which does not touch
	// SFUMediaPortStart/SFUMediaPortEnd — confirm an SFU spawn with a
	// DB-resolved signaling port but zero media ports cannot happen here.
	if state.HasSFU && state.SFUSignalingPort > 0 {
		sfuRunning, _ := cm.systemdSpawner.systemdMgr.IsServiceActive(state.NamespaceName, systemd.ServiceTypeSFU)
		if !sfuRunning {
			webrtcCfg, err := cm.GetWebRTCConfig(ctx, state.NamespaceName)
			if err == nil && webrtcCfg != nil {
				turnDomain := fmt.Sprintf("turn.ns-%s.%s", state.NamespaceName, cm.baseDomain)
				sfuCfg := SFUInstanceConfig{
					Namespace:      state.NamespaceName,
					NodeID:         cm.localNodeID,
					ListenAddr:     fmt.Sprintf("%s:%d", localIP, state.SFUSignalingPort),
					MediaPortStart: state.SFUMediaPortStart,
					MediaPortEnd:   state.SFUMediaPortEnd,
					TURNServers: []sfu.TURNServerConfig{
						{Host: turnDomain, Port: TURNDefaultPort, Secure: false},
						{Host: turnDomain, Port: TURNSPort, Secure: true},
					},
					TURNSecret:  webrtcCfg.TURNSharedSecret,
					TURNCredTTL: webrtcCfg.TURNCredentialTTL,
					RQLiteDSN:   fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
				}
				if err := cm.systemdSpawner.SpawnSFU(ctx, state.NamespaceName, cm.localNodeID, sfuCfg); err != nil {
					cm.logger.Error("Failed to restore SFU from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
				} else {
					cm.logger.Info("Restored SFU instance from state", zap.String("namespace", state.NamespaceName))
				}
			} else {
				cm.logger.Warn("Skipping SFU restore: WebRTC config not available from DB",
					zap.String("namespace", state.NamespaceName))
			}
		}
	}

	return nil
}
// GetClusterStatusByID looks up a cluster's full status by its ID.
// It implements the ClusterProvisioner interface used by the gateway, whose
// definition lives in auth/handlers.go. The status is flattened into a plain
// map (rather than returned as the concrete status type) to avoid an import
// cycle with that interface. Any error from GetClusterStatus is returned
// unchanged with a nil result.
func (cm *ClusterManager) GetClusterStatusByID(ctx context.Context, clusterID string) (interface{}, error) {
	status, err := cm.GetClusterStatus(ctx, clusterID)
	if err != nil {
		return nil, err
	}

	payload := make(map[string]interface{}, 9)
	payload["cluster_id"] = status.ClusterID
	payload["namespace"] = status.Namespace
	payload["status"] = string(status.Status)
	payload["nodes"] = status.Nodes
	payload["rqlite_ready"] = status.RQLiteReady
	payload["olric_ready"] = status.OlricReady
	payload["gateway_ready"] = status.GatewayReady
	payload["dns_ready"] = status.DNSReady
	payload["error"] = status.Error
	return payload, nil
}