diff --git a/core/cmd/gateway/config.go b/core/cmd/gateway/config.go index e97f27f..5810f50 100644 --- a/core/cmd/gateway/config.go +++ b/core/cmd/gateway/config.go @@ -92,6 +92,12 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { IPFSTimeout string `yaml:"ipfs_timeout"` IPFSReplicationFactor int `yaml:"ipfs_replication_factor"` WebRTC yamlWebRTCCfg `yaml:"webrtc"` + // SecretsEncryptionKey: see GatewayYAMLConfig docstring. Optional; + // when set, the standalone gateway populates + // cfg.SecretsEncryptionKey so serverless function secrets can be + // encrypted/decrypted (bugboard #837 follow-up). Empty leaves + // secrets management disabled (fail-loud). + SecretsEncryptionKey string `yaml:"secrets_encryption_key"` // ClusterSecretPath: see GatewayYAMLConfig docstring. Optional; // when set, the standalone gateway reads the file at this path // and populates cfg.ClusterSecret so JWT signing keys can be @@ -229,6 +235,16 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { } } + // Serverless secrets encryption key — bugboard #837 follow-up. The + // host-managed gateway (pkg/node/gateway.go) reads this from + // secrets/secrets-encryption-key; the standalone binary used by namespace + // gateways via systemd receives it through this YAML field. Without it, + // `function secrets list` returned 501 ("Secrets management not + // available") on namespace gateways even though the host had the key. 
+ if v := strings.TrimSpace(y.SecretsEncryptionKey); v != "" { + cfg.SecretsEncryptionKey = v + } + // WebRTC configuration cfg.WebRTCEnabled = y.WebRTC.Enabled if y.WebRTC.SFUPort > 0 { diff --git a/core/cmd/gateway/config_secrets_test.go b/core/cmd/gateway/config_secrets_test.go new file mode 100644 index 0000000..e8194fb --- /dev/null +++ b/core/cmd/gateway/config_secrets_test.go @@ -0,0 +1,70 @@ +package main + +import ( + "strings" + "testing" + + "github.com/DeBrosOfficial/network/pkg/config" + "github.com/DeBrosOfficial/network/pkg/gateway" + "gopkg.in/yaml.v3" +) + +// TestSpawnedGatewayConfig_loadsSecretsEncryptionKey is the bugboard #837 +// follow-up regression test for the *load* half: a YAML written by the +// namespace gateway spawner (gateway.GatewayYAMLConfig with the secrets key) +// must (a) pass the standalone gateway's STRICT decoder — i.e. the +// secrets_encryption_key field is a known field, not rejected — and (b) end +// up in gateway.Config.SecretsEncryptionKey via the same trim/assign the real +// parseGatewayConfig uses. Without the load mapping, `function secrets list` +// returned 501 on namespace gateways. +func TestSpawnedGatewayConfig_loadsSecretsEncryptionKey(t *testing.T) { + const key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" + + // Produce the exact YAML a spawned namespace gateway receives. + written := gateway.GatewayYAMLConfig{ + ListenAddr: ":6001", + ClientNamespace: "anchat-test", + RQLiteDSN: "http://localhost:10000", + OlricServers: []string{"localhost:3320"}, + SecretsEncryptionKey: key, + } + data, err := yaml.Marshal(written) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + // yamlCfgMirror is a hand-maintained copy of the function-local yamlCfg in config.go. It is + // NOT linked to the real loader, so drift there is not caught here; keep fields/tags in sync manually. 
+ type webrtc struct { + Enabled bool `yaml:"enabled"` + SFUPort int `yaml:"sfu_port"` + TURNDomain string `yaml:"turn_domain"` + TURNSecret string `yaml:"turn_secret"` + } + type yamlCfgMirror struct { + ListenAddr string `yaml:"listen_addr"` + ClientNamespace string `yaml:"client_namespace"` + RQLiteDSN string `yaml:"rqlite_dsn"` + OlricServers []string `yaml:"olric_servers"` + WebRTC webrtc `yaml:"webrtc"` + SecretsEncryptionKey string `yaml:"secrets_encryption_key"` + ClusterSecretPath string `yaml:"cluster_secret_path"` + } + + var y yamlCfgMirror + // STRICT decode — the real loader rejects unknown fields, so this proves + // secrets_encryption_key is recognized. + if err := config.DecodeStrict(strings.NewReader(string(data)), &y); err != nil { + t.Fatalf("strict decode rejected the spawned gateway YAML: %v", err) + } + + // Apply the same trim/assign as parseGatewayConfig. + cfg := &gateway.Config{} + if v := strings.TrimSpace(y.SecretsEncryptionKey); v != "" { + cfg.SecretsEncryptionKey = v + } + + if cfg.SecretsEncryptionKey != key { + t.Errorf("gateway.Config.SecretsEncryptionKey = %q, want %q", cfg.SecretsEncryptionKey, key) + } +} diff --git a/core/pkg/cli/production/install/orchestrator.go b/core/pkg/cli/production/install/orchestrator.go index 4a94be2..2000d02 100644 --- a/core/pkg/cli/production/install/orchestrator.go +++ b/core/pkg/cli/production/install/orchestrator.go @@ -485,6 +485,14 @@ func (o *Orchestrator) saveSecretsFromJoinResponse(resp *joinhandlers.JoinRespon } } + // Write TURN shared secret (feat-124 #913) — identical on every node so + // WebRTC TURN credentials validate cluster-wide and survive config regen. 
+ if resp.TURNSecret != "" { + if err := os.WriteFile(filepath.Join(secretsDir, "turn-secret"), []byte(resp.TURNSecret), 0600); err != nil { + return fmt.Errorf("failed to write turn-secret: %w", err) + } + } + // Write IPFS Cluster trusted peer IDs if len(resp.IPFSClusterPeerIDs) > 0 { content := strings.Join(resp.IPFSClusterPeerIDs, "\n") + "\n" diff --git a/core/pkg/environments/production/config.go b/core/pkg/environments/production/config.go index 085555c..9100f94 100644 --- a/core/pkg/environments/production/config.go +++ b/core/pkg/environments/production/config.go @@ -16,8 +16,16 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + "gopkg.in/yaml.v3" ) +// defaultSFUSignalingPort is the SFU signaling port the namespace gateway +// proxies WebRTC traffic to when an existing node.yaml did not record one. +// Mirrors pkg/namespace.SFUSignalingPortRangeStart (30000); kept as a local +// constant to avoid importing the namespace package (which other agents own +// and which would create a dependency cycle here). +const defaultSFUSignalingPort = 30000 + // ConfigGenerator manages generation of node, gateway, and service configs type ConfigGenerator struct { oramaDir string @@ -212,9 +220,127 @@ func (cg *ConfigGenerator) GenerateNodeConfig(peerAddresses []string, vpsIP stri data.SecretsEncryptionKey = strings.TrimSpace(string(keyBytes)) } + // WebRTC/TURN config (feat-124 #913). The TURN secret lives in the secrets + // dir so it survives Phase4 config regeneration; turn_domain/sfu_port/enabled + // are operator-set values that only exist in the previous node.yaml, so we + // carry them forward from the existing on-disk config. Without this, a regen + // wipes the operator's manually-added webrtc block and the namespace + // reconciler restarts gateways with an empty TURN secret (the outage). 
+ if err := cg.populateWebRTCConfig(&data); err != nil { + return "", fmt.Errorf("failed to populate webrtc config: %w", err) + } + return templates.RenderNodeConfig(data) } +// existingWebRTC is the minimal shape parsed out of an existing node.yaml to +// carry forward operator-set WebRTC fields across a config regeneration. +type existingWebRTC struct { + Enabled bool + SFUPort int + TURNDomain string + TURNSecret string +} + +// populateWebRTCConfig fills the WebRTC fields on data so the rendered node.yaml +// preserves operator TURN configuration across regenerations. +// +// Sources, in order of authority: +// - turn_secret: the persisted secrets/turn-secret file (durable, survives +// regen). If absent but the existing node.yaml carried a secret, that secret +// is persisted to the file so it becomes durable from now on. +// - turn_domain / sfu_port: carried forward from the existing node.yaml's http_gateway.webrtc block +// (sfu_port falls back to defaultSFUSignalingPort). The existing `enabled` flag is parsed but not consulted: WebRTC is enabled whenever a secret resolves. +// +// If there is no persisted secret and no existing webrtc block, WebRTC is left +// disabled and the template renders nothing. +func (cg *ConfigGenerator) populateWebRTCConfig(data *templates.NodeConfigData) error { + existing := cg.readExistingWebRTC() + + // Resolve the TURN secret: persisted file wins; otherwise adopt the secret + // from the existing node.yaml and persist it so it is durable. + secret := "" + secretPath := filepath.Join(cg.oramaDir, "secrets", "turn-secret") + if b, err := os.ReadFile(secretPath); err == nil { + secret = strings.TrimSpace(string(b)) + } + if secret == "" && existing != nil && existing.TURNSecret != "" { + secret = existing.TURNSecret + if err := cg.persistTURNSecret(secret); err != nil { + return err + } + } + + if secret == "" { + // No durable secret and nothing to adopt — leave WebRTC disabled. 
+ return nil + } + + data.TURNSecret = secret + data.WebRTCEnabled = true + + if existing != nil { + data.TURNDomain = existing.TURNDomain + data.SFUPort = existing.SFUPort + } + if data.SFUPort == 0 { + data.SFUPort = defaultSFUSignalingPort + } + + return nil +} + +// readExistingWebRTC parses just the http_gateway.webrtc block out of the +// existing node.yaml. Absence of the file or block is tolerated (returns nil). +func (cg *ConfigGenerator) readExistingWebRTC() *existingWebRTC { + configPath := filepath.Join(cg.oramaDir, "configs", "node.yaml") + raw, err := os.ReadFile(configPath) + if err != nil { + return nil // No existing config (fresh install) — nothing to carry forward. + } + + var parsed struct { + HTTPGateway struct { + WebRTC struct { + Enabled bool `yaml:"enabled"` + SFUPort int `yaml:"sfu_port"` + TURNDomain string `yaml:"turn_domain"` + TURNSecret string `yaml:"turn_secret"` + } `yaml:"webrtc"` + } `yaml:"http_gateway"` + } + if err := yaml.Unmarshal(raw, &parsed); err != nil { + return nil // Malformed/old config — don't fail regen; just nothing to carry. + } + + wb := parsed.HTTPGateway.WebRTC + if !wb.Enabled && wb.SFUPort == 0 && wb.TURNDomain == "" && wb.TURNSecret == "" { + return nil // No webrtc block present. + } + return &existingWebRTC{ + Enabled: wb.Enabled, + SFUPort: wb.SFUPort, + TURNDomain: wb.TURNDomain, + TURNSecret: wb.TURNSecret, + } +} + +// persistTURNSecret writes the TURN secret to the secrets dir with 0600 perms +// and correct ownership, making it durable across future config regenerations. 
+func (cg *ConfigGenerator) persistTURNSecret(secret string) error { + secretPath := filepath.Join(cg.oramaDir, "secrets", "turn-secret") + if err := os.MkdirAll(filepath.Dir(secretPath), 0700); err != nil { + return fmt.Errorf("failed to create secrets directory: %w", err) + } + if err := os.WriteFile(secretPath, []byte(secret), 0600); err != nil { + return fmt.Errorf("failed to persist TURN secret: %w", err) + } + if err := ensureSecretFilePermissions(secretPath); err != nil { + return err + } + return nil +} + // GenerateVaultConfig generates vault.yaml configuration for the Vault Guardian. // The vault config uses key=value format (not YAML, despite the file extension). // Peer discovery is dynamic via RQLite — no static peer list needed. @@ -532,6 +658,57 @@ func (sg *SecretGenerator) EnsureSecretsEncryptionKey() (string, error) { return key, nil } +// EnsureTURNSecret gets or generates the HMAC-SHA1 shared secret used to mint +// TURN credentials for WebRTC (the http_gateway.webrtc.turn_secret field). +// The secret is a 32-byte random value stored as 64 hex characters. +// +// It MUST be identical on every namespace-gateway node in a cluster and stable +// across restarts AND config regenerations — otherwise the namespace reconciler +// sees drift (desired vs on-disk) and restarts gateways with an empty secret, +// which makes turn.credentials return namespace_not_configured (feat-124 #913, +// the AnChat outage). Persisting the secret to the secrets dir is what lets it +// survive Phase4 config regeneration: GenerateNodeConfig reads this file rather +// than relying on the (regenerated-from-template) node.yaml. Joining nodes +// receive the value through the join flow rather than generating their own. 
+func (sg *SecretGenerator) EnsureTURNSecret() (string, error) { + secretPath := filepath.Join(sg.oramaDir, "secrets", "turn-secret") + secretDir := filepath.Dir(secretPath) + + if err := os.MkdirAll(secretDir, 0700); err != nil { + return "", fmt.Errorf("failed to create secrets directory: %w", err) + } + if err := os.Chmod(secretDir, 0700); err != nil { + return "", fmt.Errorf("failed to set secrets directory permissions: %w", err) + } + + // Try to read existing secret + if data, err := os.ReadFile(secretPath); err == nil { + secret := strings.TrimSpace(string(data)) + if len(secret) == 64 { + if err := ensureSecretFilePermissions(secretPath); err != nil { + return "", err + } + return secret, nil + } + } + + // Generate new secret (32 bytes = 64 hex chars) + secretBytes := make([]byte, 32) + if _, err := rand.Read(secretBytes); err != nil { + return "", fmt.Errorf("failed to generate TURN secret: %w", err) + } + secret := hex.EncodeToString(secretBytes) + + if err := os.WriteFile(secretPath, []byte(secret), 0600); err != nil { + return "", fmt.Errorf("failed to save TURN secret: %w", err) + } + if err := ensureSecretFilePermissions(secretPath); err != nil { + return "", err + } + + return secret, nil +} + func ensureSecretFilePermissions(secretPath string) error { if err := os.Chmod(secretPath, 0600); err != nil { return fmt.Errorf("failed to set permissions on %s: %w", secretPath, err) diff --git a/core/pkg/environments/production/orchestrator.go b/core/pkg/environments/production/orchestrator.go index 42be586..b910f30 100644 --- a/core/pkg/environments/production/orchestrator.go +++ b/core/pkg/environments/production/orchestrator.go @@ -599,6 +599,14 @@ func (ps *ProductionSetup) Phase3GenerateSecrets() error { } ps.logf(" ✓ Secrets encryption key ensured") + // WebRTC TURN shared secret (feat-124 #913). 
Persisting it here lets the + // TURN config survive Phase4 config regeneration so namespace gateways are + // never restarted with an empty turn_secret (the AnChat outage). + if _, err := ps.secretGenerator.EnsureTURNSecret(); err != nil { + return fmt.Errorf("failed to ensure TURN secret: %w", err) + } + ps.logf(" ✓ TURN secret ensured") + // Node identity (unified architecture) peerID, err := ps.secretGenerator.EnsureNodeIdentity() if err != nil { diff --git a/core/pkg/environments/production/turn_secret_test.go b/core/pkg/environments/production/turn_secret_test.go new file mode 100644 index 0000000..32077cc --- /dev/null +++ b/core/pkg/environments/production/turn_secret_test.go @@ -0,0 +1,190 @@ +package production + +import ( + "encoding/hex" + "os" + "path/filepath" + "strings" + "testing" +) + +// TestEnsureTURNSecret_generatesAndPersists verifies that a fresh oramaDir +// produces a valid 32-byte hex secret written to secrets/turn-secret. +func TestEnsureTURNSecret_generatesAndPersists(t *testing.T) { + dir := t.TempDir() + sg := NewSecretGenerator(dir) + + secret, err := sg.EnsureTURNSecret() + if err != nil { + t.Fatalf("EnsureTURNSecret failed: %v", err) + } + if len(secret) != 64 { + t.Fatalf("expected 64 hex chars, got %d (%q)", len(secret), secret) + } + raw, err := hex.DecodeString(secret) + if err != nil || len(raw) != 32 { + t.Fatalf("secret is not 32 bytes hex: err=%v len=%d", err, len(raw)) + } + + data, err := os.ReadFile(filepath.Join(dir, "secrets", "turn-secret")) + if err != nil { + t.Fatalf("reading persisted secret failed: %v", err) + } + if strings.TrimSpace(string(data)) != secret { + t.Errorf("persisted secret %q != returned secret %q", strings.TrimSpace(string(data)), secret) + } +} + +// TestEnsureTURNSecret_idempotent verifies the secret is stable across calls — +// the property that keeps TURN credentials valid across restarts and identical +// across cluster nodes (feat-124 #913). 
+func TestEnsureTURNSecret_idempotent(t *testing.T) { + dir := t.TempDir() + sg := NewSecretGenerator(dir) + + first, err := sg.EnsureTURNSecret() + if err != nil { + t.Fatalf("first call failed: %v", err) + } + second, err := sg.EnsureTURNSecret() + if err != nil { + t.Fatalf("second call failed: %v", err) + } + if first != second { + t.Errorf("secret changed between calls: %q != %q", first, second) + } +} + +// TestEnsureTURNSecret_regeneratesInvalid verifies a corrupt/short on-disk +// secret is replaced with a fresh valid one. +func TestEnsureTURNSecret_regeneratesInvalid(t *testing.T) { + dir := t.TempDir() + secretsDir := filepath.Join(dir, "secrets") + if err := os.MkdirAll(secretsDir, 0700); err != nil { + t.Fatalf("mkdir failed: %v", err) + } + if err := os.WriteFile(filepath.Join(secretsDir, "turn-secret"), []byte("too-short"), 0600); err != nil { + t.Fatalf("write failed: %v", err) + } + + sg := NewSecretGenerator(dir) + secret, err := sg.EnsureTURNSecret() + if err != nil { + t.Fatalf("EnsureTURNSecret failed: %v", err) + } + if len(secret) != 64 { + t.Errorf("expected regenerated 64-char secret, got %d (%q)", len(secret), secret) + } +} + +// writeNodeYAML is a test helper that writes content to the canonical node +// config path the config generator reads/writes. +func writeNodeYAML(t *testing.T, oramaDir, content string) { + t.Helper() + configDir := filepath.Join(oramaDir, "configs") + if err := os.MkdirAll(configDir, 0755); err != nil { + t.Fatalf("mkdir configs failed: %v", err) + } + if err := os.WriteFile(filepath.Join(configDir, "node.yaml"), []byte(content), 0644); err != nil { + t.Fatalf("write node.yaml failed: %v", err) + } +} + +// TestGenerateNodeConfig_preservesExistingWebRTC is the regression test for the +// feat-124 #913 outage: a regen must NOT wipe an operator's webrtc block. 
We +// write a node.yaml with a full webrtc block, regenerate, and assert the block +// (enabled, sfu_port, turn_domain, turn_secret) survives — and that the secret +// gets persisted to the durable secrets file. +func TestGenerateNodeConfig_preservesExistingWebRTC(t *testing.T) { + const turnSecret = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" + const turnDomain = "turn.ns-anchat.dbrs.space" + + dir := t.TempDir() + writeNodeYAML(t, dir, `http_gateway: + enabled: true + webrtc: + enabled: true + sfu_port: 30007 + turn_domain: "turn.ns-anchat.dbrs.space" + turn_secret: "`+turnSecret+`" +`) + + cg := NewConfigGenerator(dir) + out, err := cg.GenerateNodeConfig(nil, "10.0.0.5", "", "node-1.dbrs.space", "dbrs.space", false) + if err != nil { + t.Fatalf("GenerateNodeConfig failed: %v", err) + } + + for _, want := range []string{ + "webrtc:", + "turn_secret: \"" + turnSecret + "\"", + "turn_domain: \"" + turnDomain + "\"", + "sfu_port: 30007", + } { + if !strings.Contains(out, want) { + t.Errorf("regenerated node.yaml missing %q\n---\n%s", want, out) + } + } + + // The secret must now be durable in the secrets file (yaml-had-secret → + // file gets persisted), so the NEXT regen survives even if the operator's + // yaml is gone. + persisted, err := os.ReadFile(filepath.Join(dir, "secrets", "turn-secret")) + if err != nil { + t.Fatalf("TURN secret was not persisted to secrets dir: %v", err) + } + if strings.TrimSpace(string(persisted)) != turnSecret { + t.Errorf("persisted secret %q != yaml secret %q", strings.TrimSpace(string(persisted)), turnSecret) + } +} + +// TestGenerateNodeConfig_persistedSecretSurvivesWipedYAML verifies the durable +// mechanism: once the secret is in secrets/turn-secret, a regen from a node.yaml +// that LOST its webrtc block still renders turn_secret (defaulting sfu_port). 
+func TestGenerateNodeConfig_persistedSecretSurvivesWipedYAML(t *testing.T) { + const turnSecret = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789" + + dir := t.TempDir() + secretsDir := filepath.Join(dir, "secrets") + if err := os.MkdirAll(secretsDir, 0700); err != nil { + t.Fatalf("mkdir secrets failed: %v", err) + } + if err := os.WriteFile(filepath.Join(secretsDir, "turn-secret"), []byte(turnSecret), 0600); err != nil { + t.Fatalf("write turn-secret failed: %v", err) + } + // Existing node.yaml with NO webrtc block (simulates the wiped state). + writeNodeYAML(t, dir, "http_gateway:\n enabled: true\n") + + cg := NewConfigGenerator(dir) + out, err := cg.GenerateNodeConfig(nil, "10.0.0.5", "", "node-1.dbrs.space", "dbrs.space", false) + if err != nil { + t.Fatalf("GenerateNodeConfig failed: %v", err) + } + + if !strings.Contains(out, "turn_secret: \""+turnSecret+"\"") { + t.Errorf("rendered node.yaml missing persisted turn_secret\n---\n%s", out) + } + // sfu_port had no source → defaults to the named constant. + if !strings.Contains(out, "sfu_port: 30000") { + t.Errorf("expected default sfu_port 30000, got:\n%s", out) + } +} + +// TestGenerateNodeConfig_noWebRTCOmitsBlock verifies clusters without any TURN +// config render no webrtc block at all (no empty values leak in). +func TestGenerateNodeConfig_noWebRTCOmitsBlock(t *testing.T) { + dir := t.TempDir() + cg := NewConfigGenerator(dir) + + out, err := cg.GenerateNodeConfig(nil, "10.0.0.5", "", "node-1.dbrs.space", "dbrs.space", false) + if err != nil { + t.Fatalf("GenerateNodeConfig failed: %v", err) + } + if strings.Contains(out, "webrtc:") { + t.Errorf("expected no webrtc block when no TURN config present, got:\n%s", out) + } + // Sanity: ensure no orphan turn-secret file was created. 
+ if _, err := os.Stat(filepath.Join(dir, "secrets", "turn-secret")); !os.IsNotExist(err) { + t.Errorf("turn-secret file should not exist when no TURN config present") + } +} diff --git a/core/pkg/environments/templates/node.yaml b/core/pkg/environments/templates/node.yaml index 4cb1710..740d66d 100644 --- a/core/pkg/environments/templates/node.yaml +++ b/core/pkg/environments/templates/node.yaml @@ -94,6 +94,16 @@ http_gateway: # (bugboard #837). Sourced from ~/.orama/secrets/secrets-encryption-key. secrets_encryption_key: "{{.SecretsEncryptionKey}}" {{- end}} - +{{- if .TURNSecret}} + # WebRTC/TURN config (feat-124 #913). turn_secret is sourced from + # ~/.orama/secrets/turn-secret so it survives config regeneration; + # turn_domain/sfu_port are carried forward from the previous node.yaml. + webrtc: + enabled: true + sfu_port: {{.SFUPort}} + turn_domain: "{{.TURNDomain}}" + turn_secret: "{{.TURNSecret}}" +{{- end}} + # Routes for internal service reverse proxy (kept for backwards compatibility but not used by full gateway) routes: {} diff --git a/core/pkg/environments/templates/render.go b/core/pkg/environments/templates/render.go index 8258d26..27581f1 100644 --- a/core/pkg/environments/templates/render.go +++ b/core/pkg/environments/templates/render.go @@ -55,6 +55,17 @@ type NodeConfigData struct { // config (the gateway then reads the secret file directly / get_secret // stays disabled until the key is configured). SecretsEncryptionKey string + + // WebRTC/TURN configuration, rendered under http_gateway.webrtc when + // TURNSecret is non-empty (feat-124 #913). TURNSecret is sourced from + // ~/.orama/secrets/turn-secret so it survives Phase4 config regeneration; + // TURNDomain/SFUPort are operator-set values carried forward from the + // existing node.yaml. The whole block is conditional on TURNSecret being + // set — clusters without TURN render nothing. 
+ WebRTCEnabled bool // Set alongside TURNSecret; the node.yaml webrtc block is gated on TURNSecret, not on this flag + SFUPort int // Local SFU signaling port the gateway proxies to + TURNDomain string // TURN domain (e.g., "turn.ns-myapp.dbrs.space") + TURNSecret string // HMAC-SHA1 shared secret for TURN credential generation } // GatewayConfigData holds parameters for gateway.yaml rendering diff --git a/core/pkg/environments/templates/render_test.go b/core/pkg/environments/templates/render_test.go index ebf40ba..847f536 100644 --- a/core/pkg/environments/templates/render_test.go +++ b/core/pkg/environments/templates/render_test.go @@ -67,6 +67,42 @@ func TestRenderNodeConfig_secretsEncryptionKey(t *testing.T) { } } +func TestRenderNodeConfig_webRTC(t *testing.T) { + const secret = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" + + // Happy path: TURN secret present → full webrtc block rendered. + withWebRTC, err := RenderNodeConfig(NodeConfigData{ + NodeID: "node1", + WebRTCEnabled: true, + SFUPort: 30007, + TURNDomain: "turn.ns-anchat.dbrs.space", + TURNSecret: secret, + }) + if err != nil { + t.Fatalf("RenderNodeConfig failed: %v", err) + } + for _, want := range []string{ + "webrtc:", + "enabled: true", + "sfu_port: 30007", + "turn_domain: \"turn.ns-anchat.dbrs.space\"", + "turn_secret: \"" + secret + "\"", + } { + if !strings.Contains(withWebRTC, want) { + t.Errorf("rendered node config missing webrtc line %q\n---\n%s", want, withWebRTC) + } + } + + // Edge case: no TURN secret → block omitted entirely. 
+ withoutWebRTC, err := RenderNodeConfig(NodeConfigData{NodeID: "node1"}) + if err != nil { + t.Fatalf("RenderNodeConfig failed: %v", err) + } + if strings.Contains(withoutWebRTC, "webrtc:") { + t.Errorf("empty TURN secret should omit webrtc block, got:\n%s", withoutWebRTC) + } +} + func TestRenderGatewayConfig(t *testing.T) { bootstrapMultiaddr := "/ip4/127.0.0.1/tcp/4001/p2p/Qm1234567890" data := GatewayConfigData{ diff --git a/core/pkg/gateway/handlers/join/handler.go b/core/pkg/gateway/handlers/join/handler.go index b4f3d76..16b7a6b 100644 --- a/core/pkg/gateway/handlers/join/handler.go +++ b/core/pkg/gateway/handlers/join/handler.go @@ -34,14 +34,17 @@ type JoinResponse struct { WGPeers []WGPeerInfo `json:"wg_peers"` // Secrets - ClusterSecret string `json:"cluster_secret"` - SwarmKey string `json:"swarm_key"` - APIKeyHMACSecret string `json:"api_key_hmac_secret,omitempty"` - RQLitePassword string `json:"rqlite_password,omitempty"` - OlricEncryptionKey string `json:"olric_encryption_key,omitempty"` + ClusterSecret string `json:"cluster_secret"` + SwarmKey string `json:"swarm_key"` + APIKeyHMACSecret string `json:"api_key_hmac_secret,omitempty"` + RQLitePassword string `json:"rqlite_password,omitempty"` + OlricEncryptionKey string `json:"olric_encryption_key,omitempty"` // Serverless secrets encryption key (bugboard #837) — must be identical on // every node so namespace function secrets decrypt cluster-wide. SecretsEncryptionKey string `json:"secrets_encryption_key,omitempty"` + // TURN shared secret (feat-124 #913) — must be identical on every node so + // WebRTC TURN credentials validate cluster-wide. 
+ TURNSecret string `json:"turn_secret,omitempty"` // Cluster join info (all using WG IPs) RQLiteJoinAddress string `json:"rqlite_join_address"` @@ -210,6 +213,13 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) { secretsEncryptionKey = strings.TrimSpace(string(data)) } + // Read TURN shared secret (optional — may not exist on older clusters; + // feat-124 #913) + turnSecret := "" + if data, err := os.ReadFile(h.oramaDir + "/secrets/turn-secret"); err == nil { + turnSecret = strings.TrimSpace(string(data)) + } + // 7. Get this node's WG IP (needed before peer list to check self-inclusion) myWGIP, err := h.getMyWGIP() if err != nil { @@ -274,21 +284,22 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) { olricPeers = append(olricPeers, fmt.Sprintf("%s:3322", myWGIP)) resp := JoinResponse{ - WGIP: wgIP, - WGPeers: wgPeers, - ClusterSecret: strings.TrimSpace(string(clusterSecret)), - SwarmKey: strings.TrimSpace(string(swarmKey)), - APIKeyHMACSecret: apiKeyHMACSecret, - RQLitePassword: rqlitePassword, - OlricEncryptionKey: olricEncryptionKey, + WGIP: wgIP, + WGPeers: wgPeers, + ClusterSecret: strings.TrimSpace(string(clusterSecret)), + SwarmKey: strings.TrimSpace(string(swarmKey)), + APIKeyHMACSecret: apiKeyHMACSecret, + RQLitePassword: rqlitePassword, + OlricEncryptionKey: olricEncryptionKey, SecretsEncryptionKey: secretsEncryptionKey, - RQLiteJoinAddress: fmt.Sprintf("%s:7001", myWGIP), - IPFSPeer: ipfsPeer, - IPFSClusterPeer: ipfsClusterPeer, - IPFSClusterPeerIDs: ipfsClusterPeerIDs, - BootstrapPeers: bootstrapPeers, - OlricPeers: olricPeers, - BaseDomain: baseDomain, + TURNSecret: turnSecret, + RQLiteJoinAddress: fmt.Sprintf("%s:7001", myWGIP), + IPFSPeer: ipfsPeer, + IPFSClusterPeer: ipfsClusterPeer, + IPFSClusterPeerIDs: ipfsClusterPeerIDs, + BootstrapPeers: bootstrapPeers, + OlricPeers: olricPeers, + BaseDomain: baseDomain, } w.Header().Set("Content-Type", "application/json") diff --git 
a/core/pkg/gateway/handlers/namespace/spawn_handler.go b/core/pkg/gateway/handlers/namespace/spawn_handler.go index 392ce63..1758b50 100644 --- a/core/pkg/gateway/handlers/namespace/spawn_handler.go +++ b/core/pkg/gateway/handlers/namespace/spawn_handler.go @@ -45,33 +45,36 @@ type SpawnRequest struct { GatewayOlricServers []string `json:"gateway_olric_servers,omitempty"` GatewayOlricTimeout string `json:"gateway_olric_timeout,omitempty"` IPFSClusterAPIURL string `json:"ipfs_cluster_api_url,omitempty"` - IPFSAPIURL string `json:"ipfs_api_url,omitempty"` - IPFSTimeout string `json:"ipfs_timeout,omitempty"` - IPFSReplicationFactor int `json:"ipfs_replication_factor,omitempty"` + IPFSAPIURL string `json:"ipfs_api_url,omitempty"` + IPFSTimeout string `json:"ipfs_timeout,omitempty"` + IPFSReplicationFactor int `json:"ipfs_replication_factor,omitempty"` // Gateway WebRTC config (when action = "spawn-gateway" and WebRTC is enabled) GatewayWebRTCEnabled bool `json:"gateway_webrtc_enabled,omitempty"` GatewaySFUPort int `json:"gateway_sfu_port,omitempty"` GatewayTURNDomain string `json:"gateway_turn_domain,omitempty"` GatewayTURNSecret string `json:"gateway_turn_secret,omitempty"` + // Host serverless secrets encryption key forwarded to the spawned + // namespace gateway (bugboard #837 follow-up). Same value on every node. 
+ GatewaySecretsEncryptionKey string `json:"gateway_secrets_encryption_key,omitempty"` // SFU config (when action = "spawn-sfu") - SFUListenAddr string `json:"sfu_listen_addr,omitempty"` - SFUMediaStart int `json:"sfu_media_start,omitempty"` - SFUMediaEnd int `json:"sfu_media_end,omitempty"` - TURNServers []sfu.TURNServerConfig `json:"turn_servers,omitempty"` - TURNSecret string `json:"turn_secret,omitempty"` - TURNCredTTL int `json:"turn_cred_ttl,omitempty"` - RQLiteDSN string `json:"rqlite_dsn,omitempty"` + SFUListenAddr string `json:"sfu_listen_addr,omitempty"` + SFUMediaStart int `json:"sfu_media_start,omitempty"` + SFUMediaEnd int `json:"sfu_media_end,omitempty"` + TURNServers []sfu.TURNServerConfig `json:"turn_servers,omitempty"` + TURNSecret string `json:"turn_secret,omitempty"` + TURNCredTTL int `json:"turn_cred_ttl,omitempty"` + RQLiteDSN string `json:"rqlite_dsn,omitempty"` // TURN config (when action = "spawn-turn") - TURNListenAddr string `json:"turn_listen_addr,omitempty"` - TURNTURNSAddr string `json:"turn_turns_addr,omitempty"` - TURNPublicIP string `json:"turn_public_ip,omitempty"` - TURNRealm string `json:"turn_realm,omitempty"` - TURNAuthSecret string `json:"turn_auth_secret,omitempty"` - TURNRelayStart int `json:"turn_relay_start,omitempty"` - TURNRelayEnd int `json:"turn_relay_end,omitempty"` - TURNDomain string `json:"turn_domain,omitempty"` + TURNListenAddr string `json:"turn_listen_addr,omitempty"` + TURNTURNSAddr string `json:"turn_turns_addr,omitempty"` + TURNPublicIP string `json:"turn_public_ip,omitempty"` + TURNRealm string `json:"turn_realm,omitempty"` + TURNAuthSecret string `json:"turn_auth_secret,omitempty"` + TURNRelayStart int `json:"turn_relay_start,omitempty"` + TURNRelayEnd int `json:"turn_relay_end,omitempty"` + TURNDomain string `json:"turn_domain,omitempty"` // Cluster state (when action = "save-cluster-state") ClusterState json.RawMessage `json:"cluster_state,omitempty"` @@ -235,6 +238,7 @@ func (h *SpawnHandler) ServeHTTP(w 
http.ResponseWriter, r *http.Request) { SFUPort: req.GatewaySFUPort, TURNDomain: req.GatewayTURNDomain, TURNSecret: req.GatewayTURNSecret, + SecretsEncryptionKey: req.GatewaySecretsEncryptionKey, } if err := h.systemdSpawner.SpawnGateway(ctx, req.Namespace, req.NodeID, cfg); err != nil { h.logger.Error("Failed to spawn Gateway instance", zap.Error(err)) @@ -288,6 +292,7 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { SFUPort: req.GatewaySFUPort, TURNDomain: req.GatewayTURNDomain, TURNSecret: req.GatewayTURNSecret, + SecretsEncryptionKey: req.GatewaySecretsEncryptionKey, } if err := h.systemdSpawner.RestartGateway(ctx, req.Namespace, req.NodeID, cfg); err != nil { h.logger.Error("Failed to restart Gateway instance", zap.Error(err)) diff --git a/core/pkg/gateway/handlers/push/resolve_caller_test.go b/core/pkg/gateway/handlers/push/resolve_caller_test.go index 548b61a..46986d9 100644 --- a/core/pkg/gateway/handlers/push/resolve_caller_test.go +++ b/core/pkg/gateway/handlers/push/resolve_caller_test.go @@ -10,9 +10,9 @@ import ( "github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys" ) -// Bugboard #548 — a push device must be keyed on the stable identity (rootId) +// Bugboard #548 — a push device must be keyed on the stable identity (accountId) // when the app provides one, not the wallet credential that authenticated the -// session. resolveCallerUserID prefers the `root_id` custom claim and falls +// session. resolveCallerUserID prefers the `account_id` custom claim and falls // back to the JWT subject so single-credential apps keep working. 
func reqWithClaims(t *testing.T, claims *authsvc.JWTClaims) *http.Request { @@ -28,10 +28,10 @@ func reqWithClaims(t *testing.T, claims *authsvc.JWTClaims) *http.Request { func TestResolveCallerUserID_prefersRootIDClaim(t *testing.T) { r := reqWithClaims(t, &authsvc.JWTClaims{ Sub: "0xWALLET", - Custom: map[string]string{rootIDClaim: "root-uuid-123"}, + Custom: map[string]string{accountIDClaim: "root-uuid-123"}, }) if got := resolveCallerUserID(r); got != "root-uuid-123" { - t.Errorf("want rootId from claim, got %q", got) + t.Errorf("want accountId from claim, got %q", got) } } @@ -44,13 +44,13 @@ func TestResolveCallerUserID_fallsBackToSubject(t *testing.T) { } func TestResolveCallerUserID_emptyRootIDFallsBack(t *testing.T) { - // An empty root_id must not collapse identity to "" — fall back to subject. + // An empty account_id must not collapse identity to "" — fall back to subject. r := reqWithClaims(t, &authsvc.JWTClaims{ Sub: "0xWALLET", - Custom: map[string]string{rootIDClaim: ""}, + Custom: map[string]string{accountIDClaim: ""}, }) if got := resolveCallerUserID(r); got != "0xWALLET" { - t.Errorf("want wallet fallback on empty root_id, got %q", got) + t.Errorf("want wallet fallback on empty account_id, got %q", got) } } diff --git a/core/pkg/gateway/handlers/push/types.go b/core/pkg/gateway/handlers/push/types.go index 9e2b568..6520126 100644 --- a/core/pkg/gateway/handlers/push/types.go +++ b/core/pkg/gateway/handlers/push/types.go @@ -141,17 +141,19 @@ func resolveNamespace(r *http.Request) string { return "" } -// rootIDClaim is the custom JWT claim an app may set to carry the stable -// identity (rootId) that a device should be keyed on, independent of which -// wallet credential authenticated the session. See bugboard #548. -const rootIDClaim = "root_id" +// accountIDClaim is the custom JWT claim an app may set to carry the stable +// account identity (e.g. 
anchat's users.user_id) that a device should be +// keyed on, independent of which wallet credential authenticated the +// session. Injected at mint time by the namespace's claims-provider hook. +// See bugboard #548 (name agreed in comment #906/#920). +const accountIDClaim = "account_id" // resolveCallerUserID extracts the identity a push device should be keyed on. // // In a multi-credential app (anchat), the JWT subject is the *wallet* — a // credential, not the identity. A single user (rootId) with N linked wallets // would otherwise register N device rows and receive N duplicate pushes -// (bugboard #548). When the app includes a stable `root_id` custom claim, we +// (bugboard #548). When the app includes a stable `account_id` custom claim, we // key on that; otherwise we fall back to the subject (wallet) so single- // credential apps and older tokens keep working unchanged. // @@ -159,7 +161,7 @@ const rootIDClaim = "root_id" func resolveCallerUserID(r *http.Request) string { if v := r.Context().Value(ctxkeys.JWT); v != nil { if claims, ok := v.(*auth.JWTClaims); ok && claims != nil { - if rootID, ok := claims.Custom[rootIDClaim]; ok && rootID != "" { + if rootID, ok := claims.Custom[accountIDClaim]; ok && rootID != "" { return rootID } return claims.Sub diff --git a/core/pkg/gateway/instance_spawner.go b/core/pkg/gateway/instance_spawner.go index 618c29a..3b21f32 100644 --- a/core/pkg/gateway/instance_spawner.go +++ b/core/pkg/gateway/instance_spawner.go @@ -55,17 +55,17 @@ type InstanceSpawner struct { // GatewayInstance represents a running Gateway instance for a namespace type GatewayInstance struct { - Namespace string - NodeID string - HTTPPort int - BaseDomain string - RQLiteDSN string // Connection to namespace RQLite - OlricServers []string // Connection to namespace Olric - ConfigPath string - PID int - StartedAt time.Time - cmd *exec.Cmd - logger *zap.Logger + Namespace string + NodeID string + HTTPPort int + BaseDomain string + RQLiteDSN string // 
Connection to namespace RQLite + OlricServers []string // Connection to namespace Olric + ConfigPath string + PID int + StartedAt time.Time + cmd *exec.Cmd + logger *zap.Logger // mu protects mutable state accessed concurrently by the monitor goroutine. mu sync.RWMutex @@ -75,16 +75,16 @@ type GatewayInstance struct { // InstanceConfig holds configuration for spawning a Gateway instance type InstanceConfig struct { - Namespace string // Namespace name (e.g., "alice") - NodeID string // Physical node ID - HTTPPort int // HTTP API port - BaseDomain string // Base domain (e.g., "orama-devnet.network") - RQLiteDSN string // RQLite connection DSN (e.g., "http://localhost:10000") - GlobalRQLiteDSN string // Global RQLite DSN for API key validation (empty = use RQLiteDSN) - OlricServers []string // Olric server addresses - OlricTimeout time.Duration // Timeout for Olric operations - NodePeerID string // Physical node's peer ID for home node management - DataDir string // Data directory for deployments, SQLite, etc. + Namespace string // Namespace name (e.g., "alice") + NodeID string // Physical node ID + HTTPPort int // HTTP API port + BaseDomain string // Base domain (e.g., "orama-devnet.network") + RQLiteDSN string // RQLite connection DSN (e.g., "http://localhost:10000") + GlobalRQLiteDSN string // Global RQLite DSN for API key validation (empty = use RQLiteDSN) + OlricServers []string // Olric server addresses + OlricTimeout time.Duration // Timeout for Olric operations + NodePeerID string // Physical node's peer ID for home node management + DataDir string // Data directory for deployments, SQLite, etc. 
// IPFS configuration for storage endpoints IPFSClusterAPIURL string // IPFS Cluster API URL (e.g., "http://localhost:9094") IPFSAPIURL string // IPFS API URL (e.g., "http://localhost:5001") @@ -95,6 +95,15 @@ type InstanceConfig struct { SFUPort int // SFU signaling port on this node TURNDomain string // TURN server domain (e.g., "turn.ns-alice.orama-devnet.network") TURNSecret string // TURN shared secret for credential generation + // SecretsEncryptionKey is the host-wide AES-256 serverless secrets + // encryption key (hex-encoded). Bugboard #837 follow-up: the host gateway + // receives this via gateway.Config but spawned namespace gateways never + // did, so `function secrets list` returned 501 on namespaces. It is the + // SAME value on every node — read once from the host's + // secrets/secrets-encryption-key file — and must be identical across the + // namespace cluster so a secret encrypted by one gateway decrypts on + // another. Empty means secrets management stays disabled (fail-loud). + SecretsEncryptionKey string } // GatewayYAMLWebRTC represents the webrtc section of the gateway YAML config. @@ -125,6 +134,13 @@ type GatewayYAMLConfig struct { IPFSTimeout string `yaml:"ipfs_timeout,omitempty"` IPFSReplicationFactor int `yaml:"ipfs_replication_factor,omitempty"` WebRTC GatewayYAMLWebRTC `yaml:"webrtc,omitempty"` + // SecretsEncryptionKey carries the host's serverless secrets encryption + // key into the spawned namespace gateway so it can decrypt/encrypt + // function secrets (bugboard #837 follow-up). The standalone gateway + // binary loads this back into gateway.Config.SecretsEncryptionKey on + // startup. Because this is key material, generateConfig writes the file + // 0600. Empty omits the field (secrets management stays disabled). + SecretsEncryptionKey string `yaml:"secrets_encryption_key,omitempty"` // ClusterSecretPath points to the host's cluster-secret file. 
Bug #215 // follow-up: namespace gateways spawned by systemd previously had no // way to access the cluster secret, so they fell back to per-node @@ -209,9 +225,9 @@ func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig // Find the gateway binary - look in common locations var gatewayBinary string possiblePaths := []string{ - "./bin/gateway", // Development build - "/usr/local/bin/orama-gateway", // System-wide install - "/opt/orama/bin/gateway", // Package install + "./bin/gateway", // Development build + "/usr/local/bin/orama-gateway", // System-wide install + "/opt/orama/bin/gateway", // Package install } for _, path := range possiblePaths { @@ -323,6 +339,7 @@ func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig, TURNDomain: cfg.TURNDomain, TURNSecret: cfg.TURNSecret, }, + SecretsEncryptionKey: cfg.SecretsEncryptionKey, } // Set Olric timeout if provided if cfg.OlricTimeout > 0 { @@ -341,12 +358,24 @@ func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig, } } - if err := os.WriteFile(configPath, data, 0644); err != nil { + // 0600: this YAML now embeds the serverless secrets encryption key + // (bugboard #837), so it must not be world/group readable. + if err := os.WriteFile(configPath, data, 0600); err != nil { return &InstanceError{ Message: "failed to write Gateway config", Cause: err, } } + // WriteFile's mode only applies on CREATE — a pre-existing file (e.g. + // written 0644 by an older release) keeps its old perms on rewrite. + // Converge explicitly so upgraded nodes don't leave the embedded + // secrets key group/world-readable. 
+ if err := os.Chmod(configPath, 0600); err != nil { + return &InstanceError{ + Message: "failed to set Gateway config permissions", + Cause: err, + } + } return nil } diff --git a/core/pkg/gateway/instance_spawner_test.go b/core/pkg/gateway/instance_spawner_test.go index 56b210a..3849e29 100644 --- a/core/pkg/gateway/instance_spawner_test.go +++ b/core/pkg/gateway/instance_spawner_test.go @@ -1,9 +1,12 @@ package gateway import ( + "os" + "path/filepath" "strings" "testing" + "go.uber.org/zap" "gopkg.in/yaml.v3" ) @@ -65,6 +68,114 @@ func TestGatewayYAMLConfig_clusterSecretPathRoundTrip(t *testing.T) { } } +// TestGatewayYAMLConfig_secretsEncryptionKeyRoundTrip is the regression test +// for the bugboard #837 follow-up: the host gateway received the serverless +// secrets encryption key but namespace gateways spawned via systemd did not, +// because the YAML schema had no field to carry it — so `function secrets +// list` returned 501 on those namespaces. This guards the yaml tag and that +// the standalone gateway's yamlCfg mirror can read it back. +func TestGatewayYAMLConfig_secretsEncryptionKeyRoundTrip(t *testing.T) { + const key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" + cfg := GatewayYAMLConfig{ + ListenAddr: ":6001", + ClientNamespace: "anchat-test", + RQLiteDSN: "http://localhost:10000", + OlricServers: []string{"localhost:3320"}, + SecretsEncryptionKey: key, + } + out, err := yaml.Marshal(cfg) + if err != nil { + t.Fatalf("marshal: %v", err) + } + if !strings.Contains(string(out), "secrets_encryption_key: "+key) { + t.Fatalf("YAML output missing expected secrets_encryption_key line:\n%s", out) + } + + // Mirror of cmd/gateway/config.go's yamlCfg so this test catches drift + // between the two declarations (the standalone gateway uses strict + // decoding and would reject an unknown field). 
+ type webrtc struct { + Enabled bool `yaml:"enabled"` + SFUPort int `yaml:"sfu_port"` + TURNDomain string `yaml:"turn_domain"` + TURNSecret string `yaml:"turn_secret"` + } + type yamlCfgMirror struct { + ListenAddr string `yaml:"listen_addr"` + ClientNamespace string `yaml:"client_namespace"` + RQLiteDSN string `yaml:"rqlite_dsn"` + OlricServers []string `yaml:"olric_servers"` + WebRTC webrtc `yaml:"webrtc"` + SecretsEncryptionKey string `yaml:"secrets_encryption_key"` + ClusterSecretPath string `yaml:"cluster_secret_path"` + } + var parsed yamlCfgMirror + if err := yaml.Unmarshal(out, &parsed); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if parsed.SecretsEncryptionKey != key { + t.Errorf("round-trip mismatch: got %q, want %q", parsed.SecretsEncryptionKey, key) + } +} + +// TestGatewayYAMLConfig_secretsKeyOmitWhenEmpty: a host with no secrets key +// (legacy/test rigs) must not emit a stray secrets_encryption_key line that +// operators could mistake for an empty-key directive. +func TestGatewayYAMLConfig_secretsKeyOmitWhenEmpty(t *testing.T) { + cfg := GatewayYAMLConfig{ + ListenAddr: ":6001", + ClientNamespace: "ns", + RQLiteDSN: "http://localhost:10000", + OlricServers: []string{"localhost:3320"}, + // SecretsEncryptionKey intentionally empty. + } + out, err := yaml.Marshal(cfg) + if err != nil { + t.Fatalf("marshal: %v", err) + } + if strings.Contains(string(out), "secrets_encryption_key") { + t.Errorf("empty SecretsEncryptionKey should be omitted from YAML; got:\n%s", out) + } +} + +// TestGenerateConfig_writesSecretsKeyWith0600 verifies the spawned namespace +// gateway YAML carries the secrets key AND is written 0600 (the file now +// holds key material — bugboard #837). 
+func TestGenerateConfig_writesSecretsKeyWith0600(t *testing.T) { + const key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" + dir := t.TempDir() + is := NewInstanceSpawner(dir, zap.NewNop()) + configPath := filepath.Join(dir, "gateway-node-1.yaml") + + cfg := InstanceConfig{ + Namespace: "anchat-test", + NodeID: "node-1", + HTTPPort: 6001, + RQLiteDSN: "http://localhost:10000", + OlricServers: []string{"localhost:3320"}, + SecretsEncryptionKey: key, + } + if err := is.generateConfig(configPath, cfg, dir); err != nil { + t.Fatalf("generateConfig: %v", err) + } + + info, err := os.Stat(configPath) + if err != nil { + t.Fatalf("stat: %v", err) + } + if perm := info.Mode().Perm(); perm != 0600 { + t.Errorf("config perms = %o, want 0600 (file holds the secrets key)", perm) + } + + data, err := os.ReadFile(configPath) + if err != nil { + t.Fatalf("read: %v", err) + } + if !strings.Contains(string(data), "secrets_encryption_key: "+key) { + t.Errorf("generated config missing secrets_encryption_key:\n%s", data) + } +} + // TestGatewayYAMLConfig_omitWhenEmpty: when the host has no cluster secret, // the field is omitted from the YAML so legacy single-node test rigs don't // see a stray "cluster_secret_path: " line that operators might mistake for diff --git a/core/pkg/namespace/cluster_manager.go b/core/pkg/namespace/cluster_manager.go index 3183aa8..4f687ce 100644 --- a/core/pkg/namespace/cluster_manager.go +++ b/core/pkg/namespace/cluster_manager.go @@ -45,6 +45,13 @@ type ClusterManagerConfig struct { // cluster-wide JWT signing key (bug #215 fix). Empty string disables // cross-node JWT verification within namespace clusters. ClusterSecretPath string + + // SecretsEncryptionKey is the host's serverless secrets encryption key + // (AES-256, hex-encoded), read once from secrets/secrets-encryption-key. + // Forwarded to spawned namespace gateways so `function secrets ...` + // works there (bugboard #837 follow-up). 
Empty leaves namespace-gateway + // secrets management disabled (fail-loud). + SecretsEncryptionKey string } // ClusterManager orchestrates namespace cluster provisioning and lifecycle @@ -56,9 +63,9 @@ type ClusterManager struct { systemdSpawner *SystemdSpawner // NEW: Systemd-based spawner replaces old spawners dnsManager *DNSRecordManager logger *zap.Logger - baseDomain string - baseDataDir string - globalRQLiteDSN string // Global RQLite DSN for namespace gateway auth + baseDomain string + baseDataDir string + globalRQLiteDSN string // Global RQLite DSN for namespace gateway auth // IPFS configuration for namespace gateways ipfsClusterAPIURL string @@ -72,6 +79,10 @@ type ClusterManager struct { // AES-256 key for encrypting TURN secrets in RQLite (nil = plaintext) turnEncryptionKey []byte + // Host's serverless secrets encryption key, forwarded to spawned + // namespace gateways (bugboard #837 follow-up). Empty = disabled. + secretsEncryptionKey string + // Track provisioning operations provisioningMu sync.RWMutex provisioning map[string]bool // namespace -> in progress @@ -123,6 +134,7 @@ func NewClusterManager( ipfsTimeout: ipfsTimeout, ipfsReplicationFactor: ipfsReplicationFactor, turnEncryptionKey: cfg.TurnEncryptionKey, + secretsEncryptionKey: cfg.SecretsEncryptionKey, logger: logger.With(zap.String("component", "cluster-manager")), provisioning: make(map[string]bool), } @@ -170,6 +182,7 @@ func NewClusterManagerWithComponents( ipfsTimeout: ipfsTimeout, ipfsReplicationFactor: ipfsReplicationFactor, turnEncryptionKey: cfg.TurnEncryptionKey, + secretsEncryptionKey: cfg.SecretsEncryptionKey, logger: logger.With(zap.String("component", "cluster-manager")), provisioning: make(map[string]bool), } @@ -566,6 +579,7 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name IPFSAPIURL: cm.ipfsAPIURL, IPFSTimeout: cm.ipfsTimeout, IPFSReplicationFactor: cm.ipfsReplicationFactor, + SecretsEncryptionKey: cm.secretsEncryptionKey, } var instance 
*gateway.GatewayInstance @@ -681,6 +695,9 @@ func (cm *ClusterManager) spawnGatewayRemote(ctx context.Context, nodeIP string, "gateway_sfu_port": cfg.SFUPort, "gateway_turn_domain": cfg.TURNDomain, "gateway_turn_secret": cfg.TURNSecret, + // Bugboard #837 follow-up: carry the host secrets encryption key to + // the remote node so its spawned namespace gateway can manage secrets. + "gateway_secrets_encryption_key": cfg.SecretsEncryptionKey, }) if err != nil { return nil, err @@ -1587,6 +1604,7 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n IPFSAPIURL: cm.ipfsAPIURL, IPFSTimeout: cm.ipfsTimeout, IPFSReplicationFactor: cm.ipfsReplicationFactor, + SecretsEncryptionKey: cm.secretsEncryptionKey, } // Add WebRTC config if enabled for this namespace @@ -1659,18 +1677,18 @@ type ClusterLocalState struct { SavedAt time.Time `json:"saved_at"` // WebRTC fields (zero values when WebRTC not enabled — backward compatible) - HasSFU bool `json:"has_sfu,omitempty"` - HasTURN bool `json:"has_turn,omitempty"` - TURNSharedSecret string `json:"turn_shared_secret,omitempty"` // Needed for gateway to generate TURN credentials on cold start - TURNDomain string `json:"turn_domain,omitempty"` // TURN server domain for gateway config - TURNCredentialTTL int `json:"turn_credential_ttl,omitempty"` - SFUSignalingPort int `json:"sfu_signaling_port,omitempty"` - SFUMediaPortStart int `json:"sfu_media_port_start,omitempty"` - SFUMediaPortEnd int `json:"sfu_media_port_end,omitempty"` - TURNListenPort int `json:"turn_listen_port,omitempty"` - TURNTLSPort int `json:"turn_tls_port,omitempty"` - TURNRelayPortStart int `json:"turn_relay_port_start,omitempty"` - TURNRelayPortEnd int `json:"turn_relay_port_end,omitempty"` + HasSFU bool `json:"has_sfu,omitempty"` + HasTURN bool `json:"has_turn,omitempty"` + TURNSharedSecret string `json:"turn_shared_secret,omitempty"` // Needed for gateway to generate TURN credentials on cold start + TURNDomain string 
`json:"turn_domain,omitempty"` // TURN server domain for gateway config + TURNCredentialTTL int `json:"turn_credential_ttl,omitempty"` + SFUSignalingPort int `json:"sfu_signaling_port,omitempty"` + SFUMediaPortStart int `json:"sfu_media_port_start,omitempty"` + SFUMediaPortEnd int `json:"sfu_media_port_end,omitempty"` + TURNListenPort int `json:"turn_listen_port,omitempty"` + TURNTLSPort int `json:"turn_tls_port,omitempty"` + TURNRelayPortStart int `json:"turn_relay_port_start,omitempty"` + TURNRelayPortEnd int `json:"turn_relay_port_end,omitempty"` } type ClusterLocalStatePorts struct { @@ -2022,6 +2040,7 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl IPFSAPIURL: cm.ipfsAPIURL, IPFSTimeout: cm.ipfsTimeout, IPFSReplicationFactor: cm.ipfsReplicationFactor, + SecretsEncryptionKey: cm.secretsEncryptionKey, } // Resolve WebRTC config. Prefer the local state file; fall back to diff --git a/core/pkg/namespace/cluster_manager_webrtc.go b/core/pkg/namespace/cluster_manager_webrtc.go index dde2c14..b1b1859 100644 --- a/core/pkg/namespace/cluster_manager_webrtc.go +++ b/core/pkg/namespace/cluster_manager_webrtc.go @@ -470,16 +470,16 @@ func (cm *ClusterManager) spawnSFURemote(ctx context.Context, nodeIP string, cfg } _, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ - "action": "spawn-sfu", - "namespace": cfg.Namespace, - "node_id": cfg.NodeID, - "sfu_listen_addr": cfg.ListenAddr, - "sfu_media_start": cfg.MediaPortStart, - "sfu_media_end": cfg.MediaPortEnd, - "turn_servers": turnServers, - "turn_secret": cfg.TURNSecret, - "turn_cred_ttl": cfg.TURNCredTTL, - "rqlite_dsn": cfg.RQLiteDSN, + "action": "spawn-sfu", + "namespace": cfg.Namespace, + "node_id": cfg.NodeID, + "sfu_listen_addr": cfg.ListenAddr, + "sfu_media_start": cfg.MediaPortStart, + "sfu_media_end": cfg.MediaPortEnd, + "turn_servers": turnServers, + "turn_secret": cfg.TURNSecret, + "turn_cred_ttl": cfg.TURNCredTTL, + "rqlite_dsn": cfg.RQLiteDSN, }) return err } 
@@ -487,17 +487,17 @@ func (cm *ClusterManager) spawnSFURemote(ctx context.Context, nodeIP string, cfg // spawnTURNRemote sends a spawn-turn request to a remote node func (cm *ClusterManager) spawnTURNRemote(ctx context.Context, nodeIP string, cfg TURNInstanceConfig) error { _, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ - "action": "spawn-turn", - "namespace": cfg.Namespace, - "node_id": cfg.NodeID, - "turn_listen_addr": cfg.ListenAddr, - "turn_turns_addr": cfg.TURNSListenAddr, - "turn_public_ip": cfg.PublicIP, - "turn_realm": cfg.Realm, - "turn_auth_secret": cfg.AuthSecret, - "turn_relay_start": cfg.RelayPortStart, - "turn_relay_end": cfg.RelayPortEnd, - "turn_domain": cfg.TURNDomain, + "action": "spawn-turn", + "namespace": cfg.Namespace, + "node_id": cfg.NodeID, + "turn_listen_addr": cfg.ListenAddr, + "turn_turns_addr": cfg.TURNSListenAddr, + "turn_public_ip": cfg.PublicIP, + "turn_realm": cfg.Realm, + "turn_auth_secret": cfg.AuthSecret, + "turn_relay_start": cfg.RelayPortStart, + "turn_relay_end": cfg.RelayPortEnd, + "turn_domain": cfg.TURNDomain, }) return err } @@ -716,6 +716,9 @@ func (cm *ClusterManager) restartGatewaysWithWebRTC( SFUPort: sfuPort, TURNDomain: turnDomain, TURNSecret: turnSecret, + // Bugboard #837 follow-up: preserve the secrets key on WebRTC + // restarts so enabling WebRTC doesn't drop secrets management. + SecretsEncryptionKey: cm.secretsEncryptionKey, } if node.NodeID == cm.localNodeID { @@ -764,6 +767,8 @@ func (cm *ClusterManager) restartGatewayRemote(ctx context.Context, nodeIP strin "gateway_sfu_port": cfg.SFUPort, "gateway_turn_domain": cfg.TURNDomain, "gateway_turn_secret": cfg.TURNSecret, + // Bugboard #837 follow-up: preserve the secrets key on WebRTC restarts. 
+ "gateway_secrets_encryption_key": cfg.SecretsEncryptionKey, }) if err != nil { cm.logger.Error("Failed to restart remote gateway with WebRTC config", diff --git a/core/pkg/namespace/cluster_recovery.go b/core/pkg/namespace/cluster_recovery.go index cdc2467..e367018 100644 --- a/core/pkg/namespace/cluster_recovery.go +++ b/core/pkg/namespace/cluster_recovery.go @@ -527,6 +527,7 @@ func (cm *ClusterManager) ReplaceClusterNode(ctx context.Context, cluster *Names IPFSAPIURL: cm.ipfsAPIURL, IPFSTimeout: cm.ipfsTimeout, IPFSReplicationFactor: cm.ipfsReplicationFactor, + SecretsEncryptionKey: cm.secretsEncryptionKey, } // Add WebRTC config if enabled for this namespace @@ -1069,6 +1070,7 @@ func (cm *ClusterManager) addNodeToCluster( IPFSAPIURL: cm.ipfsAPIURL, IPFSTimeout: cm.ipfsTimeout, IPFSReplicationFactor: cm.ipfsReplicationFactor, + SecretsEncryptionKey: cm.secretsEncryptionKey, } // Add WebRTC config if enabled for this namespace diff --git a/core/pkg/namespace/reconcile_gateway_test.go b/core/pkg/namespace/reconcile_gateway_test.go index 97319e3..4e88aa4 100644 --- a/core/pkg/namespace/reconcile_gateway_test.go +++ b/core/pkg/namespace/reconcile_gateway_test.go @@ -90,6 +90,62 @@ func TestGatewayWebRTCInSync_bothDisabled_returnsTrue(t *testing.T) { } } +// Bugboard #837 follow-up (drift on the secrets encryption key) — +// gatewayConfigInSync extends the bug-25 WebRTC drift check with the +// serverless secrets key. A namespace gateway spawned before the key was +// plumbed has an empty on-disk key; once the desired key is non-empty we +// want a rewrite+restart so secrets management turns on. But both-empty must +// stay a no-op so non-secrets hosts don't restart-loop. + +func TestGatewayConfigInSync_secretsKeyMissingOnDisk_returnsFalse(t *testing.T) { + // On-disk YAML has no secrets key (pre-#837 gateway), desired has one. + // MUST drift so ReconcileGateway rewrites + restarts to enable secrets. 
+ onDisk := gateway.GatewayYAMLConfig{} // empty secrets_encryption_key + desired := gateway.InstanceConfig{SecretsEncryptionKey: "the-key"} + if gatewayConfigInSync(onDisk, desired) { + t.Fatal("empty on-disk secrets key vs non-empty desired must be out-of-sync (needs restart to enable secrets)") + } +} + +func TestGatewayConfigInSync_secretsKeyMatches_returnsTrue(t *testing.T) { + // After a reconcile, on-disk key matches desired. MUST be in-sync so the + // next boot does not restart again (no loop). + onDisk := gateway.GatewayYAMLConfig{SecretsEncryptionKey: "the-key"} + desired := gateway.InstanceConfig{SecretsEncryptionKey: "the-key"} + if !gatewayConfigInSync(onDisk, desired) { + t.Error("matching secrets key must be in-sync (no restart) — else restart loop on every boot") + } +} + +func TestGatewayConfigInSync_bothSecretsKeysEmpty_returnsTrue(t *testing.T) { + // A host with no secrets key (empty desired) and an on-disk config also + // without one MUST be in-sync — otherwise every boot would restart a + // namespace gateway that legitimately has no secrets key. + if !gatewayConfigInSync(gateway.GatewayYAMLConfig{}, gateway.InstanceConfig{}) { + t.Error("empty on-disk + empty desired secrets key must be in-sync (no restart loop)") + } +} + +func TestGatewayConfigInSync_secretsKeyRotated_returnsFalse(t *testing.T) { + // A rotated key (both non-empty but different) must drift so the rewrite + // propagates the new key. + onDisk := gateway.GatewayYAMLConfig{SecretsEncryptionKey: "old-key"} + desired := gateway.InstanceConfig{SecretsEncryptionKey: "new-key"} + if gatewayConfigInSync(onDisk, desired) { + t.Error("rotated secrets key (old != new) must be out-of-sync") + } +} + +func TestGatewayConfigInSync_webrtcDriftStillDetected(t *testing.T) { + // The combined check must not lose the bug-25 WebRTC surface: WebRTC + // drift with matching (empty) secrets keys must still report out-of-sync. 
+ onDisk := gateway.GatewayYAMLConfig{WebRTC: gateway.GatewayYAMLWebRTC{}} + desired := gateway.InstanceConfig{WebRTCEnabled: true, SFUPort: 30000} + if gatewayConfigInSync(onDisk, desired) { + t.Error("WebRTC drift must still be detected by the combined in-sync check") + } +} + // ReconcileGateway I/O paths that DON'T restart (the restart path needs // real systemd, so it's covered by the pure helper above). These pin // that a matching config is a clean no-op and that an unreadable config diff --git a/core/pkg/namespace/systemd_spawner.go b/core/pkg/namespace/systemd_spawner.go index 9fcd92c..c96d7f2 100644 --- a/core/pkg/namespace/systemd_spawner.go +++ b/core/pkg/namespace/systemd_spawner.go @@ -228,6 +228,11 @@ func (s *SystemdSpawner) SpawnGateway(ctx context.Context, namespace, nodeID str // random Ed25519 keys and host functions saw empty // caller_jwt_subject. ClusterSecretPath: s.clusterSecretPath, + // Bugboard #837 follow-up: forward the host's serverless secrets + // encryption key so the spawned namespace gateway can manage function + // secrets. Without this, `function secrets list` returned 501 on + // namespace gateways even though the host gateway had the key. + SecretsEncryptionKey: cfg.SecretsEncryptionKey, WebRTC: gateway.GatewayYAMLWebRTC{ Enabled: cfg.WebRTCEnabled, SFUPort: cfg.SFUPort, @@ -241,9 +246,17 @@ func (s *SystemdSpawner) SpawnGateway(ctx context.Context, namespace, nodeID str return fmt.Errorf("failed to marshal Gateway config: %w", err) } - if err := os.WriteFile(configPath, configBytes, 0644); err != nil { + // 0600: the gateway YAML embeds the secrets encryption key (bugboard + // #837), so it must not be world/group readable. 
+ if err := os.WriteFile(configPath, configBytes, 0600); err != nil { return fmt.Errorf("failed to write Gateway config: %w", err) } + // WriteFile's mode only applies on CREATE — converge perms explicitly so + // a file written 0644 by an older release doesn't stay world-readable + // after an in-place rewrite. + if err := os.Chmod(configPath, 0600); err != nil { + return fmt.Errorf("failed to set Gateway config permissions: %w", err) + } s.logger.Info("Created Gateway config file", zap.String("path", configPath), @@ -333,6 +346,23 @@ func gatewayWebRTCInSync(onDisk gateway.GatewayYAMLWebRTC, cfg gateway.InstanceC onDisk.TURNDomain == cfg.TURNDomain } +// gatewayConfigInSync reports whether the full reconcile-relevant config on +// disk matches the desired config — i.e. no rewrite+restart is needed. +// Combines the WebRTC drift surface (bugboard #25) with the secrets +// encryption key (bugboard #837): a gateway that was spawned before the key +// was plumbed has an empty on-disk key and `function secrets list` returns +// 501; once the desired key is non-empty we want a rewrite+restart so the +// running gateway picks it up. +// +// Plain string equality keeps the "both empty → in sync" case a no-op: a +// namespace on a host with no secrets key (empty desired) whose on-disk key +// is also empty is in-sync, so it never restart-loops. Only a genuine +// difference (empty on-disk vs non-empty desired, or a rotated key) drifts. 
+func gatewayConfigInSync(onDisk gateway.GatewayYAMLConfig, cfg gateway.InstanceConfig) bool { + return gatewayWebRTCInSync(onDisk.WebRTC, cfg) && + onDisk.SecretsEncryptionKey == cfg.SecretsEncryptionKey +} + // ReconcileGateway is the WARM counterpart to SpawnGateway: when a // namespace gateway is already running, this compares its on-disk config // against the desired `cfg` and restarts it ONLY if the WebRTC block has @@ -366,18 +396,22 @@ func (s *SystemdSpawner) ReconcileGateway(ctx context.Context, namespace, nodeID return fmt.Errorf("parse gateway config for reconcile: %w", err) } - if gatewayWebRTCInSync(onDisk.WebRTC, cfg) { + if gatewayConfigInSync(onDisk, cfg) { // Already in sync — nothing to do, no restart. return nil } - s.logger.Info("Gateway WebRTC config drifted from desired; reconciling (rewrite + restart)", + // secretsKeyDrifted is logged (as a bool, never the key material) so + // operators can see when a #837 rewrite fires vs a #25 WebRTC rewrite. + secretsKeyDrifted := onDisk.SecretsEncryptionKey != cfg.SecretsEncryptionKey + s.logger.Info("Gateway config drifted from desired; reconciling (rewrite + restart)", zap.String("namespace", namespace), zap.String("node_id", nodeID), zap.Bool("ondisk_enabled", onDisk.WebRTC.Enabled), zap.Int("ondisk_sfu_port", onDisk.WebRTC.SFUPort), zap.Bool("desired_enabled", cfg.WebRTCEnabled), - zap.Int("desired_sfu_port", cfg.SFUPort)) + zap.Int("desired_sfu_port", cfg.SFUPort), + zap.Bool("secrets_key_drifted", secretsKeyDrifted)) return s.RestartGateway(ctx, namespace, nodeID, cfg) } @@ -385,13 +419,13 @@ func (s *SystemdSpawner) ReconcileGateway(ctx context.Context, namespace, nodeID type SFUInstanceConfig struct { Namespace string NodeID string - ListenAddr string // WireGuard IP:port (e.g., "10.0.0.1:30000") - MediaPortStart int // Start of RTP media port range - MediaPortEnd int // End of RTP media port range + ListenAddr string // WireGuard IP:port (e.g., "10.0.0.1:30000") + MediaPortStart int // Start 
of RTP media port range + MediaPortEnd int // End of RTP media port range TURNServers []sfu.TURNServerConfig // TURN servers to advertise to peers - TURNSecret string // HMAC-SHA1 shared secret - TURNCredTTL int // Credential TTL in seconds - RQLiteDSN string // Namespace-local RQLite DSN + TURNSecret string // HMAC-SHA1 shared secret + TURNCredTTL int // Credential TTL in seconds + RQLiteDSN string // Namespace-local RQLite DSN } // SpawnSFU starts an SFU instance using systemd diff --git a/core/pkg/node/gateway.go b/core/pkg/node/gateway.go index 2615674..2230bf7 100644 --- a/core/pkg/node/gateway.go +++ b/core/pkg/node/gateway.go @@ -129,6 +129,11 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { IPFSReplicationFactor: n.config.Database.IPFS.ReplicationFactor, TurnEncryptionKey: turnEncKey, ClusterSecretPath: clusterSecretPath, + // Bugboard #837 follow-up: forward the host's serverless secrets + // encryption key (read once above) so spawned namespace gateways + // can manage function secrets. Reuses the same variable the host + // gateway uses — no second file read. + SecretsEncryptionKey: secretsEncryptionKey, } clusterManager := namespace.NewClusterManager(ormClient, clusterCfg, n.logger.Logger) clusterManager.SetLocalNodeID(gwCfg.NodePeerID) diff --git a/core/pkg/serverless/engine.go b/core/pkg/serverless/engine.go index 90caa13..85a5635 100644 --- a/core/pkg/serverless/engine.go +++ b/core/pkg/serverless/engine.go @@ -86,6 +86,11 @@ type Engine struct { // Invocation logger for metrics/debugging invocationLogger InvocationLogger + // logQueue moves invocation telemetry writes OFF the reply critical path + // (bugboard feat-27). Non-nil only when invocationLogger is set; logInvocation + // enqueues into it instead of calling invocationLogger.Log synchronously. 
+ logQueue *invocationLogQueue + // Rate limiter rateLimiter RateLimiter } @@ -214,6 +219,14 @@ func NewEngine(cfg *Config, registry FunctionRegistry, hostServices HostServices opt(engine) } + // Start the async telemetry queue once we know whether a logger was wired + // in. Invocation logging is now OFF the reply critical path (bugboard + // feat-27): logInvocation enqueues, a single worker drains and writes with + // its own context. Without a logger there's nothing to queue. + if engine.invocationLogger != nil { + engine.logQueue = newInvocationLogQueue(engine.invocationLogger, logger) + } + // Register host functions if err := engine.registerHostModule(context.Background()); err != nil { return nil, fmt.Errorf("failed to register host module: %w", err) @@ -473,6 +486,13 @@ func (e *Engine) Invalidate(wasmCID string) { // Close shuts down the engine and releases resources. func (e *Engine) Close(ctx context.Context) error { + // Flush any pending invocation telemetry first (best-effort, bounded — + // see invocationLogQueue.Close). Losing a few records at shutdown is + // acceptable; blocking the process on telemetry is not. + if e.logQueue != nil { + e.logQueue.Close() + } + // Close all cached modules e.moduleCache.Clear(ctx) @@ -699,7 +719,16 @@ func (e *Engine) getOrCompileModule(ctx context.Context, wasmCID string) (wazero }) } -// logInvocation logs an invocation record. +// logInvocation records an invocation's telemetry. +// +// IMPORTANT behavior note (bugboard feat-27): the record is now ENQUEUED for +// asynchronous writing — it is NOT written on the reply path. A single worker +// goroutine drains the queue and writes with its own context, so a +// function_invocations row may lag the response by up to the queue drain time. +// That lag is acceptable for telemetry and is worth it: it removes ~500ms-3s +// of cross-region Raft write latency from every serverless RPC round-trip. 
+// `ctx` is therefore unused for the write (the request ctx dies when Execute +// returns); it is retained only to keep the call-site signature stable. // // `logBuf` is the per-invocation LogBuffer attached to ctx at Execute // start (bugboard #108 fix). When non-nil, the record's Logs field is @@ -708,7 +737,8 @@ func (e *Engine) getOrCompileModule(ctx context.Context, wasmCID string) (wazero // updated), falls back to the HostFunctions singleton via the // GetLogs() interface check — same behavior as pre-#108. func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *InvocationContext, logBuf *LogBuffer, startTime time.Time, outputSize int, status InvocationStatus, err error) { - if e.invocationLogger == nil || !e.config.LogInvocations { + _ = ctx // request context is intentionally not used for the async write + if e.logQueue == nil || !e.config.LogInvocations { return } @@ -742,9 +772,9 @@ func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *Invoca record.Logs = hf.GetLogs() } - if logErr := e.invocationLogger.Log(ctx, record); logErr != nil { - e.logger.Warn("Failed to log invocation", zap.Error(logErr)) - } + // Enqueue is non-blocking: a full queue drops the record (counted) rather + // than stalling the reply path. See invocationLogQueue.enqueue. + e.logQueue.enqueue(record) } // registerHostModule registers the Orama host functions with the wazero runtime. @@ -752,14 +782,14 @@ func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *Invoca // We expose the SAME export set under three module names: // // - "env" — canonical. Matches the WASI / TinyGo convention. The -// official SDK examples and docs use this name. +// official SDK examples and docs use this name. // - "host" — long-standing alias kept for backward compatibility. // - "orama" — alias added 2026-05-06 after multiple apps intuited the -// brand name as the import target and hit cryptic -// "module[orama] not instantiated" errors. 
Cheap insurance: -// a few KB of runtime metadata per alias, zero behavioral -// cost. Apps SHOULD prefer `env` going forward; `orama` is -// supported indefinitely to avoid breaking deployed code. +// brand name as the import target and hit cryptic +// "module[orama] not instantiated" errors. Cheap insurance: +// a few KB of runtime metadata per alias, zero behavioral +// cost. Apps SHOULD prefer `env` going forward; `orama` is +// supported indefinitely to avoid breaking deployed code. // // All three names resolve to identical function tables — a WASM module // can mix imports across the three with no consequence. @@ -1435,9 +1465,11 @@ func (e *Engine) hEphemeralStateClear(ctx context.Context, mod api.Module, // hPushSend is the WASM-callable wrapper for PushSend. // Inputs: -// userIDPtr/userIDLen — UTF-8 user ID to push to (within the function's -// own namespace; the namespace is server-side trusted) -// msgPtr/msgLen — JSON payload matching hostfunctions.PushSendArgs +// +// userIDPtr/userIDLen — UTF-8 user ID to push to (within the function's +// own namespace; the namespace is server-side trusted) +// msgPtr/msgLen — JSON payload matching hostfunctions.PushSendArgs +// // Returns 1 on success, 0 on error. func (e *Engine) hPushSend(ctx context.Context, mod api.Module, userIDPtr, userIDLen, msgPtr, msgLen uint32) uint32 { @@ -1508,7 +1540,16 @@ func (e *Engine) hTurnCredentials(ctx context.Context, mod api.Module) uint64 { return e.executor.WriteToGuest(ctx, mod, out) } +// maxLogMessageBytes caps a single oh.LogInfo/LogError message. A guest could +// otherwise pass its entire linear memory as one "log line", ballooning the +// per-invocation buffer (and the async invocation-log queue holding it). +// Truncation, not rejection — telemetry is best-effort. 
+const maxLogMessageBytes = 16 * 1024
+
 func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32) {
+	if size > maxLogMessageBytes {
+		size = maxLogMessageBytes
+	}
 	msg, ok := e.executor.ReadFromGuest(mod, ptr, size)
 	if ok {
 		e.hostServices.LogInfo(ctx, string(msg))
@@ -1516,6 +1557,9 @@ func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32)
 }
 
 func (e *Engine) hLogError(ctx context.Context, mod api.Module, ptr, size uint32) {
+	if size > maxLogMessageBytes {
+		size = maxLogMessageBytes
+	}
 	msg, ok := e.executor.ReadFromGuest(mod, ptr, size)
 	if ok {
 		e.hostServices.LogError(ctx, string(msg))
diff --git a/core/pkg/serverless/invocation_log_queue.go b/core/pkg/serverless/invocation_log_queue.go
new file mode 100644
index 0000000..17e8a06
--- /dev/null
+++ b/core/pkg/serverless/invocation_log_queue.go
@@ -0,0 +1,173 @@
+package serverless
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"go.uber.org/zap"
+)
+
+// invocationLogQueueSize bounds the number of pending invocation records held
+// off the reply critical path. Telemetry must never block or OOM the data
+// path: once this many records are queued, new records are DROPPED (counted)
+// rather than backing up onto the caller. 4096 is generous — at a sustained
+// drain rate of one cross-region Raft write per record, this absorbs multi-
+// second bursts before any drop occurs.
+const invocationLogQueueSize = 4096
+
+// invocationLogWriteTimeout bounds a single record's write. The request
+// context that produced the record is already dead by the time the worker
+// drains it (Execute returned), so the worker uses its own context with this
+// per-record deadline instead.
+const invocationLogWriteTimeout = 10 * time.Second
+
+// invocationLogFlushTimeout caps how long Close waits for the worker to drain
+// pending records at shutdown. Best-effort: losing telemetry at shutdown is
+// acceptable, so we never block the process from exiting.
+const invocationLogFlushTimeout = 5 * time.Second
+
+// dropWarnInterval rate-limits the "queue full, dropping" WARN so a sustained
+// overload doesn't itself flood the logs.
+const dropWarnInterval = 30 * time.Second
+
+// invocationLogQueue moves invocation telemetry OFF the reply critical path.
+//
+// Behavior note: records are now written ASYNCHRONOUSLY by a single worker
+// goroutine, so a function_invocations row may lag the response by up to the
+// queue drain time. That lag is acceptable for telemetry and is worth it — it
+// removes ~500ms-3s of cross-region Raft write latency from every serverless
+// RPC round-trip (bugboard feat-27). Each record's Logs are batched into a
+// single multi-row INSERT by the logger impls, so a handler that emits N log
+// lines no longer pays N sequential cross-region writes.
+type invocationLogQueue struct {
+	logger *zap.Logger
+	sink   InvocationLogger
+
+	ch chan *InvocationRecord
+	wg sync.WaitGroup
+
+	// mu orders enqueue against Close: senders hold the read lock, Close takes
+	// the write lock to flip closed and close ch, so no send can ever race the
+	// close (a send on a closed channel panics, even inside select/default).
+	mu     sync.RWMutex
+	closed bool // set under mu by Close; once true, enqueue refuses records
+
+	dropped      atomic.Int64
+	lastDropWarn atomic.Int64 // unix-nano of last drop warning emitted
+	closeOnce    sync.Once
+}
+
+// newInvocationLogQueue starts the single drain worker and returns the queue.
+// sink is the underlying logger whose Log method performs the actual DB write;
+// it is called with the worker's own context, never the request context.
+func newInvocationLogQueue(sink InvocationLogger, logger *zap.Logger) *invocationLogQueue {
+	q := &invocationLogQueue{
+		logger: logger,
+		sink:   sink,
+		ch:     make(chan *InvocationRecord, invocationLogQueueSize),
+	}
+	q.wg.Add(1)
+	go q.run()
+	return q
+}
+
+// enqueue submits a record for asynchronous writing. It NEVER blocks: if the
+// bounded queue is full, the record is dropped and a counter incremented, with
+// a rate-limited WARN that reports the running drop count. Returns true if the
+// record was accepted, false if dropped.
+//
+// enqueue is safe to call after Close: the record is refused (returns false)
+// instead of panicking on the closed channel. Invocations still in flight
+// when the engine shuts down reach this path.
+func (q *invocationLogQueue) enqueue(rec *InvocationRecord) bool {
+	if rec == nil {
+		return false
+	}
+	// The read lock is held only across a non-blocking send, so concurrent
+	// enqueues never wait on each other; it exists solely to exclude Close.
+	q.mu.RLock()
+	defer q.mu.RUnlock()
+	if q.closed {
+		return false
+	}
+	select {
+	case q.ch <- rec:
+		return true
+	default:
+		dropped := q.dropped.Add(1)
+		q.maybeWarnDrop(dropped)
+		return false
+	}
+}
+
+// maybeWarnDrop emits a rate-limited WARN reporting the cumulative drop count.
+func (q *invocationLogQueue) maybeWarnDrop(dropped int64) {
+	now := time.Now().UnixNano()
+	last := q.lastDropWarn.Load()
+	if now-last < int64(dropWarnInterval) {
+		return
+	}
+	if !q.lastDropWarn.CompareAndSwap(last, now) {
+		return
+	}
+	q.logger.Warn("invocation log queue full; dropping telemetry records",
+		zap.Int64("dropped_total", dropped),
+		zap.Int("queue_size", invocationLogQueueSize),
+	)
+}
+
+// run drains the queue, writing each record with the worker's own context and
+// a per-record timeout. It exits when the channel is closed and fully drained.
+func (q *invocationLogQueue) run() {
+	defer q.wg.Done()
+	for rec := range q.ch {
+		q.write(rec)
+	}
+}
+
+// write performs a single record write with a bounded, request-independent
+// context. Failures are logged (never swallowed silently) but do not stop the
+// worker — telemetry loss must never cascade into the data path.
+func (q *invocationLogQueue) write(rec *InvocationRecord) {
+	ctx, cancel := context.WithTimeout(context.Background(), invocationLogWriteTimeout)
+	defer cancel()
+	if err := q.sink.Log(ctx, rec); err != nil {
+		q.logger.Warn("failed to write invocation telemetry record",
+			zap.String("function_id", rec.FunctionID),
+			zap.String("request_id", rec.RequestID),
+			zap.Error(err),
+		)
+	}
+}
+
+// Close stops accepting new records and waits (bounded by
+// invocationLogFlushTimeout) for the worker to flush what's already queued.
+// Best-effort: if the worker doesn't finish in time, Close returns anyway so
+// shutdown is never blocked by telemetry. Safe to call multiple times.
+// After Close, enqueue refuses records (returns false) rather than panicking
+// on the closed channel.
+func (q *invocationLogQueue) Close() {
+	q.closeOnce.Do(func() {
+		// Flip closed and close ch under the write lock so no enqueue can be
+		// mid-send when the channel closes.
+		q.mu.Lock()
+		q.closed = true
+		close(q.ch)
+		q.mu.Unlock()
+
+		flushed := make(chan struct{})
+		go func() {
+			q.wg.Wait()
+			close(flushed)
+		}()
+		select {
+		case <-flushed:
+		case <-time.After(invocationLogFlushTimeout):
+			q.logger.Warn("invocation log queue flush timed out; dropping remaining telemetry",
+				zap.Duration("timeout", invocationLogFlushTimeout),
+			)
+		}
+	})
+}
diff --git a/core/pkg/serverless/invocation_log_queue_test.go b/core/pkg/serverless/invocation_log_queue_test.go
new file mode 100644
index 0000000..5d6b9a6
--- /dev/null
+++ b/core/pkg/serverless/invocation_log_queue_test.go
@@ -0,0 +1,169 @@
+package serverless
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"go.uber.org/zap"
+)
+
+// mockInvocationLogger is a thread-safe InvocationLogger that records every
+// record it receives. blockUntil, when non-nil, makes Log block until the
+// channel is closed — used to keep the worker busy and force the bounded
+// queue to fill.
+type mockInvocationLogger struct {
+	mu         sync.Mutex
+	records    []*InvocationRecord
+	calls      atomic.Int64
+	blockUntil chan struct{}
+}
+
+func (m *mockInvocationLogger) Log(ctx context.Context, inv *InvocationRecord) error {
+	m.calls.Add(1)
+	if m.blockUntil != nil {
+		select {
+		case <-m.blockUntil:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	m.mu.Lock()
+	m.records = append(m.records, inv)
+	m.mu.Unlock()
+	return nil
+}
+
+func (m *mockInvocationLogger) count() int {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return len(m.records)
+}
+
+// TestInvocationLogQueue_enqueue_after_close_is_rejected pins the shutdown
+// contract: a late enqueue (an invocation still in flight when the engine
+// closes) must be refused, not panic with "send on closed channel".
+func TestInvocationLogQueue_enqueue_after_close_is_rejected(t *testing.T) {
+	sink := &mockInvocationLogger{}
+	q := newInvocationLogQueue(sink, zap.NewNop())
+	q.Close()
+
+	if ok := q.enqueue(&InvocationRecord{ID: "late"}); ok {
+		t.Fatal("expected enqueue after Close to be rejected")
+	}
+	if got := sink.count(); got != 0 {
+		t.Fatalf("expected no records written after Close, got %d", got)
+	}
+}
+
+// eventually polls cond up to timeout, failing the test if it never holds.
+// Avoids a fixed sleep — we wait only as long as needed.
+func eventually(t *testing.T, timeout time.Duration, cond func() bool) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if cond() { + return + } + time.Sleep(time.Millisecond) + } + t.Fatalf("condition not met within %s", timeout) +} + +func TestInvocationLogQueue_enqueue_is_nonblocking_and_records_reach_logger(t *testing.T) { + sink := &mockInvocationLogger{} + q := newInvocationLogQueue(sink, zap.NewNop()) + defer q.Close() + + rec := &InvocationRecord{ID: "inv-1", FunctionID: "fn-1", RequestID: "req-1"} + if ok := q.enqueue(rec); !ok { + t.Fatal("expected enqueue to accept the record") + } + + eventually(t, time.Second, func() bool { return sink.count() == 1 }) + + sink.mu.Lock() + got := sink.records[0] + sink.mu.Unlock() + if got.ID != "inv-1" { + t.Errorf("logger received wrong record: %+v", got) + } +} + +func TestInvocationLogQueue_enqueue_nil_is_noop(t *testing.T) { + sink := &mockInvocationLogger{} + q := newInvocationLogQueue(sink, zap.NewNop()) + defer q.Close() + + if ok := q.enqueue(nil); ok { + t.Fatal("expected nil record to be rejected") + } +} + +func TestInvocationLogQueue_full_queue_drops_without_blocking_and_counts(t *testing.T) { + // Hold the worker on the first record so the bounded channel fills, then + // every further enqueue must drop (counted) without blocking. + block := make(chan struct{}) + sink := &mockInvocationLogger{blockUntil: block} + q := newInvocationLogQueue(sink, zap.NewNop()) + defer func() { + close(block) + q.Close() + }() + + // First record is pulled by the worker and blocks there. The next + // invocationLogQueueSize records fill the channel buffer. + for i := 0; i < invocationLogQueueSize+1; i++ { + _ = q.enqueue(&InvocationRecord{ID: "fill"}) + } + // Wait until the worker has actually taken the first record so the buffer + // is guaranteed full before we assert drops. 
+ eventually(t, time.Second, func() bool { return sink.calls.Load() >= 1 }) + + // Now the channel is full; these must drop, and crucially must not block. + const extra = 50 + done := make(chan struct{}) + go func() { + for i := 0; i < extra; i++ { + if q.enqueue(&InvocationRecord{ID: "overflow"}) { + // Some may still squeak in if the worker drains; that's fine. + } + } + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("enqueue blocked on a full queue") + } + + if q.dropped.Load() == 0 { + t.Fatal("expected at least one dropped record to be counted") + } +} + +func TestInvocationLogQueue_close_flushes_pending(t *testing.T) { + sink := &mockInvocationLogger{} + q := newInvocationLogQueue(sink, zap.NewNop()) + + const n = 100 + for i := 0; i < n; i++ { + q.enqueue(&InvocationRecord{ID: "inv"}) + } + + // Close must drain everything already queued before returning. + q.Close() + + if got := sink.count(); got != n { + t.Fatalf("expected Close to flush all %d records, got %d", n, got) + } +} + +func TestInvocationLogQueue_close_is_idempotent(t *testing.T) { + sink := &mockInvocationLogger{} + q := newInvocationLogQueue(sink, zap.NewNop()) + q.Close() + q.Close() // must not panic on double close +} diff --git a/core/pkg/serverless/log_buffer.go b/core/pkg/serverless/log_buffer.go index 33eef9d..8244d44 100644 --- a/core/pkg/serverless/log_buffer.go +++ b/core/pkg/serverless/log_buffer.go @@ -41,12 +41,24 @@ func NewLogBuffer() *LogBuffer { return &LogBuffer{} } -// Append adds one log entry. Thread-safe — wazero modules aren't -// goroutine-safe in practice, but the lock makes the invariant explicit -// rather than relying on call-site discipline. +// maxLogEntriesPerInvocation caps how many log lines one invocation can +// buffer. 
Telemetry is best-effort; without a cap a tenant function looping +// oh.LogInfo could balloon gateway memory — amplified now that records sit +// in the async invocation-log queue (up to invocationLogQueueSize records +// resident) instead of being written and freed synchronously. +const maxLogEntriesPerInvocation = 1000 + +// Append adds one log entry, dropping silently once the per-invocation cap +// is reached (telemetry best-effort; bounds memory against log floods). +// Thread-safe — wazero modules aren't goroutine-safe in practice, but the +// lock makes the invariant explicit rather than relying on call-site +// discipline. func (b *LogBuffer) Append(entry LogEntry) { b.mu.Lock() defer b.mu.Unlock() + if len(b.entries) >= maxLogEntriesPerInvocation { + return + } b.entries = append(b.entries, entry) } diff --git a/core/pkg/serverless/log_buffer_cap_test.go b/core/pkg/serverless/log_buffer_cap_test.go new file mode 100644 index 0000000..cc5c41f --- /dev/null +++ b/core/pkg/serverless/log_buffer_cap_test.go @@ -0,0 +1,24 @@ +package serverless + +import ( + "fmt" + "testing" +) + +// Security hardening (feat-27 async-logging review): one invocation cannot +// buffer unbounded log lines — the cap bounds gateway memory while records +// sit in the async invocation-log queue. +func TestLogBuffer_capsEntriesPerInvocation(t *testing.T) { + b := NewLogBuffer() + for i := 0; i < maxLogEntriesPerInvocation+500; i++ { + b.Append(LogEntry{Level: "info", Message: fmt.Sprintf("line %d", i)}) + } + if got := b.Len(); got != maxLogEntriesPerInvocation { + t.Errorf("Len() = %d; want cap %d (excess lines must be dropped, not buffered)", got, maxLogEntriesPerInvocation) + } + // First entries are kept (drop-newest semantics). 
+ snap := b.Snapshot() + if snap[0].Message != "line 0" { + t.Errorf("first entry = %q; want \"line 0\" (cap drops newest, keeps earliest)", snap[0].Message) + } +} diff --git a/core/pkg/serverless/log_buffer_test.go b/core/pkg/serverless/log_buffer_test.go index b61f943..79543e4 100644 --- a/core/pkg/serverless/log_buffer_test.go +++ b/core/pkg/serverless/log_buffer_test.go @@ -73,9 +73,12 @@ func TestLogBufferFromCtx_nilCtxIsSafe(t *testing.T) { // the race detector would flag this. func TestLogBuffer_concurrentAppendIsSafe(t *testing.T) { b := NewLogBuffer() + // Keep total below maxLogEntriesPerInvocation — this test pins + // race-safety (no lost writes), not the cap (covered separately in + // log_buffer_cap_test.go). const ( - writers = 16 - writesPerW = 100 + writers = 16 + writesPerW = 50 ) var wg sync.WaitGroup for w := 0; w < writers; w++ { diff --git a/core/pkg/serverless/registry.go b/core/pkg/serverless/registry.go index db8d0b5..e9a9d5b 100644 --- a/core/pkg/serverless/registry.go +++ b/core/pkg/serverless/registry.go @@ -409,28 +409,62 @@ func (r *Registry) Log(ctx context.Context, inv *InvocationRecord) error { return fmt.Errorf("failed to insert invocation record: %w", err) } - // Insert logs if any - if len(inv.Logs) > 0 { - for _, entry := range inv.Logs { - logID := uuid.New().String() - logQuery := ` - INSERT INTO function_logs ( - id, function_id, invocation_id, level, message, timestamp - ) VALUES (?, ?, ?, ?, ?, ?) - ` - _, err := r.db.Exec(ctx, logQuery, - logID, inv.FunctionID, inv.ID, entry.Level, entry.Message, entry.Timestamp, - ) - if err != nil { - r.logger.Warn("Failed to insert function log", zap.Error(err)) - // Continue with other logs - } + // Insert logs in batched multi-row INSERTs rather than one Exec per line. + // Pre-fix this loop paid one cross-region Raft write PER log line (N+1): + // a handler emitting 5 lines cost 6 sequential writes. 
Now a record's + // lines collapse into ceil(N/maxLogRowsPerInsert) writes (bugboard feat-27). + for _, chunk := range chunkLogEntries(inv.Logs, maxLogRowsPerInsert) { + query, args := buildFunctionLogsInsert(inv.FunctionID, inv.ID, chunk) + if _, err := r.db.Exec(ctx, query, args...); err != nil { + r.logger.Warn("Failed to insert function logs batch", zap.Error(err)) + // Continue with remaining chunks — telemetry is best-effort. } } return nil } +// maxLogRowsPerInsert caps how many function_logs rows go into a single +// multi-row INSERT statement. Keeps any one statement bounded (placeholder +// count, statement size) while still collapsing the per-line N+1 into a +// handful of writes for the common case. +const maxLogRowsPerInsert = 100 + +// chunkLogEntries splits entries into slices of at most size. Returns no +// chunks for an empty input. +func chunkLogEntries(entries []LogEntry, size int) [][]LogEntry { + if len(entries) == 0 { + return nil + } + var chunks [][]LogEntry + for i := 0; i < len(entries); i += size { + end := i + size + if end > len(entries) { + end = len(entries) + } + chunks = append(chunks, entries[i:end]) + } + return chunks +} + +// buildFunctionLogsInsert constructs a single multi-row INSERT for the given +// log entries: one VALUES tuple per entry, args flattened in column order +// (id, function_id, invocation_id, level, message, timestamp). Each row gets a +// fresh UUID id, matching the per-row behavior of the old loop. 
+func buildFunctionLogsInsert(functionID, invocationID string, entries []LogEntry) (string, []interface{}) { + var sb strings.Builder + sb.WriteString("INSERT INTO function_logs (id, function_id, invocation_id, level, message, timestamp) VALUES ") + args := make([]interface{}, 0, len(entries)*6) + for i, entry := range entries { + if i > 0 { + sb.WriteString(", ") + } + sb.WriteString("(?, ?, ?, ?, ?, ?)") + args = append(args, uuid.New().String(), functionID, invocationID, entry.Level, entry.Message, entry.Timestamp) + } + return sb.String(), args +} + // GetLogs retrieves logs for a function. func (r *Registry) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) { if limit <= 0 { diff --git a/core/pkg/serverless/registry/invocation_logger.go b/core/pkg/serverless/registry/invocation_logger.go index c7f8bd7..ad38674 100644 --- a/core/pkg/serverless/registry/invocation_logger.go +++ b/core/pkg/serverless/registry/invocation_logger.go @@ -47,26 +47,61 @@ func (l *InvocationLogger) Log(ctx context.Context, inv *InvocationRecordData) e return fmt.Errorf("failed to insert invocation record: %w", err) } - if len(inv.Logs) > 0 { - for _, entry := range inv.Logs { - logID := uuid.New().String() - logQuery := ` - INSERT INTO function_logs ( - id, function_id, invocation_id, level, message, timestamp - ) VALUES (?, ?, ?, ?, ?, ?) - ` - _, err := l.db.Exec(ctx, logQuery, - logID, inv.FunctionID, inv.ID, entry.Level, entry.Message, entry.Timestamp, - ) - if err != nil { - l.logger.Warn("Failed to insert function log", zap.Error(err)) - } + // Insert logs in batched multi-row INSERTs rather than one Exec per line. + // Pre-fix this loop paid one cross-region Raft write PER log line (N+1). + // Now a record's lines collapse into ceil(N/maxLogRowsPerInsert) writes + // (bugboard feat-27). 
+ for _, chunk := range chunkLogData(inv.Logs, maxLogRowsPerInsert) { + query, args := buildFunctionLogsInsert(inv.FunctionID, inv.ID, chunk) + if _, err := l.db.Exec(ctx, query, args...); err != nil { + l.logger.Warn("Failed to insert function logs batch", zap.Error(err)) + // Continue with remaining chunks — telemetry is best-effort. } } return nil } +// maxLogRowsPerInsert caps how many function_logs rows go into a single +// multi-row INSERT statement, bounding placeholder count and statement size +// while still collapsing the per-line N+1 into a handful of writes. +const maxLogRowsPerInsert = 100 + +// chunkLogData splits entries into slices of at most size. Returns no chunks +// for an empty input. +func chunkLogData(entries []LogData, size int) [][]LogData { + if len(entries) == 0 { + return nil + } + var chunks [][]LogData + for i := 0; i < len(entries); i += size { + end := i + size + if end > len(entries) { + end = len(entries) + } + chunks = append(chunks, entries[i:end]) + } + return chunks +} + +// buildFunctionLogsInsert constructs a single multi-row INSERT for the given +// log entries: one VALUES tuple per entry, args flattened in column order +// (id, function_id, invocation_id, level, message, timestamp). Each row gets a +// fresh UUID id, matching the per-row behavior of the old loop. +func buildFunctionLogsInsert(functionID, invocationID string, entries []LogData) (string, []interface{}) { + var sb strings.Builder + sb.WriteString("INSERT INTO function_logs (id, function_id, invocation_id, level, message, timestamp) VALUES ") + args := make([]interface{}, 0, len(entries)*6) + for i, entry := range entries { + if i > 0 { + sb.WriteString(", ") + } + sb.WriteString("(?, ?, ?, ?, ?, ?)") + args = append(args, uuid.New().String(), functionID, invocationID, entry.Level, entry.Message, entry.Timestamp) + } + return sb.String(), args +} + // GetLogs retrieves WASM-emitted log entries for a function (rows in // function_logs). 
Functions that don't call log_info / log_error from // their WASM code will return an empty slice here — that's expected. @@ -235,4 +270,3 @@ func (l *InvocationLogger) fetchLogsForInvocations(ctx context.Context, invocati } return out, nil } - diff --git a/core/pkg/serverless/registry/invocation_logger_batch_test.go b/core/pkg/serverless/registry/invocation_logger_batch_test.go new file mode 100644 index 0000000..36f2077 --- /dev/null +++ b/core/pkg/serverless/registry/invocation_logger_batch_test.go @@ -0,0 +1,126 @@ +package registry + +import ( + "context" + "database/sql" + "strings" + "sync" + "testing" + "time" + + "github.com/DeBrosOfficial/network/pkg/rqlite" + "go.uber.org/zap" +) + +// recordingExecDB records Exec calls. It embeds rqlite.Client so only Exec is +// implemented — Log must not call any other method. +type recordingExecDB struct { + rqlite.Client + mu sync.Mutex + execs []recordedExec +} + +type recordedExec struct { + query string + args []interface{} +} + +func (d *recordingExecDB) Exec(_ context.Context, query string, args ...any) (sql.Result, error) { + d.mu.Lock() + defer d.mu.Unlock() + d.execs = append(d.execs, recordedExec{query: query, args: args}) + return recordingResult{}, nil +} + +type recordingResult struct{} + +func (recordingResult) LastInsertId() (int64, error) { return 0, nil } +func (recordingResult) RowsAffected() (int64, error) { return 1, nil } + +func TestBuildFunctionLogsInsert_shape(t *testing.T) { + ts := time.Unix(1700000000, 0).UTC() + entries := []LogData{ + {Level: "info", Message: "a", Timestamp: ts}, + {Level: "error", Message: "b", Timestamp: ts}, + } + query, args := buildFunctionLogsInsert("fn-1", "inv-1", entries) + + wantPrefix := "INSERT INTO function_logs (id, function_id, invocation_id, level, message, timestamp) VALUES " + if !strings.HasPrefix(query, wantPrefix) { + t.Fatalf("unexpected query prefix: %q", query) + } + if got, want := strings.Count(query, "(?, ?, ?, ?, ?, ?)"), 2; got != want { + 
t.Errorf("expected %d value tuples, got %d", want, got) + } + if got, want := len(args), 12; got != want { + t.Fatalf("expected %d args, got %d", want, got) + } + if args[1] != "fn-1" || args[2] != "inv-1" || args[3] != "info" || args[4] != "a" || args[5] != ts { + t.Errorf("row 0 args wrong: %#v", args[0:6]) + } + if args[9] != "error" || args[10] != "b" { + t.Errorf("row 1 args wrong: %#v", args[6:12]) + } +} + +func TestChunkLogData(t *testing.T) { + if got := chunkLogData(nil, 100); got != nil { + t.Errorf("expected nil for empty input, got %v", got) + } + entries := make([]LogData, 250) + chunks := chunkLogData(entries, 100) + if len(chunks) != 3 { + t.Fatalf("expected 3 chunks, got %d", len(chunks)) + } + if len(chunks[2]) != 50 { + t.Errorf("expected last chunk of 50, got %d", len(chunks[2])) + } +} + +func TestInvocationLoggerLog_batches_logs(t *testing.T) { + db := &recordingExecDB{} + il := NewInvocationLogger(db, zap.NewNop()) + + logs := make([]LogData, 5) + for i := range logs { + logs[i] = LogData{Level: "info", Message: "x", Timestamp: time.Now()} + } + inv := &InvocationRecordData{ID: "inv-1", FunctionID: "fn-1", Logs: logs} + + if err := il.Log(context.Background(), inv); err != nil { + t.Fatalf("Log returned error: %v", err) + } + + db.mu.Lock() + defer db.mu.Unlock() + if len(db.execs) != 2 { + t.Fatalf("expected 2 Exec calls (invocation + 1 batched logs), got %d", len(db.execs)) + } + if !strings.Contains(db.execs[1].query, "INSERT INTO function_logs") { + t.Errorf("second Exec should be batched logs, got %q", db.execs[1].query) + } + if got := strings.Count(db.execs[1].query, "(?, ?, ?, ?, ?, ?)"); got != 5 { + t.Errorf("expected 5 value tuples, got %d", got) + } +} + +func TestInvocationLoggerLog_chunks_over_cap(t *testing.T) { + db := &recordingExecDB{} + il := NewInvocationLogger(db, zap.NewNop()) + + logs := make([]LogData, maxLogRowsPerInsert+1) + for i := range logs { + logs[i] = LogData{Level: "info", Message: "x", Timestamp: time.Now()} 
+ } + inv := &InvocationRecordData{ID: "inv-1", FunctionID: "fn-1", Logs: logs} + + if err := il.Log(context.Background(), inv); err != nil { + t.Fatalf("Log returned error: %v", err) + } + + db.mu.Lock() + defer db.mu.Unlock() + if len(db.execs) != 3 { + t.Fatalf("expected 3 Exec calls (invocation + 2 chunked logs), got %d", len(db.execs)) + } +} diff --git a/core/pkg/serverless/registry_log_batch_test.go b/core/pkg/serverless/registry_log_batch_test.go new file mode 100644 index 0000000..33af4ad --- /dev/null +++ b/core/pkg/serverless/registry_log_batch_test.go @@ -0,0 +1,154 @@ +package serverless + +import ( + "context" + "database/sql" + "strings" + "sync" + "testing" + "time" + + "github.com/DeBrosOfficial/network/pkg/rqlite" + "go.uber.org/zap" +) + +// recordingExecClient is an rqlite.Client that records every Exec call. It +// embeds the interface so we only override Exec; calling any other method is a +// test bug (will nil-panic), which is what we want — Log must only Exec. +type recordingExecClient struct { + rqlite.Client + mu sync.Mutex + execs []recordedExec +} + +type recordedExec struct { + query string + args []interface{} +} + +func (c *recordingExecClient) Exec(_ context.Context, query string, args ...any) (sql.Result, error) { + c.mu.Lock() + defer c.mu.Unlock() + c.execs = append(c.execs, recordedExec{query: query, args: args}) + return &recordingResult{}, nil +} + +type recordingResult struct{} + +func (recordingResult) LastInsertId() (int64, error) { return 0, nil } +func (recordingResult) RowsAffected() (int64, error) { return 1, nil } + +func TestBuildFunctionLogsInsert_multi_row_shape(t *testing.T) { + ts := time.Unix(1700000000, 0).UTC() + entries := []LogEntry{ + {Level: "info", Message: "a", Timestamp: ts}, + {Level: "error", Message: "b", Timestamp: ts}, + } + query, args := buildFunctionLogsInsert("fn-1", "inv-1", entries) + + wantPrefix := "INSERT INTO function_logs (id, function_id, invocation_id, level, message, timestamp) VALUES 
" + if !strings.HasPrefix(query, wantPrefix) { + t.Fatalf("unexpected query prefix: %q", query) + } + if got, want := strings.Count(query, "(?, ?, ?, ?, ?, ?)"), 2; got != want { + t.Errorf("expected %d value tuples, got %d in %q", want, got, query) + } + if got, want := len(args), 2*6; got != want { + t.Fatalf("expected %d args, got %d", want, got) + } + // Row 0: id (generated), function_id, invocation_id, level, message, timestamp. + if args[1] != "fn-1" || args[2] != "inv-1" || args[3] != "info" || args[4] != "a" || args[5] != ts { + t.Errorf("row 0 args wrong: %#v", args[0:6]) + } + if args[7] != "fn-1" || args[8] != "inv-1" || args[9] != "error" || args[10] != "b" || args[11] != ts { + t.Errorf("row 1 args wrong: %#v", args[6:12]) + } + // Generated IDs must be present and distinct. + if args[0] == "" || args[6] == "" || args[0] == args[6] { + t.Errorf("expected distinct non-empty generated IDs, got %v and %v", args[0], args[6]) + } +} + +func TestChunkLogEntries(t *testing.T) { + if got := chunkLogEntries(nil, 100); got != nil { + t.Errorf("expected nil for empty input, got %v", got) + } + entries := make([]LogEntry, 250) + chunks := chunkLogEntries(entries, 100) + if len(chunks) != 3 { + t.Fatalf("expected ceil(250/100)=3 chunks, got %d", len(chunks)) + } + if len(chunks[0]) != 100 || len(chunks[1]) != 100 || len(chunks[2]) != 50 { + t.Errorf("unexpected chunk sizes: %d %d %d", len(chunks[0]), len(chunks[1]), len(chunks[2])) + } +} + +func TestRegistryLog_batches_logs_into_ceil_div_exec_calls(t *testing.T) { + db := &recordingExecClient{} + r := NewRegistry(db, nil, RegistryConfig{}, zap.NewNop()) + + // 5 log lines should collapse to: 1 invocation INSERT + 1 logs INSERT = 2 Execs. 
+ logs := make([]LogEntry, 5) + for i := range logs { + logs[i] = LogEntry{Level: "info", Message: "x", Timestamp: time.Now()} + } + inv := &InvocationRecord{ID: "inv-1", FunctionID: "fn-1", Logs: logs} + + if err := r.Log(context.Background(), inv); err != nil { + t.Fatalf("Log returned error: %v", err) + } + + db.mu.Lock() + defer db.mu.Unlock() + if len(db.execs) != 2 { + t.Fatalf("expected 2 Exec calls (1 invocation + 1 batched logs), got %d", len(db.execs)) + } + if !strings.HasPrefix(db.execs[0].query, "\n\t\tINSERT INTO function_invocations") && + !strings.Contains(db.execs[0].query, "function_invocations") { + t.Errorf("first Exec should be the invocation insert, got %q", db.execs[0].query) + } + if !strings.Contains(db.execs[1].query, "INSERT INTO function_logs") { + t.Errorf("second Exec should be the batched logs insert, got %q", db.execs[1].query) + } + if got := strings.Count(db.execs[1].query, "(?, ?, ?, ?, ?, ?)"); got != 5 { + t.Errorf("expected 5 value tuples in the batched logs insert, got %d", got) + } +} + +func TestRegistryLog_chunks_logs_over_cap(t *testing.T) { + db := &recordingExecClient{} + r := NewRegistry(db, nil, RegistryConfig{}, zap.NewNop()) + + // maxLogRowsPerInsert+1 lines => ceil((cap+1)/cap)=2 logs INSERTs, plus + // the single invocation INSERT = 3 Execs total. 
+ n := maxLogRowsPerInsert + 1 + logs := make([]LogEntry, n) + for i := range logs { + logs[i] = LogEntry{Level: "info", Message: "x", Timestamp: time.Now()} + } + inv := &InvocationRecord{ID: "inv-1", FunctionID: "fn-1", Logs: logs} + + if err := r.Log(context.Background(), inv); err != nil { + t.Fatalf("Log returned error: %v", err) + } + + db.mu.Lock() + defer db.mu.Unlock() + if len(db.execs) != 3 { + t.Fatalf("expected 3 Exec calls (1 invocation + 2 chunked logs), got %d", len(db.execs)) + } +} + +func TestRegistryLog_no_logs_single_exec(t *testing.T) { + db := &recordingExecClient{} + r := NewRegistry(db, nil, RegistryConfig{}, zap.NewNop()) + + if err := r.Log(context.Background(), &InvocationRecord{ID: "inv-1", FunctionID: "fn-1"}); err != nil { + t.Fatalf("Log returned error: %v", err) + } + db.mu.Lock() + defer db.mu.Unlock() + if len(db.execs) != 1 { + t.Fatalf("expected only the invocation Exec, got %d", len(db.execs)) + } +}