From bf0d5f9f9fa446d8637ca6be5e8eac22a2eaa6c4 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Sat, 30 May 2026 19:26:26 +0300 Subject: [PATCH] feat(namespace): implement warm reconciliation for gateway webrtc config - Add logic to reconcile gateway configuration drift for running instances - Prevent unnecessary restart loops by verifying on-disk config state - Add unit tests to validate synchronization logic and prevent regressions --- core/pkg/namespace/cluster_manager.go | 126 +++++++++-------- core/pkg/namespace/reconcile_gateway_test.go | 136 +++++++++++++++++++ core/pkg/namespace/systemd_spawner.go | 60 ++++++++ 3 files changed, 263 insertions(+), 59 deletions(-) create mode 100644 core/pkg/namespace/reconcile_gateway_test.go diff --git a/core/pkg/namespace/cluster_manager.go b/core/pkg/namespace/cluster_manager.go index c4e5bc2..1ba6bad 100644 --- a/core/pkg/namespace/cluster_manager.go +++ b/core/pkg/namespace/cluster_manager.go @@ -1983,70 +1983,78 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl // 3. Restore Gateway if state.HasGateway { + // Build the desired gateway config up front (incl. WebRTC resolved + // from state→DB) so it drives BOTH the cold-spawn (gateway down) + // and the warm-reconcile (gateway up but config drifted) paths. + var olricServers []string // WireGuard IPs (Olric binds to the WG interface) + for _, np := range state.AllNodes { + olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort)) + } + gwCfg := gateway.InstanceConfig{ + Namespace: state.NamespaceName, + NodeID: cm.localNodeID, + HTTPPort: pb.GatewayHTTPPort, + BaseDomain: state.BaseDomain, + RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort), + GlobalRQLiteDSN: cm.globalRQLiteDSN, + OlricServers: olricServers, + OlricTimeout: 30 * time.Second, + IPFSClusterAPIURL: cm.ipfsClusterAPIURL, + IPFSAPIURL: cm.ipfsAPIURL, + IPFSTimeout: cm.ipfsTimeout, + IPFSReplicationFactor: cm.ipfsReplicationFactor, + } + + // Resolve WebRTC config. Prefer the local state file; fall back to + // the DB (source of truth) to self-heal stale state. Bugboard #25 — + // the state file is NOT updated by EnableWebRTC, so a namespace + // enabled AFTER its state file was written carries no SFU/TURN + // fields here. The lazy dbFetch only hits the DB when the state + // file is incomplete. + wr := chooseRestoreWebRTC( + state.HasSFU, state.SFUSignalingPort, state.TURNDomain, state.TURNSharedSecret, + func() (bool, int, string, string) { + webrtcCfg, err := cm.GetWebRTCConfig(ctx, state.NamespaceName) + if err != nil || webrtcCfg == nil { + return false, 0, "", "" + } + sfuBlock, err := cm.webrtcPortAllocator.GetSFUPorts(ctx, state.ClusterID, cm.localNodeID) + if err != nil || sfuBlock == nil { + return false, 0, "", "" + } + return true, sfuBlock.SFUSignalingPort, + fmt.Sprintf("turn.ns-%s.%s", state.NamespaceName, cm.baseDomain), + webrtcCfg.TURNSharedSecret + }, + ) + if wr.enabled { + gwCfg.WebRTCEnabled = true + gwCfg.SFUPort = wr.sfuPort + gwCfg.TURNDomain = wr.turnDomain + gwCfg.TURNSecret = wr.turnSecret + } + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/health", pb.GatewayHTTPPort)) if err == nil { resp.Body.Close() + // Gateway is already up. Reconcile config drift (bugboard #25 — + // the WARM case): if the running gateway's on-disk config has a + // WebRTC block that differs from the desired (e.g. it lost the + // block on a prior restart where it stayed healthy and the + // cold-spawn path below never ran), rewrite the config + restart. + // ReconcileGateway is a no-op when the on-disk block already + // matches, so this does NOT cause a restart loop on every boot. + if rerr := cm.systemdSpawner.ReconcileGateway(ctx, state.NamespaceName, cm.localNodeID, gwCfg); rerr != nil { + cm.logger.Warn("Gateway WebRTC reconcile failed (leaving running config as-is)", + zap.String("namespace", state.NamespaceName), zap.Error(rerr)) + } } else { - // Build olric server addresses — always use WireGuard IPs (Olric binds to WireGuard interface) - var olricServers []string - for _, np := range state.AllNodes { - olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort)) + // Gateway is down → cold spawn with the resolved config. + if wr.enabled && !state.HasSFU { + cm.logger.Info("Re-materialized WebRTC gateway config from DB (state file was stale)", + zap.String("namespace", state.NamespaceName), + zap.Int("sfu_port", wr.sfuPort)) } - gwCfg := gateway.InstanceConfig{ - Namespace: state.NamespaceName, - NodeID: cm.localNodeID, - HTTPPort: pb.GatewayHTTPPort, - BaseDomain: state.BaseDomain, - RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort), - GlobalRQLiteDSN: cm.globalRQLiteDSN, - OlricServers: olricServers, - OlricTimeout: 30 * time.Second, - IPFSClusterAPIURL: cm.ipfsClusterAPIURL, - IPFSAPIURL: cm.ipfsAPIURL, - IPFSTimeout: cm.ipfsTimeout, - IPFSReplicationFactor: cm.ipfsReplicationFactor, - } - - // Resolve WebRTC config for the restored gateway. Prefer the - // local state file; fall back to the DB (source of truth) to - // self-heal stale state. Bugboard #25 — the state file is NOT - // updated by EnableWebRTC, so a namespace enabled AFTER its state - // file was written carries no SFU/TURN fields here. Because this - // from-disk restore runs BEFORE the DB-backed restore and - // succeeds, the gateway config would otherwise be regenerated - // WITHOUT the webrtc block on every restart — SFU/TURN services - // keep running but the gateway has empty turn_secret + sfu_port=0 - // (credentials return configured:false / 404, routes don't - // register). The lazy dbFetch only hits the DB when the state - // file is incomplete. - wr := chooseRestoreWebRTC( - state.HasSFU, state.SFUSignalingPort, state.TURNDomain, state.TURNSharedSecret, - func() (bool, int, string, string) { - webrtcCfg, err := cm.GetWebRTCConfig(ctx, state.NamespaceName) - if err != nil || webrtcCfg == nil { - return false, 0, "", "" - } - sfuBlock, err := cm.webrtcPortAllocator.GetSFUPorts(ctx, state.ClusterID, cm.localNodeID) - if err != nil || sfuBlock == nil { - return false, 0, "", "" - } - return true, sfuBlock.SFUSignalingPort, - fmt.Sprintf("turn.ns-%s.%s", state.NamespaceName, cm.baseDomain), - webrtcCfg.TURNSharedSecret - }, - ) - if wr.enabled { - gwCfg.WebRTCEnabled = true - gwCfg.SFUPort = wr.sfuPort - gwCfg.TURNDomain = wr.turnDomain - gwCfg.TURNSecret = wr.turnSecret - if !state.HasSFU { - cm.logger.Info("Re-materialized WebRTC gateway config from DB (state file was stale)", - zap.String("namespace", state.NamespaceName), - zap.Int("sfu_port", wr.sfuPort)) - } - } - if err := cm.spawnGatewayWithSystemd(ctx, gwCfg); err != nil { cm.logger.Error("Failed to restore Gateway from state", zap.String("namespace", state.NamespaceName), zap.Error(err)) } else { diff --git a/core/pkg/namespace/reconcile_gateway_test.go b/core/pkg/namespace/reconcile_gateway_test.go new file mode 100644 index 0000000..97319e3 --- /dev/null +++ b/core/pkg/namespace/reconcile_gateway_test.go @@ -0,0 +1,136 @@ +package namespace + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/DeBrosOfficial/network/pkg/gateway" + "go.uber.org/zap" + "gopkg.in/yaml.v3" +) + +// Bugboard #25 (warm reconcile) — gatewayWebRTCInSync decides whether a +// running namespace gateway's on-disk WebRTC block already matches the +// desired config. ReconcileGateway restarts the gateway ONLY when this +// returns false, so the function is the guard against both (a) leaving a +// drifted gateway broken and (b) restart-looping a correct one on every +// boot. + +func desiredEnabled() gateway.InstanceConfig { + return gateway.InstanceConfig{ + WebRTCEnabled: true, + SFUPort: 30000, + TURNDomain: "turn.ns-anchat-test.orama-devnet.network", + TURNSecret: "the-secret", + } +} + +func TestGatewayWebRTCInSync_driftedBlockMissing_returnsFalse(t *testing.T) { + // The exact bug-25 warm case: the running config has NO webrtc block + // (enabled=false, port 0, empty secret) but the DB-desired config has + // it enabled. MUST report out-of-sync so ReconcileGateway restarts. + onDisk := gateway.GatewayYAMLWebRTC{} // zero value = no block + if gatewayWebRTCInSync(onDisk, desiredEnabled()) { + t.Fatal("BUG #25 REGRESSION: empty on-disk block vs DB-enabled desired must be out-of-sync (needs restart)") + } +} + +func TestGatewayWebRTCInSync_matchingBlock_returnsTrue(t *testing.T) { + // After a reconcile fixes the config, the on-disk block matches the + // desired. MUST report in-sync so the NEXT boot does NOT restart again + // (no restart loop — this is why we compare the actual on-disk config + // instead of the stale state file). + onDisk := gateway.GatewayYAMLWebRTC{ + Enabled: true, + SFUPort: 30000, + TURNDomain: "turn.ns-anchat-test.orama-devnet.network", + TURNSecret: "the-secret", + } + if !gatewayWebRTCInSync(onDisk, desiredEnabled()) { + t.Error("matching on-disk block must be in-sync (no restart) — else restart loop on every boot") + } +} + +func TestGatewayWebRTCInSync_eachFieldDriftDetected(t *testing.T) { + // Any single drifted field must trigger a restart. Pins that the + // comparison covers all four webrtc fields (a future refactor that + // drops one would silently let that field drift forever). + base := gateway.GatewayYAMLWebRTC{ + Enabled: true, SFUPort: 30000, + TURNDomain: "turn.ns-anchat-test.orama-devnet.network", TURNSecret: "the-secret", + } + mutations := []struct { + name string + mut func(w *gateway.GatewayYAMLWebRTC) + }{ + {"enabled flipped off", func(w *gateway.GatewayYAMLWebRTC) { w.Enabled = false }}, + {"sfu port changed", func(w *gateway.GatewayYAMLWebRTC) { w.SFUPort = 30001 }}, + {"turn domain changed", func(w *gateway.GatewayYAMLWebRTC) { w.TURNDomain = "turn.other" }}, + {"turn secret rotated", func(w *gateway.GatewayYAMLWebRTC) { w.TURNSecret = "rotated" }}, + } + for _, tc := range mutations { + t.Run(tc.name, func(t *testing.T) { + d := base + tc.mut(&d) + if gatewayWebRTCInSync(d, desiredEnabled()) { + t.Errorf("drift in %q not detected — gateway would keep serving stale config", tc.name) + } + }) + } +} + +func TestGatewayWebRTCInSync_bothDisabled_returnsTrue(t *testing.T) { + // A namespace genuinely without WebRTC: on-disk block empty, desired + // disabled. In-sync → no restart. (Avoids churning non-webrtc + // namespaces on every boot.) + if !gatewayWebRTCInSync(gateway.GatewayYAMLWebRTC{}, gateway.InstanceConfig{}) { + t.Error("disabled on-disk + disabled desired must be in-sync (no restart)") + } +} + +// ReconcileGateway I/O paths that DON'T restart (the restart path needs +// real systemd, so it's covered by the pure helper above). These pin +// that a matching config is a clean no-op and that an unreadable config +// surfaces an error instead of blind-restarting. + +func writeGatewayConfig(t *testing.T, base, ns, nodeID string, wr gateway.GatewayYAMLWebRTC) { + t.Helper() + dir := filepath.Join(base, ns, "configs") + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal(err) + } + b, _ := yaml.Marshal(gateway.GatewayYAMLConfig{ClientNamespace: ns, WebRTC: wr}) + if err := os.WriteFile(filepath.Join(dir, "gateway-"+nodeID+".yaml"), b, 0644); err != nil { + t.Fatal(err) + } +} + +func TestReconcileGateway_inSyncIsNoOpNoError(t *testing.T) { + base := t.TempDir() + ns, node := "anchat-test", "node-1" + writeGatewayConfig(t, base, ns, node, gateway.GatewayYAMLWebRTC{ + Enabled: true, SFUPort: 30000, + TURNDomain: "turn.ns-anchat-test.orama-devnet.network", TURNSecret: "the-secret", + }) + s := NewSystemdSpawner(base, "", zap.NewNop()) + + // Desired == on-disk → must return nil WITHOUT attempting a restart + // (RestartGateway would error here since there's no real systemd, so + // a nil return proves we never reached it). + err := s.ReconcileGateway(context.Background(), ns, node, desiredEnabled()) + if err != nil { + t.Errorf("in-sync config must be a clean no-op; got %v (did it try to restart?)", err) + } +} + +func TestReconcileGateway_missingConfigReturnsErrorNotRestart(t *testing.T) { + // No config file on disk → return an error so the caller leaves the + // running gateway alone, rather than blind-restarting a healthy one. + s := NewSystemdSpawner(t.TempDir(), "", zap.NewNop()) + err := s.ReconcileGateway(context.Background(), "anchat-test", "node-1", desiredEnabled()) + if err == nil { + t.Error("missing config must return an error (don't blind-restart a healthy gateway)") + } +} diff --git a/core/pkg/namespace/systemd_spawner.go b/core/pkg/namespace/systemd_spawner.go index c25b0ad..9fcd92c 100644 --- a/core/pkg/namespace/systemd_spawner.go +++ b/core/pkg/namespace/systemd_spawner.go @@ -321,6 +321,66 @@ func (s *SystemdSpawner) RestartGateway(ctx context.Context, namespace, nodeID s return s.SpawnGateway(ctx, namespace, nodeID, cfg) } +// gatewayWebRTCInSync reports whether the WebRTC block already on disk +// matches the desired gateway config — i.e. no restart is needed. +// Compares only the WebRTC-relevant fields (bugboard #25 drift surface). +// Pure function so the reconcile decision is unit-testable without files +// or systemd. +func gatewayWebRTCInSync(onDisk gateway.GatewayYAMLWebRTC, cfg gateway.InstanceConfig) bool { + return onDisk.Enabled == cfg.WebRTCEnabled && + onDisk.SFUPort == cfg.SFUPort && + onDisk.TURNSecret == cfg.TURNSecret && + onDisk.TURNDomain == cfg.TURNDomain +} + +// ReconcileGateway is the WARM counterpart to SpawnGateway: when a +// namespace gateway is already running, this compares its on-disk config +// against the desired `cfg` and restarts it ONLY if the WebRTC block has +// drifted (enabled / sfu_port / turn_secret / turn_domain differ). +// +// Bugboard #25: the from-disk restore skips healthy gateways, so a +// gateway that lost its webrtc block on a prior restart (while staying +// healthy) never gets its config regenerated — leaving SFU/TURN services +// running but the gateway with no turn_secret/sfu_port (credentials +// configured:false, /v1/webrtc/turn/credentials 404). The cold-spawn +// self-heal only fires when the gateway happens to be down during +// restore. This closes that gap for the healthy case. +// +// Idempotent: returns nil WITHOUT restarting when the on-disk WebRTC +// block already matches the desired config — so it does not cause a +// restart loop on every node boot. WebRTC is the only known config-drift +// surface (bugboard #25); other fields are intentionally not compared to +// avoid spurious restarts from harmless differences (e.g. olric server +// ordering). +func (s *SystemdSpawner) ReconcileGateway(ctx context.Context, namespace, nodeID string, cfg gateway.InstanceConfig) error { + configPath := filepath.Join(s.namespaceBase, namespace, "configs", fmt.Sprintf("gateway-%s.yaml", nodeID)) + existing, err := os.ReadFile(configPath) + if err != nil { + // No readable config to compare against — don't blindly restart a + // healthy gateway; absence of the config file is a different + // problem the caller's cold-spawn path handles. + return fmt.Errorf("read gateway config for reconcile: %w", err) + } + var onDisk gateway.GatewayYAMLConfig + if err := yaml.Unmarshal(existing, &onDisk); err != nil { + return fmt.Errorf("parse gateway config for reconcile: %w", err) + } + + if gatewayWebRTCInSync(onDisk.WebRTC, cfg) { + // Already in sync — nothing to do, no restart. + return nil + } + + s.logger.Info("Gateway WebRTC config drifted from desired; reconciling (rewrite + restart)", + zap.String("namespace", namespace), + zap.String("node_id", nodeID), + zap.Bool("ondisk_enabled", onDisk.WebRTC.Enabled), + zap.Int("ondisk_sfu_port", onDisk.WebRTC.SFUPort), + zap.Bool("desired_enabled", cfg.WebRTCEnabled), + zap.Int("desired_sfu_port", cfg.SFUPort)) + return s.RestartGateway(ctx, namespace, nodeID, cfg) +} + // SFUInstanceConfig holds configuration for spawning an SFU instance type SFUInstanceConfig struct { Namespace string