mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-06-16 21:54:14 +00:00
feat(namespace): implement warm reconciliation for gateway webrtc config
- Add logic to reconcile gateway configuration drift for running instances
- Prevent unnecessary restart loops by verifying on-disk config state
- Add unit tests to validate synchronization logic and prevent regressions
This commit is contained in:
parent
3987ad0cf3
commit
bf0d5f9f9f
@ -1983,70 +1983,78 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
|
||||
|
||||
// 3. Restore Gateway
|
||||
if state.HasGateway {
|
||||
// Build the desired gateway config up front (incl. WebRTC resolved
|
||||
// from state→DB) so it drives BOTH the cold-spawn (gateway down)
|
||||
// and the warm-reconcile (gateway up but config drifted) paths.
|
||||
var olricServers []string // WireGuard IPs (Olric binds to the WG interface)
|
||||
for _, np := range state.AllNodes {
|
||||
olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort))
|
||||
}
|
||||
gwCfg := gateway.InstanceConfig{
|
||||
Namespace: state.NamespaceName,
|
||||
NodeID: cm.localNodeID,
|
||||
HTTPPort: pb.GatewayHTTPPort,
|
||||
BaseDomain: state.BaseDomain,
|
||||
RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
|
||||
GlobalRQLiteDSN: cm.globalRQLiteDSN,
|
||||
OlricServers: olricServers,
|
||||
OlricTimeout: 30 * time.Second,
|
||||
IPFSClusterAPIURL: cm.ipfsClusterAPIURL,
|
||||
IPFSAPIURL: cm.ipfsAPIURL,
|
||||
IPFSTimeout: cm.ipfsTimeout,
|
||||
IPFSReplicationFactor: cm.ipfsReplicationFactor,
|
||||
}
|
||||
|
||||
// Resolve WebRTC config. Prefer the local state file; fall back to
|
||||
// the DB (source of truth) to self-heal stale state. Bugboard #25 —
|
||||
// the state file is NOT updated by EnableWebRTC, so a namespace
|
||||
// enabled AFTER its state file was written carries no SFU/TURN
|
||||
// fields here. The lazy dbFetch only hits the DB when the state
|
||||
// file is incomplete.
|
||||
wr := chooseRestoreWebRTC(
|
||||
state.HasSFU, state.SFUSignalingPort, state.TURNDomain, state.TURNSharedSecret,
|
||||
func() (bool, int, string, string) {
|
||||
webrtcCfg, err := cm.GetWebRTCConfig(ctx, state.NamespaceName)
|
||||
if err != nil || webrtcCfg == nil {
|
||||
return false, 0, "", ""
|
||||
}
|
||||
sfuBlock, err := cm.webrtcPortAllocator.GetSFUPorts(ctx, state.ClusterID, cm.localNodeID)
|
||||
if err != nil || sfuBlock == nil {
|
||||
return false, 0, "", ""
|
||||
}
|
||||
return true, sfuBlock.SFUSignalingPort,
|
||||
fmt.Sprintf("turn.ns-%s.%s", state.NamespaceName, cm.baseDomain),
|
||||
webrtcCfg.TURNSharedSecret
|
||||
},
|
||||
)
|
||||
if wr.enabled {
|
||||
gwCfg.WebRTCEnabled = true
|
||||
gwCfg.SFUPort = wr.sfuPort
|
||||
gwCfg.TURNDomain = wr.turnDomain
|
||||
gwCfg.TURNSecret = wr.turnSecret
|
||||
}
|
||||
|
||||
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/health", pb.GatewayHTTPPort))
|
||||
if err == nil {
|
||||
resp.Body.Close()
|
||||
// Gateway is already up. Reconcile config drift (bugboard #25 —
|
||||
// the WARM case): if the running gateway's on-disk config has a
|
||||
// WebRTC block that differs from the desired (e.g. it lost the
|
||||
// block on a prior restart where it stayed healthy and the
|
||||
// cold-spawn path below never ran), rewrite the config + restart.
|
||||
// ReconcileGateway is a no-op when the on-disk block already
|
||||
// matches, so this does NOT cause a restart loop on every boot.
|
||||
if rerr := cm.systemdSpawner.ReconcileGateway(ctx, state.NamespaceName, cm.localNodeID, gwCfg); rerr != nil {
|
||||
cm.logger.Warn("Gateway WebRTC reconcile failed (leaving running config as-is)",
|
||||
zap.String("namespace", state.NamespaceName), zap.Error(rerr))
|
||||
}
|
||||
} else {
|
||||
// Build olric server addresses — always use WireGuard IPs (Olric binds to WireGuard interface)
|
||||
var olricServers []string
|
||||
for _, np := range state.AllNodes {
|
||||
olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort))
|
||||
// Gateway is down → cold spawn with the resolved config.
|
||||
if wr.enabled && !state.HasSFU {
|
||||
cm.logger.Info("Re-materialized WebRTC gateway config from DB (state file was stale)",
|
||||
zap.String("namespace", state.NamespaceName),
|
||||
zap.Int("sfu_port", wr.sfuPort))
|
||||
}
|
||||
gwCfg := gateway.InstanceConfig{
|
||||
Namespace: state.NamespaceName,
|
||||
NodeID: cm.localNodeID,
|
||||
HTTPPort: pb.GatewayHTTPPort,
|
||||
BaseDomain: state.BaseDomain,
|
||||
RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
|
||||
GlobalRQLiteDSN: cm.globalRQLiteDSN,
|
||||
OlricServers: olricServers,
|
||||
OlricTimeout: 30 * time.Second,
|
||||
IPFSClusterAPIURL: cm.ipfsClusterAPIURL,
|
||||
IPFSAPIURL: cm.ipfsAPIURL,
|
||||
IPFSTimeout: cm.ipfsTimeout,
|
||||
IPFSReplicationFactor: cm.ipfsReplicationFactor,
|
||||
}
|
||||
|
||||
// Resolve WebRTC config for the restored gateway. Prefer the
|
||||
// local state file; fall back to the DB (source of truth) to
|
||||
// self-heal stale state. Bugboard #25 — the state file is NOT
|
||||
// updated by EnableWebRTC, so a namespace enabled AFTER its state
|
||||
// file was written carries no SFU/TURN fields here. Because this
|
||||
// from-disk restore runs BEFORE the DB-backed restore and
|
||||
// succeeds, the gateway config would otherwise be regenerated
|
||||
// WITHOUT the webrtc block on every restart — SFU/TURN services
|
||||
// keep running but the gateway has empty turn_secret + sfu_port=0
|
||||
// (credentials return configured:false / 404, routes don't
|
||||
// register). The lazy dbFetch only hits the DB when the state
|
||||
// file is incomplete.
|
||||
wr := chooseRestoreWebRTC(
|
||||
state.HasSFU, state.SFUSignalingPort, state.TURNDomain, state.TURNSharedSecret,
|
||||
func() (bool, int, string, string) {
|
||||
webrtcCfg, err := cm.GetWebRTCConfig(ctx, state.NamespaceName)
|
||||
if err != nil || webrtcCfg == nil {
|
||||
return false, 0, "", ""
|
||||
}
|
||||
sfuBlock, err := cm.webrtcPortAllocator.GetSFUPorts(ctx, state.ClusterID, cm.localNodeID)
|
||||
if err != nil || sfuBlock == nil {
|
||||
return false, 0, "", ""
|
||||
}
|
||||
return true, sfuBlock.SFUSignalingPort,
|
||||
fmt.Sprintf("turn.ns-%s.%s", state.NamespaceName, cm.baseDomain),
|
||||
webrtcCfg.TURNSharedSecret
|
||||
},
|
||||
)
|
||||
if wr.enabled {
|
||||
gwCfg.WebRTCEnabled = true
|
||||
gwCfg.SFUPort = wr.sfuPort
|
||||
gwCfg.TURNDomain = wr.turnDomain
|
||||
gwCfg.TURNSecret = wr.turnSecret
|
||||
if !state.HasSFU {
|
||||
cm.logger.Info("Re-materialized WebRTC gateway config from DB (state file was stale)",
|
||||
zap.String("namespace", state.NamespaceName),
|
||||
zap.Int("sfu_port", wr.sfuPort))
|
||||
}
|
||||
}
|
||||
|
||||
if err := cm.spawnGatewayWithSystemd(ctx, gwCfg); err != nil {
|
||||
cm.logger.Error("Failed to restore Gateway from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
|
||||
} else {
|
||||
|
||||
136
core/pkg/namespace/reconcile_gateway_test.go
Normal file
136
core/pkg/namespace/reconcile_gateway_test.go
Normal file
@ -0,0 +1,136 @@
|
||||
package namespace
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/gateway"
|
||||
"go.uber.org/zap"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// Bugboard #25 (warm reconcile) — gatewayWebRTCInSync decides whether a
|
||||
// running namespace gateway's on-disk WebRTC block already matches the
|
||||
// desired config. ReconcileGateway restarts the gateway ONLY when this
|
||||
// returns false, so the function is the guard against both (a) leaving a
|
||||
// drifted gateway broken and (b) restart-looping a correct one on every
|
||||
// boot.
|
||||
|
||||
func desiredEnabled() gateway.InstanceConfig {
|
||||
return gateway.InstanceConfig{
|
||||
WebRTCEnabled: true,
|
||||
SFUPort: 30000,
|
||||
TURNDomain: "turn.ns-anchat-test.orama-devnet.network",
|
||||
TURNSecret: "the-secret",
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayWebRTCInSync_driftedBlockMissing_returnsFalse(t *testing.T) {
|
||||
// The exact bug-25 warm case: the running config has NO webrtc block
|
||||
// (enabled=false, port 0, empty secret) but the DB-desired config has
|
||||
// it enabled. MUST report out-of-sync so ReconcileGateway restarts.
|
||||
onDisk := gateway.GatewayYAMLWebRTC{} // zero value = no block
|
||||
if gatewayWebRTCInSync(onDisk, desiredEnabled()) {
|
||||
t.Fatal("BUG #25 REGRESSION: empty on-disk block vs DB-enabled desired must be out-of-sync (needs restart)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayWebRTCInSync_matchingBlock_returnsTrue(t *testing.T) {
|
||||
// After a reconcile fixes the config, the on-disk block matches the
|
||||
// desired. MUST report in-sync so the NEXT boot does NOT restart again
|
||||
// (no restart loop — this is why we compare the actual on-disk config
|
||||
// instead of the stale state file).
|
||||
onDisk := gateway.GatewayYAMLWebRTC{
|
||||
Enabled: true,
|
||||
SFUPort: 30000,
|
||||
TURNDomain: "turn.ns-anchat-test.orama-devnet.network",
|
||||
TURNSecret: "the-secret",
|
||||
}
|
||||
if !gatewayWebRTCInSync(onDisk, desiredEnabled()) {
|
||||
t.Error("matching on-disk block must be in-sync (no restart) — else restart loop on every boot")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayWebRTCInSync_eachFieldDriftDetected(t *testing.T) {
|
||||
// Any single drifted field must trigger a restart. Pins that the
|
||||
// comparison covers all four webrtc fields (a future refactor that
|
||||
// drops one would silently let that field drift forever).
|
||||
base := gateway.GatewayYAMLWebRTC{
|
||||
Enabled: true, SFUPort: 30000,
|
||||
TURNDomain: "turn.ns-anchat-test.orama-devnet.network", TURNSecret: "the-secret",
|
||||
}
|
||||
mutations := []struct {
|
||||
name string
|
||||
mut func(w *gateway.GatewayYAMLWebRTC)
|
||||
}{
|
||||
{"enabled flipped off", func(w *gateway.GatewayYAMLWebRTC) { w.Enabled = false }},
|
||||
{"sfu port changed", func(w *gateway.GatewayYAMLWebRTC) { w.SFUPort = 30001 }},
|
||||
{"turn domain changed", func(w *gateway.GatewayYAMLWebRTC) { w.TURNDomain = "turn.other" }},
|
||||
{"turn secret rotated", func(w *gateway.GatewayYAMLWebRTC) { w.TURNSecret = "rotated" }},
|
||||
}
|
||||
for _, tc := range mutations {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
d := base
|
||||
tc.mut(&d)
|
||||
if gatewayWebRTCInSync(d, desiredEnabled()) {
|
||||
t.Errorf("drift in %q not detected — gateway would keep serving stale config", tc.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayWebRTCInSync_bothDisabled_returnsTrue(t *testing.T) {
|
||||
// A namespace genuinely without WebRTC: on-disk block empty, desired
|
||||
// disabled. In-sync → no restart. (Avoids churning non-webrtc
|
||||
// namespaces on every boot.)
|
||||
if !gatewayWebRTCInSync(gateway.GatewayYAMLWebRTC{}, gateway.InstanceConfig{}) {
|
||||
t.Error("disabled on-disk + disabled desired must be in-sync (no restart)")
|
||||
}
|
||||
}
|
||||
|
||||
// ReconcileGateway I/O paths that DON'T restart (the restart path needs
|
||||
// real systemd, so it's covered by the pure helper above). These pin
|
||||
// that a matching config is a clean no-op and that an unreadable config
|
||||
// surfaces an error instead of blind-restarting.
|
||||
|
||||
func writeGatewayConfig(t *testing.T, base, ns, nodeID string, wr gateway.GatewayYAMLWebRTC) {
|
||||
t.Helper()
|
||||
dir := filepath.Join(base, ns, "configs")
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
b, _ := yaml.Marshal(gateway.GatewayYAMLConfig{ClientNamespace: ns, WebRTC: wr})
|
||||
if err := os.WriteFile(filepath.Join(dir, "gateway-"+nodeID+".yaml"), b, 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReconcileGateway_inSyncIsNoOpNoError(t *testing.T) {
|
||||
base := t.TempDir()
|
||||
ns, node := "anchat-test", "node-1"
|
||||
writeGatewayConfig(t, base, ns, node, gateway.GatewayYAMLWebRTC{
|
||||
Enabled: true, SFUPort: 30000,
|
||||
TURNDomain: "turn.ns-anchat-test.orama-devnet.network", TURNSecret: "the-secret",
|
||||
})
|
||||
s := NewSystemdSpawner(base, "", zap.NewNop())
|
||||
|
||||
// Desired == on-disk → must return nil WITHOUT attempting a restart
|
||||
// (RestartGateway would error here since there's no real systemd, so
|
||||
// a nil return proves we never reached it).
|
||||
err := s.ReconcileGateway(context.Background(), ns, node, desiredEnabled())
|
||||
if err != nil {
|
||||
t.Errorf("in-sync config must be a clean no-op; got %v (did it try to restart?)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReconcileGateway_missingConfigReturnsErrorNotRestart(t *testing.T) {
|
||||
// No config file on disk → return an error so the caller leaves the
|
||||
// running gateway alone, rather than blind-restarting a healthy one.
|
||||
s := NewSystemdSpawner(t.TempDir(), "", zap.NewNop())
|
||||
err := s.ReconcileGateway(context.Background(), "anchat-test", "node-1", desiredEnabled())
|
||||
if err == nil {
|
||||
t.Error("missing config must return an error (don't blind-restart a healthy gateway)")
|
||||
}
|
||||
}
|
||||
@ -321,6 +321,66 @@ func (s *SystemdSpawner) RestartGateway(ctx context.Context, namespace, nodeID s
|
||||
return s.SpawnGateway(ctx, namespace, nodeID, cfg)
|
||||
}
|
||||
|
||||
// gatewayWebRTCInSync reports whether the WebRTC block already on disk
|
||||
// matches the desired gateway config — i.e. no restart is needed.
|
||||
// Compares only the WebRTC-relevant fields (bugboard #25 drift surface).
|
||||
// Pure function so the reconcile decision is unit-testable without files
|
||||
// or systemd.
|
||||
func gatewayWebRTCInSync(onDisk gateway.GatewayYAMLWebRTC, cfg gateway.InstanceConfig) bool {
|
||||
return onDisk.Enabled == cfg.WebRTCEnabled &&
|
||||
onDisk.SFUPort == cfg.SFUPort &&
|
||||
onDisk.TURNSecret == cfg.TURNSecret &&
|
||||
onDisk.TURNDomain == cfg.TURNDomain
|
||||
}
|
||||
|
||||
// ReconcileGateway is the WARM counterpart to SpawnGateway: when a
|
||||
// namespace gateway is already running, this compares its on-disk config
|
||||
// against the desired `cfg` and restarts it ONLY if the WebRTC block has
|
||||
// drifted (enabled / sfu_port / turn_secret / turn_domain differ).
|
||||
//
|
||||
// Bugboard #25: the from-disk restore skips healthy gateways, so a
|
||||
// gateway that lost its webrtc block on a prior restart (while staying
|
||||
// healthy) never gets its config regenerated — leaving SFU/TURN services
|
||||
// running but the gateway with no turn_secret/sfu_port (credentials
|
||||
// configured:false, /v1/webrtc/turn/credentials 404). The cold-spawn
|
||||
// self-heal only fires when the gateway happens to be down during
|
||||
// restore. This closes that gap for the healthy case.
|
||||
//
|
||||
// Idempotent: returns nil WITHOUT restarting when the on-disk WebRTC
|
||||
// block already matches the desired config — so it does not cause a
|
||||
// restart loop on every node boot. WebRTC is the only known config-drift
|
||||
// surface (bugboard #25); other fields are intentionally not compared to
|
||||
// avoid spurious restarts from harmless differences (e.g. olric server
|
||||
// ordering).
|
||||
func (s *SystemdSpawner) ReconcileGateway(ctx context.Context, namespace, nodeID string, cfg gateway.InstanceConfig) error {
|
||||
configPath := filepath.Join(s.namespaceBase, namespace, "configs", fmt.Sprintf("gateway-%s.yaml", nodeID))
|
||||
existing, err := os.ReadFile(configPath)
|
||||
if err != nil {
|
||||
// No readable config to compare against — don't blindly restart a
|
||||
// healthy gateway; absence of the config file is a different
|
||||
// problem the caller's cold-spawn path handles.
|
||||
return fmt.Errorf("read gateway config for reconcile: %w", err)
|
||||
}
|
||||
var onDisk gateway.GatewayYAMLConfig
|
||||
if err := yaml.Unmarshal(existing, &onDisk); err != nil {
|
||||
return fmt.Errorf("parse gateway config for reconcile: %w", err)
|
||||
}
|
||||
|
||||
if gatewayWebRTCInSync(onDisk.WebRTC, cfg) {
|
||||
// Already in sync — nothing to do, no restart.
|
||||
return nil
|
||||
}
|
||||
|
||||
s.logger.Info("Gateway WebRTC config drifted from desired; reconciling (rewrite + restart)",
|
||||
zap.String("namespace", namespace),
|
||||
zap.String("node_id", nodeID),
|
||||
zap.Bool("ondisk_enabled", onDisk.WebRTC.Enabled),
|
||||
zap.Int("ondisk_sfu_port", onDisk.WebRTC.SFUPort),
|
||||
zap.Bool("desired_enabled", cfg.WebRTCEnabled),
|
||||
zap.Int("desired_sfu_port", cfg.SFUPort))
|
||||
return s.RestartGateway(ctx, namespace, nodeID, cfg)
|
||||
}
|
||||
|
||||
// SFUInstanceConfig holds configuration for spawning an SFU instance
|
||||
type SFUInstanceConfig struct {
|
||||
Namespace string
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user