mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-06-16 22:54:12 +00:00
feat(gateway): implement persistent secrets and webrtc configuration
- add `secrets_encryption_key` to gateway config for serverless secrets - implement durable TURN secret persistence to prevent config regen outages - add regression test for gateway config loading and field mapping
This commit is contained in:
parent
4c631243b3
commit
ff3e273da8
@ -92,6 +92,12 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
|
||||
IPFSTimeout string `yaml:"ipfs_timeout"`
|
||||
IPFSReplicationFactor int `yaml:"ipfs_replication_factor"`
|
||||
WebRTC yamlWebRTCCfg `yaml:"webrtc"`
|
||||
// SecretsEncryptionKey: see GatewayYAMLConfig docstring. Optional;
|
||||
// when set, the standalone gateway populates
|
||||
// cfg.SecretsEncryptionKey so serverless function secrets can be
|
||||
// encrypted/decrypted (bugboard #837 follow-up). Empty leaves
|
||||
// secrets management disabled (fail-loud).
|
||||
SecretsEncryptionKey string `yaml:"secrets_encryption_key"`
|
||||
// ClusterSecretPath: see GatewayYAMLConfig docstring. Optional;
|
||||
// when set, the standalone gateway reads the file at this path
|
||||
// and populates cfg.ClusterSecret so JWT signing keys can be
|
||||
@ -229,6 +235,16 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
|
||||
}
|
||||
}
|
||||
|
||||
// Serverless secrets encryption key — bugboard #837 follow-up. The
|
||||
// host-managed gateway (pkg/node/gateway.go) reads this from
|
||||
// secrets/secrets-encryption-key; the standalone binary used by namespace
|
||||
// gateways via systemd receives it through this YAML field. Without it,
|
||||
// `function secrets list` returned 501 ("Secrets management not
|
||||
// available") on namespace gateways even though the host had the key.
|
||||
if v := strings.TrimSpace(y.SecretsEncryptionKey); v != "" {
|
||||
cfg.SecretsEncryptionKey = v
|
||||
}
|
||||
|
||||
// WebRTC configuration
|
||||
cfg.WebRTCEnabled = y.WebRTC.Enabled
|
||||
if y.WebRTC.SFUPort > 0 {
|
||||
|
||||
70
core/cmd/gateway/config_secrets_test.go
Normal file
70
core/cmd/gateway/config_secrets_test.go
Normal file
@ -0,0 +1,70 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/config"
|
||||
"github.com/DeBrosOfficial/network/pkg/gateway"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// TestSpawnedGatewayConfig_loadsSecretsEncryptionKey is the bugboard #837
|
||||
// follow-up regression test for the *load* half: a YAML written by the
|
||||
// namespace gateway spawner (gateway.GatewayYAMLConfig with the secrets key)
|
||||
// must (a) pass the standalone gateway's STRICT decoder — i.e. the
|
||||
// secrets_encryption_key field is a known field, not rejected — and (b) end
|
||||
// up in gateway.Config.SecretsEncryptionKey via the same trim/assign the real
|
||||
// parseGatewayConfig uses. Without the load mapping, `function secrets list`
|
||||
// returned 501 on namespace gateways.
|
||||
func TestSpawnedGatewayConfig_loadsSecretsEncryptionKey(t *testing.T) {
|
||||
const key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
|
||||
// Produce the exact YAML a spawned namespace gateway receives.
|
||||
written := gateway.GatewayYAMLConfig{
|
||||
ListenAddr: ":6001",
|
||||
ClientNamespace: "anchat-test",
|
||||
RQLiteDSN: "http://localhost:10000",
|
||||
OlricServers: []string{"localhost:3320"},
|
||||
SecretsEncryptionKey: key,
|
||||
}
|
||||
data, err := yaml.Marshal(written)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal: %v", err)
|
||||
}
|
||||
|
||||
// yamlCfgMirror mirrors the function-local yamlCfg in config.go. If the
|
||||
// real loader's field/tag drifts, the round-trip assertion below fails.
|
||||
type webrtc struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
SFUPort int `yaml:"sfu_port"`
|
||||
TURNDomain string `yaml:"turn_domain"`
|
||||
TURNSecret string `yaml:"turn_secret"`
|
||||
}
|
||||
type yamlCfgMirror struct {
|
||||
ListenAddr string `yaml:"listen_addr"`
|
||||
ClientNamespace string `yaml:"client_namespace"`
|
||||
RQLiteDSN string `yaml:"rqlite_dsn"`
|
||||
OlricServers []string `yaml:"olric_servers"`
|
||||
WebRTC webrtc `yaml:"webrtc"`
|
||||
SecretsEncryptionKey string `yaml:"secrets_encryption_key"`
|
||||
ClusterSecretPath string `yaml:"cluster_secret_path"`
|
||||
}
|
||||
|
||||
var y yamlCfgMirror
|
||||
// STRICT decode — the real loader rejects unknown fields, so this proves
|
||||
// secrets_encryption_key is recognized.
|
||||
if err := config.DecodeStrict(strings.NewReader(string(data)), &y); err != nil {
|
||||
t.Fatalf("strict decode rejected the spawned gateway YAML: %v", err)
|
||||
}
|
||||
|
||||
// Apply the same trim/assign as parseGatewayConfig.
|
||||
cfg := &gateway.Config{}
|
||||
if v := strings.TrimSpace(y.SecretsEncryptionKey); v != "" {
|
||||
cfg.SecretsEncryptionKey = v
|
||||
}
|
||||
|
||||
if cfg.SecretsEncryptionKey != key {
|
||||
t.Errorf("gateway.Config.SecretsEncryptionKey = %q, want %q", cfg.SecretsEncryptionKey, key)
|
||||
}
|
||||
}
|
||||
@ -485,6 +485,14 @@ func (o *Orchestrator) saveSecretsFromJoinResponse(resp *joinhandlers.JoinRespon
|
||||
}
|
||||
}
|
||||
|
||||
// Write TURN shared secret (feat-124 #913) — identical on every node so
|
||||
// WebRTC TURN credentials validate cluster-wide and survive config regen.
|
||||
if resp.TURNSecret != "" {
|
||||
if err := os.WriteFile(filepath.Join(secretsDir, "turn-secret"), []byte(resp.TURNSecret), 0600); err != nil {
|
||||
return fmt.Errorf("failed to write turn-secret: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Write IPFS Cluster trusted peer IDs
|
||||
if len(resp.IPFSClusterPeerIDs) > 0 {
|
||||
content := strings.Join(resp.IPFSClusterPeerIDs, "\n") + "\n"
|
||||
|
||||
@ -16,8 +16,16 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// defaultSFUSignalingPort is the SFU signaling port the namespace gateway
|
||||
// proxies WebRTC traffic to when an existing node.yaml did not record one.
|
||||
// Mirrors pkg/namespace.SFUSignalingPortRangeStart (30000); kept as a local
|
||||
// constant to avoid importing the namespace package (which other agents own
|
||||
// and which would create a dependency cycle here).
|
||||
const defaultSFUSignalingPort = 30000
|
||||
|
||||
// ConfigGenerator manages generation of node, gateway, and service configs
|
||||
type ConfigGenerator struct {
|
||||
oramaDir string
|
||||
@ -212,9 +220,127 @@ func (cg *ConfigGenerator) GenerateNodeConfig(peerAddresses []string, vpsIP stri
|
||||
data.SecretsEncryptionKey = strings.TrimSpace(string(keyBytes))
|
||||
}
|
||||
|
||||
// WebRTC/TURN config (feat-124 #913). The TURN secret lives in the secrets
|
||||
// dir so it survives Phase4 config regeneration; turn_domain/sfu_port/enabled
|
||||
// are operator-set values that only exist in the previous node.yaml, so we
|
||||
// carry them forward from the existing on-disk config. Without this, a regen
|
||||
// wipes the operator's manually-added webrtc block and the namespace
|
||||
// reconciler restarts gateways with an empty TURN secret (the outage).
|
||||
if err := cg.populateWebRTCConfig(&data); err != nil {
|
||||
return "", fmt.Errorf("failed to populate webrtc config: %w", err)
|
||||
}
|
||||
|
||||
return templates.RenderNodeConfig(data)
|
||||
}
|
||||
|
||||
// existingWebRTC is the minimal shape parsed out of an existing node.yaml to
|
||||
// carry forward operator-set WebRTC fields across a config regeneration.
|
||||
type existingWebRTC struct {
|
||||
Enabled bool
|
||||
SFUPort int
|
||||
TURNDomain string
|
||||
TURNSecret string
|
||||
}
|
||||
|
||||
// populateWebRTCConfig fills the WebRTC fields on data so the rendered node.yaml
|
||||
// preserves operator TURN configuration across regenerations.
|
||||
//
|
||||
// Sources, in order of authority:
|
||||
// - turn_secret: the persisted secrets/turn-secret file (durable, survives
|
||||
// regen). If absent but the existing node.yaml carried a secret, that secret
|
||||
// is persisted to the file so it becomes durable from now on.
|
||||
// - turn_domain / sfu_port / enabled: carried forward from the existing
|
||||
// node.yaml's http_gateway.webrtc block (operator-set, not in secrets).
|
||||
//
|
||||
// If there is no persisted secret and no existing webrtc block, WebRTC is left
|
||||
// disabled and the template renders nothing.
|
||||
func (cg *ConfigGenerator) populateWebRTCConfig(data *templates.NodeConfigData) error {
|
||||
existing := cg.readExistingWebRTC()
|
||||
|
||||
// Resolve the TURN secret: persisted file wins; otherwise adopt the secret
|
||||
// from the existing node.yaml and persist it so it is durable.
|
||||
secret := ""
|
||||
secretPath := filepath.Join(cg.oramaDir, "secrets", "turn-secret")
|
||||
if b, err := os.ReadFile(secretPath); err == nil {
|
||||
secret = strings.TrimSpace(string(b))
|
||||
}
|
||||
if secret == "" && existing != nil && existing.TURNSecret != "" {
|
||||
secret = existing.TURNSecret
|
||||
if err := cg.persistTURNSecret(secret); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if secret == "" {
|
||||
// No durable secret and nothing to adopt — leave WebRTC disabled.
|
||||
return nil
|
||||
}
|
||||
|
||||
data.TURNSecret = secret
|
||||
data.WebRTCEnabled = true
|
||||
|
||||
if existing != nil {
|
||||
data.TURNDomain = existing.TURNDomain
|
||||
data.SFUPort = existing.SFUPort
|
||||
}
|
||||
if data.SFUPort == 0 {
|
||||
data.SFUPort = defaultSFUSignalingPort
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// readExistingWebRTC parses just the http_gateway.webrtc block out of the
|
||||
// existing node.yaml. Absence of the file or block is tolerated (returns nil).
|
||||
func (cg *ConfigGenerator) readExistingWebRTC() *existingWebRTC {
|
||||
configPath := filepath.Join(cg.oramaDir, "configs", "node.yaml")
|
||||
raw, err := os.ReadFile(configPath)
|
||||
if err != nil {
|
||||
return nil // No existing config (fresh install) — nothing to carry forward.
|
||||
}
|
||||
|
||||
var parsed struct {
|
||||
HTTPGateway struct {
|
||||
WebRTC struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
SFUPort int `yaml:"sfu_port"`
|
||||
TURNDomain string `yaml:"turn_domain"`
|
||||
TURNSecret string `yaml:"turn_secret"`
|
||||
} `yaml:"webrtc"`
|
||||
} `yaml:"http_gateway"`
|
||||
}
|
||||
if err := yaml.Unmarshal(raw, &parsed); err != nil {
|
||||
return nil // Malformed/old config — don't fail regen; just nothing to carry.
|
||||
}
|
||||
|
||||
wb := parsed.HTTPGateway.WebRTC
|
||||
if !wb.Enabled && wb.SFUPort == 0 && wb.TURNDomain == "" && wb.TURNSecret == "" {
|
||||
return nil // No webrtc block present.
|
||||
}
|
||||
return &existingWebRTC{
|
||||
Enabled: wb.Enabled,
|
||||
SFUPort: wb.SFUPort,
|
||||
TURNDomain: wb.TURNDomain,
|
||||
TURNSecret: wb.TURNSecret,
|
||||
}
|
||||
}
|
||||
|
||||
// persistTURNSecret writes the TURN secret to the secrets dir with 0600 perms
|
||||
// and correct ownership, making it durable across future config regenerations.
|
||||
func (cg *ConfigGenerator) persistTURNSecret(secret string) error {
|
||||
secretPath := filepath.Join(cg.oramaDir, "secrets", "turn-secret")
|
||||
if err := os.MkdirAll(filepath.Dir(secretPath), 0700); err != nil {
|
||||
return fmt.Errorf("failed to create secrets directory: %w", err)
|
||||
}
|
||||
if err := os.WriteFile(secretPath, []byte(secret), 0600); err != nil {
|
||||
return fmt.Errorf("failed to persist TURN secret: %w", err)
|
||||
}
|
||||
if err := ensureSecretFilePermissions(secretPath); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GenerateVaultConfig generates vault.yaml configuration for the Vault Guardian.
|
||||
// The vault config uses key=value format (not YAML, despite the file extension).
|
||||
// Peer discovery is dynamic via RQLite — no static peer list needed.
|
||||
@ -532,6 +658,57 @@ func (sg *SecretGenerator) EnsureSecretsEncryptionKey() (string, error) {
|
||||
return key, nil
|
||||
}
|
||||
|
||||
// EnsureTURNSecret gets or generates the HMAC-SHA1 shared secret used to mint
|
||||
// TURN credentials for WebRTC (the http_gateway.webrtc.turn_secret field).
|
||||
// The secret is a 32-byte random value stored as 64 hex characters.
|
||||
//
|
||||
// It MUST be identical on every namespace-gateway node in a cluster and stable
|
||||
// across restarts AND config regenerations — otherwise the namespace reconciler
|
||||
// sees drift (desired vs on-disk) and restarts gateways with an empty secret,
|
||||
// which makes turn.credentials return namespace_not_configured (feat-124 #913,
|
||||
// the AnChat outage). Persisting the secret to the secrets dir is what lets it
|
||||
// survive Phase4 config regeneration: GenerateNodeConfig reads this file rather
|
||||
// than relying on the (regenerated-from-template) node.yaml. Joining nodes
|
||||
// receive the value through the join flow rather than generating their own.
|
||||
func (sg *SecretGenerator) EnsureTURNSecret() (string, error) {
|
||||
secretPath := filepath.Join(sg.oramaDir, "secrets", "turn-secret")
|
||||
secretDir := filepath.Dir(secretPath)
|
||||
|
||||
if err := os.MkdirAll(secretDir, 0700); err != nil {
|
||||
return "", fmt.Errorf("failed to create secrets directory: %w", err)
|
||||
}
|
||||
if err := os.Chmod(secretDir, 0700); err != nil {
|
||||
return "", fmt.Errorf("failed to set secrets directory permissions: %w", err)
|
||||
}
|
||||
|
||||
// Try to read existing secret
|
||||
if data, err := os.ReadFile(secretPath); err == nil {
|
||||
secret := strings.TrimSpace(string(data))
|
||||
if len(secret) == 64 {
|
||||
if err := ensureSecretFilePermissions(secretPath); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return secret, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Generate new secret (32 bytes = 64 hex chars)
|
||||
secretBytes := make([]byte, 32)
|
||||
if _, err := rand.Read(secretBytes); err != nil {
|
||||
return "", fmt.Errorf("failed to generate TURN secret: %w", err)
|
||||
}
|
||||
secret := hex.EncodeToString(secretBytes)
|
||||
|
||||
if err := os.WriteFile(secretPath, []byte(secret), 0600); err != nil {
|
||||
return "", fmt.Errorf("failed to save TURN secret: %w", err)
|
||||
}
|
||||
if err := ensureSecretFilePermissions(secretPath); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return secret, nil
|
||||
}
|
||||
|
||||
func ensureSecretFilePermissions(secretPath string) error {
|
||||
if err := os.Chmod(secretPath, 0600); err != nil {
|
||||
return fmt.Errorf("failed to set permissions on %s: %w", secretPath, err)
|
||||
|
||||
@ -599,6 +599,14 @@ func (ps *ProductionSetup) Phase3GenerateSecrets() error {
|
||||
}
|
||||
ps.logf(" ✓ Secrets encryption key ensured")
|
||||
|
||||
// WebRTC TURN shared secret (feat-124 #913). Persisting it here lets the
|
||||
// TURN config survive Phase4 config regeneration so namespace gateways are
|
||||
// never restarted with an empty turn_secret (the AnChat outage).
|
||||
if _, err := ps.secretGenerator.EnsureTURNSecret(); err != nil {
|
||||
return fmt.Errorf("failed to ensure TURN secret: %w", err)
|
||||
}
|
||||
ps.logf(" ✓ TURN secret ensured")
|
||||
|
||||
// Node identity (unified architecture)
|
||||
peerID, err := ps.secretGenerator.EnsureNodeIdentity()
|
||||
if err != nil {
|
||||
|
||||
190
core/pkg/environments/production/turn_secret_test.go
Normal file
190
core/pkg/environments/production/turn_secret_test.go
Normal file
@ -0,0 +1,190 @@
|
||||
package production
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestEnsureTURNSecret_generatesAndPersists verifies that a fresh oramaDir
|
||||
// produces a valid 32-byte hex secret written to secrets/turn-secret.
|
||||
func TestEnsureTURNSecret_generatesAndPersists(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
sg := NewSecretGenerator(dir)
|
||||
|
||||
secret, err := sg.EnsureTURNSecret()
|
||||
if err != nil {
|
||||
t.Fatalf("EnsureTURNSecret failed: %v", err)
|
||||
}
|
||||
if len(secret) != 64 {
|
||||
t.Fatalf("expected 64 hex chars, got %d (%q)", len(secret), secret)
|
||||
}
|
||||
raw, err := hex.DecodeString(secret)
|
||||
if err != nil || len(raw) != 32 {
|
||||
t.Fatalf("secret is not 32 bytes hex: err=%v len=%d", err, len(raw))
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(filepath.Join(dir, "secrets", "turn-secret"))
|
||||
if err != nil {
|
||||
t.Fatalf("reading persisted secret failed: %v", err)
|
||||
}
|
||||
if strings.TrimSpace(string(data)) != secret {
|
||||
t.Errorf("persisted secret %q != returned secret %q", strings.TrimSpace(string(data)), secret)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnsureTURNSecret_idempotent verifies the secret is stable across calls —
|
||||
// the property that keeps TURN credentials valid across restarts and identical
|
||||
// across cluster nodes (feat-124 #913).
|
||||
func TestEnsureTURNSecret_idempotent(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
sg := NewSecretGenerator(dir)
|
||||
|
||||
first, err := sg.EnsureTURNSecret()
|
||||
if err != nil {
|
||||
t.Fatalf("first call failed: %v", err)
|
||||
}
|
||||
second, err := sg.EnsureTURNSecret()
|
||||
if err != nil {
|
||||
t.Fatalf("second call failed: %v", err)
|
||||
}
|
||||
if first != second {
|
||||
t.Errorf("secret changed between calls: %q != %q", first, second)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnsureTURNSecret_regeneratesInvalid verifies a corrupt/short on-disk
|
||||
// secret is replaced with a fresh valid one.
|
||||
func TestEnsureTURNSecret_regeneratesInvalid(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
secretsDir := filepath.Join(dir, "secrets")
|
||||
if err := os.MkdirAll(secretsDir, 0700); err != nil {
|
||||
t.Fatalf("mkdir failed: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(secretsDir, "turn-secret"), []byte("too-short"), 0600); err != nil {
|
||||
t.Fatalf("write failed: %v", err)
|
||||
}
|
||||
|
||||
sg := NewSecretGenerator(dir)
|
||||
secret, err := sg.EnsureTURNSecret()
|
||||
if err != nil {
|
||||
t.Fatalf("EnsureTURNSecret failed: %v", err)
|
||||
}
|
||||
if len(secret) != 64 {
|
||||
t.Errorf("expected regenerated 64-char secret, got %d (%q)", len(secret), secret)
|
||||
}
|
||||
}
|
||||
|
||||
// writeNodeYAML is a test helper that writes content to the canonical node
|
||||
// config path the config generator reads/writes.
|
||||
func writeNodeYAML(t *testing.T, oramaDir, content string) {
|
||||
t.Helper()
|
||||
configDir := filepath.Join(oramaDir, "configs")
|
||||
if err := os.MkdirAll(configDir, 0755); err != nil {
|
||||
t.Fatalf("mkdir configs failed: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(configDir, "node.yaml"), []byte(content), 0644); err != nil {
|
||||
t.Fatalf("write node.yaml failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGenerateNodeConfig_preservesExistingWebRTC is the regression test for the
|
||||
// feat-124 #913 outage: a regen must NOT wipe an operator's webrtc block. We
|
||||
// write a node.yaml with a full webrtc block, regenerate, and assert the block
|
||||
// (enabled, sfu_port, turn_domain, turn_secret) survives — and that the secret
|
||||
// gets persisted to the durable secrets file.
|
||||
func TestGenerateNodeConfig_preservesExistingWebRTC(t *testing.T) {
|
||||
const turnSecret = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
const turnDomain = "turn.ns-anchat.dbrs.space"
|
||||
|
||||
dir := t.TempDir()
|
||||
writeNodeYAML(t, dir, `http_gateway:
|
||||
enabled: true
|
||||
webrtc:
|
||||
enabled: true
|
||||
sfu_port: 30007
|
||||
turn_domain: "turn.ns-anchat.dbrs.space"
|
||||
turn_secret: "`+turnSecret+`"
|
||||
`)
|
||||
|
||||
cg := NewConfigGenerator(dir)
|
||||
out, err := cg.GenerateNodeConfig(nil, "10.0.0.5", "", "node-1.dbrs.space", "dbrs.space", false)
|
||||
if err != nil {
|
||||
t.Fatalf("GenerateNodeConfig failed: %v", err)
|
||||
}
|
||||
|
||||
for _, want := range []string{
|
||||
"webrtc:",
|
||||
"turn_secret: \"" + turnSecret + "\"",
|
||||
"turn_domain: \"" + turnDomain + "\"",
|
||||
"sfu_port: 30007",
|
||||
} {
|
||||
if !strings.Contains(out, want) {
|
||||
t.Errorf("regenerated node.yaml missing %q\n---\n%s", want, out)
|
||||
}
|
||||
}
|
||||
|
||||
// The secret must now be durable in the secrets file (yaml-had-secret →
|
||||
// file gets persisted), so the NEXT regen survives even if the operator's
|
||||
// yaml is gone.
|
||||
persisted, err := os.ReadFile(filepath.Join(dir, "secrets", "turn-secret"))
|
||||
if err != nil {
|
||||
t.Fatalf("TURN secret was not persisted to secrets dir: %v", err)
|
||||
}
|
||||
if strings.TrimSpace(string(persisted)) != turnSecret {
|
||||
t.Errorf("persisted secret %q != yaml secret %q", strings.TrimSpace(string(persisted)), turnSecret)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGenerateNodeConfig_persistedSecretSurvivesWipedYAML verifies the durable
|
||||
// mechanism: once the secret is in secrets/turn-secret, a regen from a node.yaml
|
||||
// that LOST its webrtc block still renders turn_secret (defaulting sfu_port).
|
||||
func TestGenerateNodeConfig_persistedSecretSurvivesWipedYAML(t *testing.T) {
|
||||
const turnSecret = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"
|
||||
|
||||
dir := t.TempDir()
|
||||
secretsDir := filepath.Join(dir, "secrets")
|
||||
if err := os.MkdirAll(secretsDir, 0700); err != nil {
|
||||
t.Fatalf("mkdir secrets failed: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(secretsDir, "turn-secret"), []byte(turnSecret), 0600); err != nil {
|
||||
t.Fatalf("write turn-secret failed: %v", err)
|
||||
}
|
||||
// Existing node.yaml with NO webrtc block (simulates the wiped state).
|
||||
writeNodeYAML(t, dir, "http_gateway:\n enabled: true\n")
|
||||
|
||||
cg := NewConfigGenerator(dir)
|
||||
out, err := cg.GenerateNodeConfig(nil, "10.0.0.5", "", "node-1.dbrs.space", "dbrs.space", false)
|
||||
if err != nil {
|
||||
t.Fatalf("GenerateNodeConfig failed: %v", err)
|
||||
}
|
||||
|
||||
if !strings.Contains(out, "turn_secret: \""+turnSecret+"\"") {
|
||||
t.Errorf("rendered node.yaml missing persisted turn_secret\n---\n%s", out)
|
||||
}
|
||||
// sfu_port had no source → defaults to the named constant.
|
||||
if !strings.Contains(out, "sfu_port: 30000") {
|
||||
t.Errorf("expected default sfu_port 30000, got:\n%s", out)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGenerateNodeConfig_noWebRTCOmitsBlock verifies clusters without any TURN
|
||||
// config render no webrtc block at all (no empty values leak in).
|
||||
func TestGenerateNodeConfig_noWebRTCOmitsBlock(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
cg := NewConfigGenerator(dir)
|
||||
|
||||
out, err := cg.GenerateNodeConfig(nil, "10.0.0.5", "", "node-1.dbrs.space", "dbrs.space", false)
|
||||
if err != nil {
|
||||
t.Fatalf("GenerateNodeConfig failed: %v", err)
|
||||
}
|
||||
if strings.Contains(out, "webrtc:") {
|
||||
t.Errorf("expected no webrtc block when no TURN config present, got:\n%s", out)
|
||||
}
|
||||
// Sanity: ensure no orphan turn-secret file was created.
|
||||
if _, err := os.Stat(filepath.Join(dir, "secrets", "turn-secret")); !os.IsNotExist(err) {
|
||||
t.Errorf("turn-secret file should not exist when no TURN config present")
|
||||
}
|
||||
}
|
||||
@ -94,6 +94,16 @@ http_gateway:
|
||||
# (bugboard #837). Sourced from ~/.orama/secrets/secrets-encryption-key.
|
||||
secrets_encryption_key: "{{.SecretsEncryptionKey}}"
|
||||
{{- end}}
|
||||
{{- if .TURNSecret}}
|
||||
# WebRTC/TURN config (feat-124 #913). turn_secret is sourced from
|
||||
# ~/.orama/secrets/turn-secret so it survives config regeneration;
|
||||
# turn_domain/sfu_port are carried forward from the previous node.yaml.
|
||||
webrtc:
|
||||
enabled: true
|
||||
sfu_port: {{.SFUPort}}
|
||||
turn_domain: "{{.TURNDomain}}"
|
||||
turn_secret: "{{.TURNSecret}}"
|
||||
{{- end}}
|
||||
|
||||
# Routes for internal service reverse proxy (kept for backwards compatibility but not used by full gateway)
|
||||
routes: {}
|
||||
|
||||
@ -55,6 +55,17 @@ type NodeConfigData struct {
|
||||
// config (the gateway then reads the secret file directly / get_secret
|
||||
// stays disabled until the key is configured).
|
||||
SecretsEncryptionKey string
|
||||
|
||||
// WebRTC/TURN configuration, rendered under http_gateway.webrtc when
|
||||
// WebRTCEnabled is true (feat-124 #913). TURNSecret is sourced from
|
||||
// ~/.orama/secrets/turn-secret so it survives Phase4 config regeneration;
|
||||
// TURNDomain/SFUPort are operator-set values carried forward from the
|
||||
// existing node.yaml. The whole block is conditional on TURNSecret being
|
||||
// set — clusters without TURN render nothing.
|
||||
WebRTCEnabled bool // Whether to emit the webrtc block
|
||||
SFUPort int // Local SFU signaling port the gateway proxies to
|
||||
TURNDomain string // TURN domain (e.g., "turn.ns-myapp.dbrs.space")
|
||||
TURNSecret string // HMAC-SHA1 shared secret for TURN credential generation
|
||||
}
|
||||
|
||||
// GatewayConfigData holds parameters for gateway.yaml rendering
|
||||
|
||||
@ -67,6 +67,42 @@ func TestRenderNodeConfig_secretsEncryptionKey(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRenderNodeConfig_webRTC(t *testing.T) {
|
||||
const secret = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
|
||||
// Happy path: TURN secret present → full webrtc block rendered.
|
||||
withWebRTC, err := RenderNodeConfig(NodeConfigData{
|
||||
NodeID: "node1",
|
||||
WebRTCEnabled: true,
|
||||
SFUPort: 30007,
|
||||
TURNDomain: "turn.ns-anchat.dbrs.space",
|
||||
TURNSecret: secret,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("RenderNodeConfig failed: %v", err)
|
||||
}
|
||||
for _, want := range []string{
|
||||
"webrtc:",
|
||||
"enabled: true",
|
||||
"sfu_port: 30007",
|
||||
"turn_domain: \"turn.ns-anchat.dbrs.space\"",
|
||||
"turn_secret: \"" + secret + "\"",
|
||||
} {
|
||||
if !strings.Contains(withWebRTC, want) {
|
||||
t.Errorf("rendered node config missing webrtc line %q\n---\n%s", want, withWebRTC)
|
||||
}
|
||||
}
|
||||
|
||||
// Edge case: no TURN secret → block omitted entirely.
|
||||
withoutWebRTC, err := RenderNodeConfig(NodeConfigData{NodeID: "node1"})
|
||||
if err != nil {
|
||||
t.Fatalf("RenderNodeConfig failed: %v", err)
|
||||
}
|
||||
if strings.Contains(withoutWebRTC, "webrtc:") {
|
||||
t.Errorf("empty TURN secret should omit webrtc block, got:\n%s", withoutWebRTC)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRenderGatewayConfig(t *testing.T) {
|
||||
bootstrapMultiaddr := "/ip4/127.0.0.1/tcp/4001/p2p/Qm1234567890"
|
||||
data := GatewayConfigData{
|
||||
|
||||
@ -42,6 +42,9 @@ type JoinResponse struct {
|
||||
// Serverless secrets encryption key (bugboard #837) — must be identical on
|
||||
// every node so namespace function secrets decrypt cluster-wide.
|
||||
SecretsEncryptionKey string `json:"secrets_encryption_key,omitempty"`
|
||||
// TURN shared secret (feat-124 #913) — must be identical on every node so
|
||||
// WebRTC TURN credentials validate cluster-wide.
|
||||
TURNSecret string `json:"turn_secret,omitempty"`
|
||||
|
||||
// Cluster join info (all using WG IPs)
|
||||
RQLiteJoinAddress string `json:"rqlite_join_address"`
|
||||
@ -210,6 +213,13 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) {
|
||||
secretsEncryptionKey = strings.TrimSpace(string(data))
|
||||
}
|
||||
|
||||
// Read TURN shared secret (optional — may not exist on older clusters;
|
||||
// feat-124 #913)
|
||||
turnSecret := ""
|
||||
if data, err := os.ReadFile(h.oramaDir + "/secrets/turn-secret"); err == nil {
|
||||
turnSecret = strings.TrimSpace(string(data))
|
||||
}
|
||||
|
||||
// 7. Get this node's WG IP (needed before peer list to check self-inclusion)
|
||||
myWGIP, err := h.getMyWGIP()
|
||||
if err != nil {
|
||||
@ -282,6 +292,7 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) {
|
||||
RQLitePassword: rqlitePassword,
|
||||
OlricEncryptionKey: olricEncryptionKey,
|
||||
SecretsEncryptionKey: secretsEncryptionKey,
|
||||
TURNSecret: turnSecret,
|
||||
RQLiteJoinAddress: fmt.Sprintf("%s:7001", myWGIP),
|
||||
IPFSPeer: ipfsPeer,
|
||||
IPFSClusterPeer: ipfsClusterPeer,
|
||||
|
||||
@ -53,6 +53,9 @@ type SpawnRequest struct {
|
||||
GatewaySFUPort int `json:"gateway_sfu_port,omitempty"`
|
||||
GatewayTURNDomain string `json:"gateway_turn_domain,omitempty"`
|
||||
GatewayTURNSecret string `json:"gateway_turn_secret,omitempty"`
|
||||
// Host serverless secrets encryption key forwarded to the spawned
|
||||
// namespace gateway (bugboard #837 follow-up). Same value on every node.
|
||||
GatewaySecretsEncryptionKey string `json:"gateway_secrets_encryption_key,omitempty"`
|
||||
|
||||
// SFU config (when action = "spawn-sfu")
|
||||
SFUListenAddr string `json:"sfu_listen_addr,omitempty"`
|
||||
@ -235,6 +238,7 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
SFUPort: req.GatewaySFUPort,
|
||||
TURNDomain: req.GatewayTURNDomain,
|
||||
TURNSecret: req.GatewayTURNSecret,
|
||||
SecretsEncryptionKey: req.GatewaySecretsEncryptionKey,
|
||||
}
|
||||
if err := h.systemdSpawner.SpawnGateway(ctx, req.Namespace, req.NodeID, cfg); err != nil {
|
||||
h.logger.Error("Failed to spawn Gateway instance", zap.Error(err))
|
||||
@ -288,6 +292,7 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
SFUPort: req.GatewaySFUPort,
|
||||
TURNDomain: req.GatewayTURNDomain,
|
||||
TURNSecret: req.GatewayTURNSecret,
|
||||
SecretsEncryptionKey: req.GatewaySecretsEncryptionKey,
|
||||
}
|
||||
if err := h.systemdSpawner.RestartGateway(ctx, req.Namespace, req.NodeID, cfg); err != nil {
|
||||
h.logger.Error("Failed to restart Gateway instance", zap.Error(err))
|
||||
|
||||
@ -10,9 +10,9 @@ import (
|
||||
"github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys"
|
||||
)
|
||||
|
||||
// Bugboard #548 — a push device must be keyed on the stable identity (rootId)
|
||||
// Bugboard #548 — a push device must be keyed on the stable identity (accountId)
|
||||
// when the app provides one, not the wallet credential that authenticated the
|
||||
// session. resolveCallerUserID prefers the `root_id` custom claim and falls
|
||||
// session. resolveCallerUserID prefers the `account_id` custom claim and falls
|
||||
// back to the JWT subject so single-credential apps keep working.
|
||||
|
||||
func reqWithClaims(t *testing.T, claims *authsvc.JWTClaims) *http.Request {
|
||||
@ -28,10 +28,10 @@ func reqWithClaims(t *testing.T, claims *authsvc.JWTClaims) *http.Request {
|
||||
func TestResolveCallerUserID_prefersRootIDClaim(t *testing.T) {
|
||||
r := reqWithClaims(t, &authsvc.JWTClaims{
|
||||
Sub: "0xWALLET",
|
||||
Custom: map[string]string{rootIDClaim: "root-uuid-123"},
|
||||
Custom: map[string]string{accountIDClaim: "root-uuid-123"},
|
||||
})
|
||||
if got := resolveCallerUserID(r); got != "root-uuid-123" {
|
||||
t.Errorf("want rootId from claim, got %q", got)
|
||||
t.Errorf("want accountId from claim, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
@ -44,13 +44,13 @@ func TestResolveCallerUserID_fallsBackToSubject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestResolveCallerUserID_emptyRootIDFallsBack(t *testing.T) {
|
||||
// An empty root_id must not collapse identity to "" — fall back to subject.
|
||||
// An empty account_id must not collapse identity to "" — fall back to subject.
|
||||
r := reqWithClaims(t, &authsvc.JWTClaims{
|
||||
Sub: "0xWALLET",
|
||||
Custom: map[string]string{rootIDClaim: ""},
|
||||
Custom: map[string]string{accountIDClaim: ""},
|
||||
})
|
||||
if got := resolveCallerUserID(r); got != "0xWALLET" {
|
||||
t.Errorf("want wallet fallback on empty root_id, got %q", got)
|
||||
t.Errorf("want wallet fallback on empty account_id, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -141,17 +141,19 @@ func resolveNamespace(r *http.Request) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// rootIDClaim is the custom JWT claim an app may set to carry the stable
|
||||
// identity (rootId) that a device should be keyed on, independent of which
|
||||
// wallet credential authenticated the session. See bugboard #548.
|
||||
const rootIDClaim = "root_id"
|
||||
// accountIDClaim is the custom JWT claim an app may set to carry the stable
|
||||
// account identity (e.g. anchat's users.user_id) that a device should be
|
||||
// keyed on, independent of which wallet credential authenticated the
|
||||
// session. Injected at mint time by the namespace's claims-provider hook.
|
||||
// See bugboard #548 (name agreed in comment #906/#920).
|
||||
const accountIDClaim = "account_id"
|
||||
|
||||
// resolveCallerUserID extracts the identity a push device should be keyed on.
|
||||
//
|
||||
// In a multi-credential app (anchat), the JWT subject is the *wallet* — a
|
||||
// credential, not the identity. A single user (rootId) with N linked wallets
|
||||
// would otherwise register N device rows and receive N duplicate pushes
|
||||
// (bugboard #548). When the app includes a stable `root_id` custom claim, we
|
||||
// (bugboard #548). When the app includes a stable `account_id` custom claim, we
|
||||
// key on that; otherwise we fall back to the subject (wallet) so single-
|
||||
// credential apps and older tokens keep working unchanged.
|
||||
//
|
||||
@ -159,7 +161,7 @@ const rootIDClaim = "root_id"
|
||||
func resolveCallerUserID(r *http.Request) string {
|
||||
if v := r.Context().Value(ctxkeys.JWT); v != nil {
|
||||
if claims, ok := v.(*auth.JWTClaims); ok && claims != nil {
|
||||
if rootID, ok := claims.Custom[rootIDClaim]; ok && rootID != "" {
|
||||
if rootID, ok := claims.Custom[accountIDClaim]; ok && rootID != "" {
|
||||
return rootID
|
||||
}
|
||||
return claims.Sub
|
||||
|
||||
@ -95,6 +95,15 @@ type InstanceConfig struct {
|
||||
SFUPort int // SFU signaling port on this node
|
||||
TURNDomain string // TURN server domain (e.g., "turn.ns-alice.orama-devnet.network")
|
||||
TURNSecret string // TURN shared secret for credential generation
|
||||
// SecretsEncryptionKey is the host-wide AES-256 serverless secrets
|
||||
// encryption key (hex-encoded). Bugboard #837 follow-up: the host gateway
|
||||
// receives this via gateway.Config but spawned namespace gateways never
|
||||
// did, so `function secrets list` returned 501 on namespaces. It is the
|
||||
// SAME value on every node — read once from the host's
|
||||
// secrets/secrets-encryption-key file — and must be identical across the
|
||||
// namespace cluster so a secret encrypted by one gateway decrypts on
|
||||
// another. Empty means secrets management stays disabled (fail-loud).
|
||||
SecretsEncryptionKey string
|
||||
}
|
||||
|
||||
// GatewayYAMLWebRTC represents the webrtc section of the gateway YAML config.
|
||||
@ -125,6 +134,13 @@ type GatewayYAMLConfig struct {
|
||||
IPFSTimeout string `yaml:"ipfs_timeout,omitempty"`
|
||||
IPFSReplicationFactor int `yaml:"ipfs_replication_factor,omitempty"`
|
||||
WebRTC GatewayYAMLWebRTC `yaml:"webrtc,omitempty"`
|
||||
// SecretsEncryptionKey carries the host's serverless secrets encryption
|
||||
// key into the spawned namespace gateway so it can decrypt/encrypt
|
||||
// function secrets (bugboard #837 follow-up). The standalone gateway
|
||||
// binary loads this back into gateway.Config.SecretsEncryptionKey on
|
||||
// startup. Because this is key material, generateConfig writes the file
|
||||
// 0600. Empty omits the field (secrets management stays disabled).
|
||||
SecretsEncryptionKey string `yaml:"secrets_encryption_key,omitempty"`
|
||||
// ClusterSecretPath points to the host's cluster-secret file. Bug #215
|
||||
// follow-up: namespace gateways spawned by systemd previously had no
|
||||
// way to access the cluster secret, so they fell back to per-node
|
||||
@ -323,6 +339,7 @@ func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig,
|
||||
TURNDomain: cfg.TURNDomain,
|
||||
TURNSecret: cfg.TURNSecret,
|
||||
},
|
||||
SecretsEncryptionKey: cfg.SecretsEncryptionKey,
|
||||
}
|
||||
// Set Olric timeout if provided
|
||||
if cfg.OlricTimeout > 0 {
|
||||
@ -341,12 +358,24 @@ func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig,
|
||||
}
|
||||
}
|
||||
|
||||
if err := os.WriteFile(configPath, data, 0644); err != nil {
|
||||
// 0600: this YAML now embeds the serverless secrets encryption key
|
||||
// (bugboard #837), so it must not be world/group readable.
|
||||
if err := os.WriteFile(configPath, data, 0600); err != nil {
|
||||
return &InstanceError{
|
||||
Message: "failed to write Gateway config",
|
||||
Cause: err,
|
||||
}
|
||||
}
|
||||
// WriteFile's mode only applies on CREATE — a pre-existing file (e.g.
|
||||
// written 0644 by an older release) keeps its old perms on rewrite.
|
||||
// Converge explicitly so upgraded nodes don't leave the embedded
|
||||
// secrets key group/world-readable.
|
||||
if err := os.Chmod(configPath, 0600); err != nil {
|
||||
return &InstanceError{
|
||||
Message: "failed to set Gateway config permissions",
|
||||
Cause: err,
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1,9 +1,12 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
@ -65,6 +68,114 @@ func TestGatewayYAMLConfig_clusterSecretPathRoundTrip(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestGatewayYAMLConfig_secretsEncryptionKeyRoundTrip is the regression test
|
||||
// for the bugboard #837 follow-up: the host gateway received the serverless
|
||||
// secrets encryption key but namespace gateways spawned via systemd did not,
|
||||
// because the YAML schema had no field to carry it — so `function secrets
|
||||
// list` returned 501 on those namespaces. This guards the yaml tag and that
|
||||
// the standalone gateway's yamlCfg mirror can read it back.
|
||||
func TestGatewayYAMLConfig_secretsEncryptionKeyRoundTrip(t *testing.T) {
|
||||
const key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
cfg := GatewayYAMLConfig{
|
||||
ListenAddr: ":6001",
|
||||
ClientNamespace: "anchat-test",
|
||||
RQLiteDSN: "http://localhost:10000",
|
||||
OlricServers: []string{"localhost:3320"},
|
||||
SecretsEncryptionKey: key,
|
||||
}
|
||||
out, err := yaml.Marshal(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal: %v", err)
|
||||
}
|
||||
if !strings.Contains(string(out), "secrets_encryption_key: "+key) {
|
||||
t.Fatalf("YAML output missing expected secrets_encryption_key line:\n%s", out)
|
||||
}
|
||||
|
||||
// Mirror of cmd/gateway/config.go's yamlCfg so this test catches drift
|
||||
// between the two declarations (the standalone gateway uses strict
|
||||
// decoding and would reject an unknown field).
|
||||
type webrtc struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
SFUPort int `yaml:"sfu_port"`
|
||||
TURNDomain string `yaml:"turn_domain"`
|
||||
TURNSecret string `yaml:"turn_secret"`
|
||||
}
|
||||
type yamlCfgMirror struct {
|
||||
ListenAddr string `yaml:"listen_addr"`
|
||||
ClientNamespace string `yaml:"client_namespace"`
|
||||
RQLiteDSN string `yaml:"rqlite_dsn"`
|
||||
OlricServers []string `yaml:"olric_servers"`
|
||||
WebRTC webrtc `yaml:"webrtc"`
|
||||
SecretsEncryptionKey string `yaml:"secrets_encryption_key"`
|
||||
ClusterSecretPath string `yaml:"cluster_secret_path"`
|
||||
}
|
||||
var parsed yamlCfgMirror
|
||||
if err := yaml.Unmarshal(out, &parsed); err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
if parsed.SecretsEncryptionKey != key {
|
||||
t.Errorf("round-trip mismatch: got %q, want %q", parsed.SecretsEncryptionKey, key)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGatewayYAMLConfig_secretsKeyOmitWhenEmpty: a host with no secrets key
|
||||
// (legacy/test rigs) must not emit a stray secrets_encryption_key line that
|
||||
// operators could mistake for an empty-key directive.
|
||||
func TestGatewayYAMLConfig_secretsKeyOmitWhenEmpty(t *testing.T) {
|
||||
cfg := GatewayYAMLConfig{
|
||||
ListenAddr: ":6001",
|
||||
ClientNamespace: "ns",
|
||||
RQLiteDSN: "http://localhost:10000",
|
||||
OlricServers: []string{"localhost:3320"},
|
||||
// SecretsEncryptionKey intentionally empty.
|
||||
}
|
||||
out, err := yaml.Marshal(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal: %v", err)
|
||||
}
|
||||
if strings.Contains(string(out), "secrets_encryption_key") {
|
||||
t.Errorf("empty SecretsEncryptionKey should be omitted from YAML; got:\n%s", out)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGenerateConfig_writesSecretsKeyWith0600 verifies the spawned namespace
|
||||
// gateway YAML carries the secrets key AND is written 0600 (the file now
|
||||
// holds key material — bugboard #837).
|
||||
func TestGenerateConfig_writesSecretsKeyWith0600(t *testing.T) {
|
||||
const key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
dir := t.TempDir()
|
||||
is := NewInstanceSpawner(dir, zap.NewNop())
|
||||
configPath := filepath.Join(dir, "gateway-node-1.yaml")
|
||||
|
||||
cfg := InstanceConfig{
|
||||
Namespace: "anchat-test",
|
||||
NodeID: "node-1",
|
||||
HTTPPort: 6001,
|
||||
RQLiteDSN: "http://localhost:10000",
|
||||
OlricServers: []string{"localhost:3320"},
|
||||
SecretsEncryptionKey: key,
|
||||
}
|
||||
if err := is.generateConfig(configPath, cfg, dir); err != nil {
|
||||
t.Fatalf("generateConfig: %v", err)
|
||||
}
|
||||
|
||||
info, err := os.Stat(configPath)
|
||||
if err != nil {
|
||||
t.Fatalf("stat: %v", err)
|
||||
}
|
||||
if perm := info.Mode().Perm(); perm != 0600 {
|
||||
t.Errorf("config perms = %o, want 0600 (file holds the secrets key)", perm)
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(configPath)
|
||||
if err != nil {
|
||||
t.Fatalf("read: %v", err)
|
||||
}
|
||||
if !strings.Contains(string(data), "secrets_encryption_key: "+key) {
|
||||
t.Errorf("generated config missing secrets_encryption_key:\n%s", data)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGatewayYAMLConfig_omitWhenEmpty: when the host has no cluster secret,
|
||||
// the field is omitted from the YAML so legacy single-node test rigs don't
|
||||
// see a stray "cluster_secret_path: " line that operators might mistake for
|
||||
|
||||
@ -45,6 +45,13 @@ type ClusterManagerConfig struct {
|
||||
// cluster-wide JWT signing key (bug #215 fix). Empty string disables
|
||||
// cross-node JWT verification within namespace clusters.
|
||||
ClusterSecretPath string
|
||||
|
||||
// SecretsEncryptionKey is the host's serverless secrets encryption key
|
||||
// (AES-256, hex-encoded), read once from secrets/secrets-encryption-key.
|
||||
// Forwarded to spawned namespace gateways so `function secrets ...`
|
||||
// works there (bugboard #837 follow-up). Empty leaves namespace-gateway
|
||||
// secrets management disabled (fail-loud).
|
||||
SecretsEncryptionKey string
|
||||
}
|
||||
|
||||
// ClusterManager orchestrates namespace cluster provisioning and lifecycle
|
||||
@ -72,6 +79,10 @@ type ClusterManager struct {
|
||||
// AES-256 key for encrypting TURN secrets in RQLite (nil = plaintext)
|
||||
turnEncryptionKey []byte
|
||||
|
||||
// Host's serverless secrets encryption key, forwarded to spawned
|
||||
// namespace gateways (bugboard #837 follow-up). Empty = disabled.
|
||||
secretsEncryptionKey string
|
||||
|
||||
// Track provisioning operations
|
||||
provisioningMu sync.RWMutex
|
||||
provisioning map[string]bool // namespace -> in progress
|
||||
@ -123,6 +134,7 @@ func NewClusterManager(
|
||||
ipfsTimeout: ipfsTimeout,
|
||||
ipfsReplicationFactor: ipfsReplicationFactor,
|
||||
turnEncryptionKey: cfg.TurnEncryptionKey,
|
||||
secretsEncryptionKey: cfg.SecretsEncryptionKey,
|
||||
logger: logger.With(zap.String("component", "cluster-manager")),
|
||||
provisioning: make(map[string]bool),
|
||||
}
|
||||
@ -170,6 +182,7 @@ func NewClusterManagerWithComponents(
|
||||
ipfsTimeout: ipfsTimeout,
|
||||
ipfsReplicationFactor: ipfsReplicationFactor,
|
||||
turnEncryptionKey: cfg.TurnEncryptionKey,
|
||||
secretsEncryptionKey: cfg.SecretsEncryptionKey,
|
||||
logger: logger.With(zap.String("component", "cluster-manager")),
|
||||
provisioning: make(map[string]bool),
|
||||
}
|
||||
@ -566,6 +579,7 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name
|
||||
IPFSAPIURL: cm.ipfsAPIURL,
|
||||
IPFSTimeout: cm.ipfsTimeout,
|
||||
IPFSReplicationFactor: cm.ipfsReplicationFactor,
|
||||
SecretsEncryptionKey: cm.secretsEncryptionKey,
|
||||
}
|
||||
|
||||
var instance *gateway.GatewayInstance
|
||||
@ -681,6 +695,9 @@ func (cm *ClusterManager) spawnGatewayRemote(ctx context.Context, nodeIP string,
|
||||
"gateway_sfu_port": cfg.SFUPort,
|
||||
"gateway_turn_domain": cfg.TURNDomain,
|
||||
"gateway_turn_secret": cfg.TURNSecret,
|
||||
// Bugboard #837 follow-up: carry the host secrets encryption key to
|
||||
// the remote node so its spawned namespace gateway can manage secrets.
|
||||
"gateway_secrets_encryption_key": cfg.SecretsEncryptionKey,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -1587,6 +1604,7 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n
|
||||
IPFSAPIURL: cm.ipfsAPIURL,
|
||||
IPFSTimeout: cm.ipfsTimeout,
|
||||
IPFSReplicationFactor: cm.ipfsReplicationFactor,
|
||||
SecretsEncryptionKey: cm.secretsEncryptionKey,
|
||||
}
|
||||
|
||||
// Add WebRTC config if enabled for this namespace
|
||||
@ -2022,6 +2040,7 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
|
||||
IPFSAPIURL: cm.ipfsAPIURL,
|
||||
IPFSTimeout: cm.ipfsTimeout,
|
||||
IPFSReplicationFactor: cm.ipfsReplicationFactor,
|
||||
SecretsEncryptionKey: cm.secretsEncryptionKey,
|
||||
}
|
||||
|
||||
// Resolve WebRTC config. Prefer the local state file; fall back to
|
||||
|
||||
@ -716,6 +716,9 @@ func (cm *ClusterManager) restartGatewaysWithWebRTC(
|
||||
SFUPort: sfuPort,
|
||||
TURNDomain: turnDomain,
|
||||
TURNSecret: turnSecret,
|
||||
// Bugboard #837 follow-up: preserve the secrets key on WebRTC
|
||||
// restarts so enabling WebRTC doesn't drop secrets management.
|
||||
SecretsEncryptionKey: cm.secretsEncryptionKey,
|
||||
}
|
||||
|
||||
if node.NodeID == cm.localNodeID {
|
||||
@ -764,6 +767,8 @@ func (cm *ClusterManager) restartGatewayRemote(ctx context.Context, nodeIP strin
|
||||
"gateway_sfu_port": cfg.SFUPort,
|
||||
"gateway_turn_domain": cfg.TURNDomain,
|
||||
"gateway_turn_secret": cfg.TURNSecret,
|
||||
// Bugboard #837 follow-up: preserve the secrets key on WebRTC restarts.
|
||||
"gateway_secrets_encryption_key": cfg.SecretsEncryptionKey,
|
||||
})
|
||||
if err != nil {
|
||||
cm.logger.Error("Failed to restart remote gateway with WebRTC config",
|
||||
|
||||
@ -527,6 +527,7 @@ func (cm *ClusterManager) ReplaceClusterNode(ctx context.Context, cluster *Names
|
||||
IPFSAPIURL: cm.ipfsAPIURL,
|
||||
IPFSTimeout: cm.ipfsTimeout,
|
||||
IPFSReplicationFactor: cm.ipfsReplicationFactor,
|
||||
SecretsEncryptionKey: cm.secretsEncryptionKey,
|
||||
}
|
||||
|
||||
// Add WebRTC config if enabled for this namespace
|
||||
@ -1069,6 +1070,7 @@ func (cm *ClusterManager) addNodeToCluster(
|
||||
IPFSAPIURL: cm.ipfsAPIURL,
|
||||
IPFSTimeout: cm.ipfsTimeout,
|
||||
IPFSReplicationFactor: cm.ipfsReplicationFactor,
|
||||
SecretsEncryptionKey: cm.secretsEncryptionKey,
|
||||
}
|
||||
|
||||
// Add WebRTC config if enabled for this namespace
|
||||
|
||||
@ -90,6 +90,62 @@ func TestGatewayWebRTCInSync_bothDisabled_returnsTrue(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Bugboard #837 follow-up (drift on the secrets encryption key) —
|
||||
// gatewayConfigInSync extends the bug-25 WebRTC drift check with the
|
||||
// serverless secrets key. A namespace gateway spawned before the key was
|
||||
// plumbed has an empty on-disk key; once the desired key is non-empty we
|
||||
// want a rewrite+restart so secrets management turns on. But both-empty must
|
||||
// stay a no-op so non-secrets hosts don't restart-loop.
|
||||
|
||||
func TestGatewayConfigInSync_secretsKeyMissingOnDisk_returnsFalse(t *testing.T) {
|
||||
// On-disk YAML has no secrets key (pre-#837 gateway), desired has one.
|
||||
// MUST drift so ReconcileGateway rewrites + restarts to enable secrets.
|
||||
onDisk := gateway.GatewayYAMLConfig{} // empty secrets_encryption_key
|
||||
desired := gateway.InstanceConfig{SecretsEncryptionKey: "the-key"}
|
||||
if gatewayConfigInSync(onDisk, desired) {
|
||||
t.Fatal("empty on-disk secrets key vs non-empty desired must be out-of-sync (needs restart to enable secrets)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayConfigInSync_secretsKeyMatches_returnsTrue(t *testing.T) {
|
||||
// After a reconcile, on-disk key matches desired. MUST be in-sync so the
|
||||
// next boot does not restart again (no loop).
|
||||
onDisk := gateway.GatewayYAMLConfig{SecretsEncryptionKey: "the-key"}
|
||||
desired := gateway.InstanceConfig{SecretsEncryptionKey: "the-key"}
|
||||
if !gatewayConfigInSync(onDisk, desired) {
|
||||
t.Error("matching secrets key must be in-sync (no restart) — else restart loop on every boot")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayConfigInSync_bothSecretsKeysEmpty_returnsTrue(t *testing.T) {
|
||||
// A host with no secrets key (empty desired) and an on-disk config also
|
||||
// without one MUST be in-sync — otherwise every boot would restart a
|
||||
// namespace gateway that legitimately has no secrets key.
|
||||
if !gatewayConfigInSync(gateway.GatewayYAMLConfig{}, gateway.InstanceConfig{}) {
|
||||
t.Error("empty on-disk + empty desired secrets key must be in-sync (no restart loop)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayConfigInSync_secretsKeyRotated_returnsFalse(t *testing.T) {
|
||||
// A rotated key (both non-empty but different) must drift so the rewrite
|
||||
// propagates the new key.
|
||||
onDisk := gateway.GatewayYAMLConfig{SecretsEncryptionKey: "old-key"}
|
||||
desired := gateway.InstanceConfig{SecretsEncryptionKey: "new-key"}
|
||||
if gatewayConfigInSync(onDisk, desired) {
|
||||
t.Error("rotated secrets key (old != new) must be out-of-sync")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayConfigInSync_webrtcDriftStillDetected(t *testing.T) {
|
||||
// The combined check must not lose the bug-25 WebRTC surface: WebRTC
|
||||
// drift with matching (empty) secrets keys must still report out-of-sync.
|
||||
onDisk := gateway.GatewayYAMLConfig{WebRTC: gateway.GatewayYAMLWebRTC{}}
|
||||
desired := gateway.InstanceConfig{WebRTCEnabled: true, SFUPort: 30000}
|
||||
if gatewayConfigInSync(onDisk, desired) {
|
||||
t.Error("WebRTC drift must still be detected by the combined in-sync check")
|
||||
}
|
||||
}
|
||||
|
||||
// ReconcileGateway I/O paths that DON'T restart (the restart path needs
|
||||
// real systemd, so it's covered by the pure helper above). These pin
|
||||
// that a matching config is a clean no-op and that an unreadable config
|
||||
|
||||
@ -228,6 +228,11 @@ func (s *SystemdSpawner) SpawnGateway(ctx context.Context, namespace, nodeID str
|
||||
// random Ed25519 keys and host functions saw empty
|
||||
// caller_jwt_subject.
|
||||
ClusterSecretPath: s.clusterSecretPath,
|
||||
// Bugboard #837 follow-up: forward the host's serverless secrets
|
||||
// encryption key so the spawned namespace gateway can manage function
|
||||
// secrets. Without this, `function secrets list` returned 501 on
|
||||
// namespace gateways even though the host gateway had the key.
|
||||
SecretsEncryptionKey: cfg.SecretsEncryptionKey,
|
||||
WebRTC: gateway.GatewayYAMLWebRTC{
|
||||
Enabled: cfg.WebRTCEnabled,
|
||||
SFUPort: cfg.SFUPort,
|
||||
@ -241,9 +246,17 @@ func (s *SystemdSpawner) SpawnGateway(ctx context.Context, namespace, nodeID str
|
||||
return fmt.Errorf("failed to marshal Gateway config: %w", err)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(configPath, configBytes, 0644); err != nil {
|
||||
// 0600: the gateway YAML embeds the secrets encryption key (bugboard
|
||||
// #837), so it must not be world/group readable.
|
||||
if err := os.WriteFile(configPath, configBytes, 0600); err != nil {
|
||||
return fmt.Errorf("failed to write Gateway config: %w", err)
|
||||
}
|
||||
// WriteFile's mode only applies on CREATE — converge perms explicitly so
|
||||
// a file written 0644 by an older release doesn't stay world-readable
|
||||
// after an in-place rewrite.
|
||||
if err := os.Chmod(configPath, 0600); err != nil {
|
||||
return fmt.Errorf("failed to set Gateway config permissions: %w", err)
|
||||
}
|
||||
|
||||
s.logger.Info("Created Gateway config file",
|
||||
zap.String("path", configPath),
|
||||
@ -333,6 +346,23 @@ func gatewayWebRTCInSync(onDisk gateway.GatewayYAMLWebRTC, cfg gateway.InstanceC
|
||||
onDisk.TURNDomain == cfg.TURNDomain
|
||||
}
|
||||
|
||||
// gatewayConfigInSync reports whether the full reconcile-relevant config on
|
||||
// disk matches the desired config — i.e. no rewrite+restart is needed.
|
||||
// Combines the WebRTC drift surface (bugboard #25) with the secrets
|
||||
// encryption key (bugboard #837): a gateway that was spawned before the key
|
||||
// was plumbed has an empty on-disk key and `function secrets list` returns
|
||||
// 501; once the desired key is non-empty we want a rewrite+restart so the
|
||||
// running gateway picks it up.
|
||||
//
|
||||
// Plain string equality keeps the "both empty → in sync" case a no-op: a
|
||||
// namespace on a host with no secrets key (empty desired) whose on-disk key
|
||||
// is also empty is in-sync, so it never restart-loops. Only a genuine
|
||||
// difference (empty on-disk vs non-empty desired, or a rotated key) drifts.
|
||||
func gatewayConfigInSync(onDisk gateway.GatewayYAMLConfig, cfg gateway.InstanceConfig) bool {
|
||||
return gatewayWebRTCInSync(onDisk.WebRTC, cfg) &&
|
||||
onDisk.SecretsEncryptionKey == cfg.SecretsEncryptionKey
|
||||
}
|
||||
|
||||
// ReconcileGateway is the WARM counterpart to SpawnGateway: when a
|
||||
// namespace gateway is already running, this compares its on-disk config
|
||||
// against the desired `cfg` and restarts it ONLY if the WebRTC block has
|
||||
@ -366,18 +396,22 @@ func (s *SystemdSpawner) ReconcileGateway(ctx context.Context, namespace, nodeID
|
||||
return fmt.Errorf("parse gateway config for reconcile: %w", err)
|
||||
}
|
||||
|
||||
if gatewayWebRTCInSync(onDisk.WebRTC, cfg) {
|
||||
if gatewayConfigInSync(onDisk, cfg) {
|
||||
// Already in sync — nothing to do, no restart.
|
||||
return nil
|
||||
}
|
||||
|
||||
s.logger.Info("Gateway WebRTC config drifted from desired; reconciling (rewrite + restart)",
|
||||
// secretsKeyDrifted is logged (as a bool, never the key material) so
|
||||
// operators can see when a #837 rewrite fires vs a #25 WebRTC rewrite.
|
||||
secretsKeyDrifted := onDisk.SecretsEncryptionKey != cfg.SecretsEncryptionKey
|
||||
s.logger.Info("Gateway config drifted from desired; reconciling (rewrite + restart)",
|
||||
zap.String("namespace", namespace),
|
||||
zap.String("node_id", nodeID),
|
||||
zap.Bool("ondisk_enabled", onDisk.WebRTC.Enabled),
|
||||
zap.Int("ondisk_sfu_port", onDisk.WebRTC.SFUPort),
|
||||
zap.Bool("desired_enabled", cfg.WebRTCEnabled),
|
||||
zap.Int("desired_sfu_port", cfg.SFUPort))
|
||||
zap.Int("desired_sfu_port", cfg.SFUPort),
|
||||
zap.Bool("secrets_key_drifted", secretsKeyDrifted))
|
||||
return s.RestartGateway(ctx, namespace, nodeID, cfg)
|
||||
}
|
||||
|
||||
|
||||
@ -129,6 +129,11 @@ func (n *Node) startHTTPGateway(ctx context.Context) error {
|
||||
IPFSReplicationFactor: n.config.Database.IPFS.ReplicationFactor,
|
||||
TurnEncryptionKey: turnEncKey,
|
||||
ClusterSecretPath: clusterSecretPath,
|
||||
// Bugboard #837 follow-up: forward the host's serverless secrets
|
||||
// encryption key (read once above) so spawned namespace gateways
|
||||
// can manage function secrets. Reuses the same variable the host
|
||||
// gateway uses — no second file read.
|
||||
SecretsEncryptionKey: secretsEncryptionKey,
|
||||
}
|
||||
clusterManager := namespace.NewClusterManager(ormClient, clusterCfg, n.logger.Logger)
|
||||
clusterManager.SetLocalNodeID(gwCfg.NodePeerID)
|
||||
|
||||
@ -86,6 +86,11 @@ type Engine struct {
|
||||
// Invocation logger for metrics/debugging
|
||||
invocationLogger InvocationLogger
|
||||
|
||||
// logQueue moves invocation telemetry writes OFF the reply critical path
|
||||
// (bugboard feat-27). Non-nil only when invocationLogger is set; logInvocation
|
||||
// enqueues into it instead of calling invocationLogger.Log synchronously.
|
||||
logQueue *invocationLogQueue
|
||||
|
||||
// Rate limiter
|
||||
rateLimiter RateLimiter
|
||||
}
|
||||
@ -214,6 +219,14 @@ func NewEngine(cfg *Config, registry FunctionRegistry, hostServices HostServices
|
||||
opt(engine)
|
||||
}
|
||||
|
||||
// Start the async telemetry queue once we know whether a logger was wired
|
||||
// in. Invocation logging is now OFF the reply critical path (bugboard
|
||||
// feat-27): logInvocation enqueues, a single worker drains and writes with
|
||||
// its own context. Without a logger there's nothing to queue.
|
||||
if engine.invocationLogger != nil {
|
||||
engine.logQueue = newInvocationLogQueue(engine.invocationLogger, logger)
|
||||
}
|
||||
|
||||
// Register host functions
|
||||
if err := engine.registerHostModule(context.Background()); err != nil {
|
||||
return nil, fmt.Errorf("failed to register host module: %w", err)
|
||||
@ -473,6 +486,13 @@ func (e *Engine) Invalidate(wasmCID string) {
|
||||
|
||||
// Close shuts down the engine and releases resources.
|
||||
func (e *Engine) Close(ctx context.Context) error {
|
||||
// Flush any pending invocation telemetry first (best-effort, bounded —
|
||||
// see invocationLogQueue.Close). Losing a few records at shutdown is
|
||||
// acceptable; blocking the process on telemetry is not.
|
||||
if e.logQueue != nil {
|
||||
e.logQueue.Close()
|
||||
}
|
||||
|
||||
// Close all cached modules
|
||||
e.moduleCache.Clear(ctx)
|
||||
|
||||
@ -699,7 +719,16 @@ func (e *Engine) getOrCompileModule(ctx context.Context, wasmCID string) (wazero
|
||||
})
|
||||
}
|
||||
|
||||
// logInvocation logs an invocation record.
|
||||
// logInvocation records an invocation's telemetry.
|
||||
//
|
||||
// IMPORTANT behavior note (bugboard feat-27): the record is now ENQUEUED for
|
||||
// asynchronous writing — it is NOT written on the reply path. A single worker
|
||||
// goroutine drains the queue and writes with its own context, so a
|
||||
// function_invocations row may lag the response by up to the queue drain time.
|
||||
// That lag is acceptable for telemetry and is worth it: it removes ~500ms-3s
|
||||
// of cross-region Raft write latency from every serverless RPC round-trip.
|
||||
// `ctx` is therefore unused for the write (the request ctx dies when Execute
|
||||
// returns); it is retained only to keep the call-site signature stable.
|
||||
//
|
||||
// `logBuf` is the per-invocation LogBuffer attached to ctx at Execute
|
||||
// start (bugboard #108 fix). When non-nil, the record's Logs field is
|
||||
@ -708,7 +737,8 @@ func (e *Engine) getOrCompileModule(ctx context.Context, wasmCID string) (wazero
|
||||
// updated), falls back to the HostFunctions singleton via the
|
||||
// GetLogs() interface check — same behavior as pre-#108.
|
||||
func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *InvocationContext, logBuf *LogBuffer, startTime time.Time, outputSize int, status InvocationStatus, err error) {
|
||||
if e.invocationLogger == nil || !e.config.LogInvocations {
|
||||
_ = ctx // request context is intentionally not used for the async write
|
||||
if e.logQueue == nil || !e.config.LogInvocations {
|
||||
return
|
||||
}
|
||||
|
||||
@ -742,9 +772,9 @@ func (e *Engine) logInvocation(ctx context.Context, fn *Function, invCtx *Invoca
|
||||
record.Logs = hf.GetLogs()
|
||||
}
|
||||
|
||||
if logErr := e.invocationLogger.Log(ctx, record); logErr != nil {
|
||||
e.logger.Warn("Failed to log invocation", zap.Error(logErr))
|
||||
}
|
||||
// Enqueue is non-blocking: a full queue drops the record (counted) rather
|
||||
// than stalling the reply path. See invocationLogQueue.enqueue.
|
||||
e.logQueue.enqueue(record)
|
||||
}
|
||||
|
||||
// registerHostModule registers the Orama host functions with the wazero runtime.
|
||||
@ -1435,9 +1465,11 @@ func (e *Engine) hEphemeralStateClear(ctx context.Context, mod api.Module,
|
||||
|
||||
// hPushSend is the WASM-callable wrapper for PushSend.
|
||||
// Inputs:
|
||||
//
|
||||
// userIDPtr/userIDLen — UTF-8 user ID to push to (within the function's
|
||||
// own namespace; the namespace is server-side trusted)
|
||||
// msgPtr/msgLen — JSON payload matching hostfunctions.PushSendArgs
|
||||
//
|
||||
// Returns 1 on success, 0 on error.
|
||||
func (e *Engine) hPushSend(ctx context.Context, mod api.Module,
|
||||
userIDPtr, userIDLen, msgPtr, msgLen uint32) uint32 {
|
||||
@ -1508,7 +1540,16 @@ func (e *Engine) hTurnCredentials(ctx context.Context, mod api.Module) uint64 {
|
||||
return e.executor.WriteToGuest(ctx, mod, out)
|
||||
}
|
||||
|
||||
// maxLogMessageBytes caps a single oh.LogInfo/LogError message. A guest could
|
||||
// otherwise pass its entire linear memory as one "log line", ballooning the
|
||||
// per-invocation buffer (and the async invocation-log queue holding it).
|
||||
// Truncation, not rejection — telemetry is best-effort.
|
||||
const maxLogMessageBytes = 16 * 1024
|
||||
|
||||
func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32) {
|
||||
if size > maxLogMessageBytes {
|
||||
size = maxLogMessageBytes
|
||||
}
|
||||
msg, ok := e.executor.ReadFromGuest(mod, ptr, size)
|
||||
if ok {
|
||||
e.hostServices.LogInfo(ctx, string(msg))
|
||||
@ -1516,6 +1557,9 @@ func (e *Engine) hLogInfo(ctx context.Context, mod api.Module, ptr, size uint32)
|
||||
}
|
||||
|
||||
func (e *Engine) hLogError(ctx context.Context, mod api.Module, ptr, size uint32) {
|
||||
if size > maxLogMessageBytes {
|
||||
size = maxLogMessageBytes
|
||||
}
|
||||
msg, ok := e.executor.ReadFromGuest(mod, ptr, size)
|
||||
if ok {
|
||||
e.hostServices.LogError(ctx, string(msg))
|
||||
|
||||
148
core/pkg/serverless/invocation_log_queue.go
Normal file
148
core/pkg/serverless/invocation_log_queue.go
Normal file
@ -0,0 +1,148 @@
|
||||
package serverless
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// invocationLogQueueSize bounds the number of pending invocation records held
|
||||
// off the reply critical path. Telemetry must never block or OOM the data
|
||||
// path: once this many records are queued, new records are DROPPED (counted)
|
||||
// rather than backing up onto the caller. 4096 is generous — at a sustained
|
||||
// drain rate of one cross-region Raft write per record, this absorbs multi-
|
||||
// second bursts before any drop occurs.
|
||||
const invocationLogQueueSize = 4096
|
||||
|
||||
// invocationLogWriteTimeout bounds a single record's write. The request
|
||||
// context that produced the record is already dead by the time the worker
|
||||
// drains it (Execute returned), so the worker uses its own context with this
|
||||
// per-record deadline instead.
|
||||
const invocationLogWriteTimeout = 10 * time.Second
|
||||
|
||||
// invocationLogFlushTimeout caps how long Close waits for the worker to drain
|
||||
// pending records at shutdown. Best-effort: losing telemetry at shutdown is
|
||||
// acceptable, so we never block the process from exiting.
|
||||
const invocationLogFlushTimeout = 5 * time.Second
|
||||
|
||||
// dropWarnInterval rate-limits the "queue full, dropping" WARN so a sustained
|
||||
// overload doesn't itself flood the logs.
|
||||
const dropWarnInterval = 30 * time.Second
|
||||
|
||||
// invocationLogQueue moves invocation telemetry OFF the reply critical path.
|
||||
//
|
||||
// Behavior note: records are now written ASYNCHRONOUSLY by a single worker
|
||||
// goroutine, so a function_invocations row may lag the response by up to the
|
||||
// queue drain time. That lag is acceptable for telemetry and is worth it — it
|
||||
// removes ~500ms-3s of cross-region Raft write latency from every serverless
|
||||
// RPC round-trip (bugboard feat-27). Each record's Logs are batched into a
|
||||
// single multi-row INSERT by the logger impls, so a handler that emits N log
|
||||
// lines no longer pays N sequential cross-region writes.
|
||||
type invocationLogQueue struct {
|
||||
logger *zap.Logger
|
||||
sink InvocationLogger
|
||||
|
||||
ch chan *InvocationRecord
|
||||
wg sync.WaitGroup
|
||||
|
||||
dropped atomic.Int64
|
||||
lastDropWarn atomic.Int64 // unix-nano of last drop warning emitted
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
// newInvocationLogQueue starts the single drain worker and returns the queue.
|
||||
// sink is the underlying logger whose Log method performs the actual DB write;
|
||||
// it is called with the worker's own context, never the request context.
|
||||
func newInvocationLogQueue(sink InvocationLogger, logger *zap.Logger) *invocationLogQueue {
|
||||
q := &invocationLogQueue{
|
||||
logger: logger,
|
||||
sink: sink,
|
||||
ch: make(chan *InvocationRecord, invocationLogQueueSize),
|
||||
}
|
||||
q.wg.Add(1)
|
||||
go q.run()
|
||||
return q
|
||||
}
|
||||
|
||||
// enqueue submits a record for asynchronous writing. It NEVER blocks: if the
|
||||
// bounded queue is full, the record is dropped and a counter incremented, with
|
||||
// a rate-limited WARN that reports the running drop count. Returns true if the
|
||||
// record was accepted, false if dropped.
|
||||
func (q *invocationLogQueue) enqueue(rec *InvocationRecord) bool {
|
||||
if rec == nil {
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case q.ch <- rec:
|
||||
return true
|
||||
default:
|
||||
dropped := q.dropped.Add(1)
|
||||
q.maybeWarnDrop(dropped)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// maybeWarnDrop emits a rate-limited WARN reporting the cumulative drop count.
|
||||
func (q *invocationLogQueue) maybeWarnDrop(dropped int64) {
|
||||
now := time.Now().UnixNano()
|
||||
last := q.lastDropWarn.Load()
|
||||
if now-last < int64(dropWarnInterval) {
|
||||
return
|
||||
}
|
||||
if !q.lastDropWarn.CompareAndSwap(last, now) {
|
||||
return
|
||||
}
|
||||
q.logger.Warn("invocation log queue full; dropping telemetry records",
|
||||
zap.Int64("dropped_total", dropped),
|
||||
zap.Int("queue_size", invocationLogQueueSize),
|
||||
)
|
||||
}
|
||||
|
||||
// run drains the queue, writing each record with the worker's own context and
|
||||
// a per-record timeout. It exits when the channel is closed and fully drained.
|
||||
func (q *invocationLogQueue) run() {
|
||||
defer q.wg.Done()
|
||||
for rec := range q.ch {
|
||||
q.write(rec)
|
||||
}
|
||||
}
|
||||
|
||||
// write performs a single record write with a bounded, request-independent
|
||||
// context. Failures are logged (never swallowed silently) but do not stop the
|
||||
// worker — telemetry loss must never cascade into the data path.
|
||||
func (q *invocationLogQueue) write(rec *InvocationRecord) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), invocationLogWriteTimeout)
|
||||
defer cancel()
|
||||
if err := q.sink.Log(ctx, rec); err != nil {
|
||||
q.logger.Warn("failed to write invocation telemetry record",
|
||||
zap.String("function_id", rec.FunctionID),
|
||||
zap.String("request_id", rec.RequestID),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops accepting new records and waits (bounded by
|
||||
// invocationLogFlushTimeout) for the worker to flush what's already queued.
|
||||
// Best-effort: if the worker doesn't finish in time, Close returns anyway so
|
||||
// shutdown is never blocked by telemetry. Safe to call multiple times.
|
||||
func (q *invocationLogQueue) Close() {
|
||||
q.closeOnce.Do(func() {
|
||||
close(q.ch)
|
||||
flushed := make(chan struct{})
|
||||
go func() {
|
||||
q.wg.Wait()
|
||||
close(flushed)
|
||||
}()
|
||||
select {
|
||||
case <-flushed:
|
||||
case <-time.After(invocationLogFlushTimeout):
|
||||
q.logger.Warn("invocation log queue flush timed out; dropping remaining telemetry",
|
||||
zap.Duration("timeout", invocationLogFlushTimeout),
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
153
core/pkg/serverless/invocation_log_queue_test.go
Normal file
153
core/pkg/serverless/invocation_log_queue_test.go
Normal file
@ -0,0 +1,153 @@
|
||||
package serverless
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// mockInvocationLogger is a thread-safe InvocationLogger that records every
|
||||
// record it receives. blockUntil, when non-nil, makes Log block until the
|
||||
// channel is closed — used to keep the worker busy and force the bounded
|
||||
// queue to fill.
|
||||
type mockInvocationLogger struct {
|
||||
mu sync.Mutex
|
||||
records []*InvocationRecord
|
||||
calls atomic.Int64
|
||||
blockUntil chan struct{}
|
||||
}
|
||||
|
||||
func (m *mockInvocationLogger) Log(ctx context.Context, inv *InvocationRecord) error {
|
||||
m.calls.Add(1)
|
||||
if m.blockUntil != nil {
|
||||
select {
|
||||
case <-m.blockUntil:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
m.mu.Lock()
|
||||
m.records = append(m.records, inv)
|
||||
m.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockInvocationLogger) count() int {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return len(m.records)
|
||||
}
|
||||
|
||||
// eventually polls cond up to timeout, failing the test if it never holds.
|
||||
// Avoids a fixed sleep — we wait only as long as needed.
|
||||
func eventually(t *testing.T, timeout time.Duration, cond func() bool) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
if cond() {
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
t.Fatalf("condition not met within %s", timeout)
|
||||
}
|
||||
|
||||
func TestInvocationLogQueue_enqueue_is_nonblocking_and_records_reach_logger(t *testing.T) {
|
||||
sink := &mockInvocationLogger{}
|
||||
q := newInvocationLogQueue(sink, zap.NewNop())
|
||||
defer q.Close()
|
||||
|
||||
rec := &InvocationRecord{ID: "inv-1", FunctionID: "fn-1", RequestID: "req-1"}
|
||||
if ok := q.enqueue(rec); !ok {
|
||||
t.Fatal("expected enqueue to accept the record")
|
||||
}
|
||||
|
||||
eventually(t, time.Second, func() bool { return sink.count() == 1 })
|
||||
|
||||
sink.mu.Lock()
|
||||
got := sink.records[0]
|
||||
sink.mu.Unlock()
|
||||
if got.ID != "inv-1" {
|
||||
t.Errorf("logger received wrong record: %+v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvocationLogQueue_enqueue_nil_is_noop(t *testing.T) {
|
||||
sink := &mockInvocationLogger{}
|
||||
q := newInvocationLogQueue(sink, zap.NewNop())
|
||||
defer q.Close()
|
||||
|
||||
if ok := q.enqueue(nil); ok {
|
||||
t.Fatal("expected nil record to be rejected")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvocationLogQueue_full_queue_drops_without_blocking_and_counts(t *testing.T) {
|
||||
// Hold the worker on the first record so the bounded channel fills, then
|
||||
// every further enqueue must drop (counted) without blocking.
|
||||
block := make(chan struct{})
|
||||
sink := &mockInvocationLogger{blockUntil: block}
|
||||
q := newInvocationLogQueue(sink, zap.NewNop())
|
||||
defer func() {
|
||||
close(block)
|
||||
q.Close()
|
||||
}()
|
||||
|
||||
// First record is pulled by the worker and blocks there. The next
|
||||
// invocationLogQueueSize records fill the channel buffer.
|
||||
for i := 0; i < invocationLogQueueSize+1; i++ {
|
||||
_ = q.enqueue(&InvocationRecord{ID: "fill"})
|
||||
}
|
||||
// Wait until the worker has actually taken the first record so the buffer
|
||||
// is guaranteed full before we assert drops.
|
||||
eventually(t, time.Second, func() bool { return sink.calls.Load() >= 1 })
|
||||
|
||||
// Now the channel is full; these must drop, and crucially must not block.
|
||||
const extra = 50
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for i := 0; i < extra; i++ {
|
||||
if q.enqueue(&InvocationRecord{ID: "overflow"}) {
|
||||
// Some may still squeak in if the worker drains; that's fine.
|
||||
}
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("enqueue blocked on a full queue")
|
||||
}
|
||||
|
||||
if q.dropped.Load() == 0 {
|
||||
t.Fatal("expected at least one dropped record to be counted")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvocationLogQueue_close_flushes_pending(t *testing.T) {
|
||||
sink := &mockInvocationLogger{}
|
||||
q := newInvocationLogQueue(sink, zap.NewNop())
|
||||
|
||||
const n = 100
|
||||
for i := 0; i < n; i++ {
|
||||
q.enqueue(&InvocationRecord{ID: "inv"})
|
||||
}
|
||||
|
||||
// Close must drain everything already queued before returning.
|
||||
q.Close()
|
||||
|
||||
if got := sink.count(); got != n {
|
||||
t.Fatalf("expected Close to flush all %d records, got %d", n, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvocationLogQueue_close_is_idempotent(t *testing.T) {
|
||||
sink := &mockInvocationLogger{}
|
||||
q := newInvocationLogQueue(sink, zap.NewNop())
|
||||
q.Close()
|
||||
q.Close() // must not panic on double close
|
||||
}
|
||||
@ -41,12 +41,24 @@ func NewLogBuffer() *LogBuffer {
|
||||
return &LogBuffer{}
|
||||
}
|
||||
|
||||
// Append adds one log entry. Thread-safe — wazero modules aren't
|
||||
// goroutine-safe in practice, but the lock makes the invariant explicit
|
||||
// rather than relying on call-site discipline.
|
||||
// maxLogEntriesPerInvocation caps how many log lines one invocation can
|
||||
// buffer. Telemetry is best-effort; without a cap a tenant function looping
|
||||
// oh.LogInfo could balloon gateway memory — amplified now that records sit
|
||||
// in the async invocation-log queue (up to invocationLogQueueSize records
|
||||
// resident) instead of being written and freed synchronously.
|
||||
const maxLogEntriesPerInvocation = 1000
|
||||
|
||||
// Append adds one log entry, dropping silently once the per-invocation cap
|
||||
// is reached (telemetry best-effort; bounds memory against log floods).
|
||||
// Thread-safe — wazero modules aren't goroutine-safe in practice, but the
|
||||
// lock makes the invariant explicit rather than relying on call-site
|
||||
// discipline.
|
||||
func (b *LogBuffer) Append(entry LogEntry) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if len(b.entries) >= maxLogEntriesPerInvocation {
|
||||
return
|
||||
}
|
||||
b.entries = append(b.entries, entry)
|
||||
}
|
||||
|
||||
|
||||
24
core/pkg/serverless/log_buffer_cap_test.go
Normal file
24
core/pkg/serverless/log_buffer_cap_test.go
Normal file
@ -0,0 +1,24 @@
|
||||
package serverless
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Security hardening (feat-27 async-logging review): one invocation cannot
|
||||
// buffer unbounded log lines — the cap bounds gateway memory while records
|
||||
// sit in the async invocation-log queue.
|
||||
func TestLogBuffer_capsEntriesPerInvocation(t *testing.T) {
|
||||
b := NewLogBuffer()
|
||||
for i := 0; i < maxLogEntriesPerInvocation+500; i++ {
|
||||
b.Append(LogEntry{Level: "info", Message: fmt.Sprintf("line %d", i)})
|
||||
}
|
||||
if got := b.Len(); got != maxLogEntriesPerInvocation {
|
||||
t.Errorf("Len() = %d; want cap %d (excess lines must be dropped, not buffered)", got, maxLogEntriesPerInvocation)
|
||||
}
|
||||
// First entries are kept (drop-newest semantics).
|
||||
snap := b.Snapshot()
|
||||
if snap[0].Message != "line 0" {
|
||||
t.Errorf("first entry = %q; want \"line 0\" (cap drops newest, keeps earliest)", snap[0].Message)
|
||||
}
|
||||
}
|
||||
@ -73,9 +73,12 @@ func TestLogBufferFromCtx_nilCtxIsSafe(t *testing.T) {
|
||||
// the race detector would flag this.
|
||||
func TestLogBuffer_concurrentAppendIsSafe(t *testing.T) {
|
||||
b := NewLogBuffer()
|
||||
// Keep total below maxLogEntriesPerInvocation — this test pins
|
||||
// race-safety (no lost writes), not the cap (covered separately in
|
||||
// log_buffer_cap_test.go).
|
||||
const (
|
||||
writers = 16
|
||||
writesPerW = 100
|
||||
writesPerW = 50
|
||||
)
|
||||
var wg sync.WaitGroup
|
||||
for w := 0; w < writers; w++ {
|
||||
|
||||
@ -409,28 +409,62 @@ func (r *Registry) Log(ctx context.Context, inv *InvocationRecord) error {
|
||||
return fmt.Errorf("failed to insert invocation record: %w", err)
|
||||
}
|
||||
|
||||
// Insert logs if any
|
||||
if len(inv.Logs) > 0 {
|
||||
for _, entry := range inv.Logs {
|
||||
logID := uuid.New().String()
|
||||
logQuery := `
|
||||
INSERT INTO function_logs (
|
||||
id, function_id, invocation_id, level, message, timestamp
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
`
|
||||
_, err := r.db.Exec(ctx, logQuery,
|
||||
logID, inv.FunctionID, inv.ID, entry.Level, entry.Message, entry.Timestamp,
|
||||
)
|
||||
if err != nil {
|
||||
r.logger.Warn("Failed to insert function log", zap.Error(err))
|
||||
// Continue with other logs
|
||||
}
|
||||
// Insert logs in batched multi-row INSERTs rather than one Exec per line.
|
||||
// Pre-fix this loop paid one cross-region Raft write PER log line (N+1):
|
||||
// a handler emitting 5 lines cost 6 sequential writes. Now a record's
|
||||
// lines collapse into ceil(N/maxLogRowsPerInsert) writes (bugboard feat-27).
|
||||
for _, chunk := range chunkLogEntries(inv.Logs, maxLogRowsPerInsert) {
|
||||
query, args := buildFunctionLogsInsert(inv.FunctionID, inv.ID, chunk)
|
||||
if _, err := r.db.Exec(ctx, query, args...); err != nil {
|
||||
r.logger.Warn("Failed to insert function logs batch", zap.Error(err))
|
||||
// Continue with remaining chunks — telemetry is best-effort.
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// maxLogRowsPerInsert caps how many function_logs rows go into a single
|
||||
// multi-row INSERT statement. Keeps any one statement bounded (placeholder
|
||||
// count, statement size) while still collapsing the per-line N+1 into a
|
||||
// handful of writes for the common case.
|
||||
const maxLogRowsPerInsert = 100
|
||||
|
||||
// chunkLogEntries splits entries into slices of at most size. Returns no
|
||||
// chunks for an empty input.
|
||||
func chunkLogEntries(entries []LogEntry, size int) [][]LogEntry {
|
||||
if len(entries) == 0 {
|
||||
return nil
|
||||
}
|
||||
var chunks [][]LogEntry
|
||||
for i := 0; i < len(entries); i += size {
|
||||
end := i + size
|
||||
if end > len(entries) {
|
||||
end = len(entries)
|
||||
}
|
||||
chunks = append(chunks, entries[i:end])
|
||||
}
|
||||
return chunks
|
||||
}
|
||||
|
||||
// buildFunctionLogsInsert constructs a single multi-row INSERT for the given
|
||||
// log entries: one VALUES tuple per entry, args flattened in column order
|
||||
// (id, function_id, invocation_id, level, message, timestamp). Each row gets a
|
||||
// fresh UUID id, matching the per-row behavior of the old loop.
|
||||
func buildFunctionLogsInsert(functionID, invocationID string, entries []LogEntry) (string, []interface{}) {
|
||||
var sb strings.Builder
|
||||
sb.WriteString("INSERT INTO function_logs (id, function_id, invocation_id, level, message, timestamp) VALUES ")
|
||||
args := make([]interface{}, 0, len(entries)*6)
|
||||
for i, entry := range entries {
|
||||
if i > 0 {
|
||||
sb.WriteString(", ")
|
||||
}
|
||||
sb.WriteString("(?, ?, ?, ?, ?, ?)")
|
||||
args = append(args, uuid.New().String(), functionID, invocationID, entry.Level, entry.Message, entry.Timestamp)
|
||||
}
|
||||
return sb.String(), args
|
||||
}
|
||||
|
||||
// GetLogs retrieves logs for a function.
|
||||
func (r *Registry) GetLogs(ctx context.Context, namespace, name string, limit int) ([]LogEntry, error) {
|
||||
if limit <= 0 {
|
||||
|
||||
@ -47,26 +47,61 @@ func (l *InvocationLogger) Log(ctx context.Context, inv *InvocationRecordData) e
|
||||
return fmt.Errorf("failed to insert invocation record: %w", err)
|
||||
}
|
||||
|
||||
if len(inv.Logs) > 0 {
|
||||
for _, entry := range inv.Logs {
|
||||
logID := uuid.New().String()
|
||||
logQuery := `
|
||||
INSERT INTO function_logs (
|
||||
id, function_id, invocation_id, level, message, timestamp
|
||||
) VALUES (?, ?, ?, ?, ?, ?)
|
||||
`
|
||||
_, err := l.db.Exec(ctx, logQuery,
|
||||
logID, inv.FunctionID, inv.ID, entry.Level, entry.Message, entry.Timestamp,
|
||||
)
|
||||
if err != nil {
|
||||
l.logger.Warn("Failed to insert function log", zap.Error(err))
|
||||
}
|
||||
// Insert logs in batched multi-row INSERTs rather than one Exec per line.
|
||||
// Pre-fix this loop paid one cross-region Raft write PER log line (N+1).
|
||||
// Now a record's lines collapse into ceil(N/maxLogRowsPerInsert) writes
|
||||
// (bugboard feat-27).
|
||||
for _, chunk := range chunkLogData(inv.Logs, maxLogRowsPerInsert) {
|
||||
query, args := buildFunctionLogsInsert(inv.FunctionID, inv.ID, chunk)
|
||||
if _, err := l.db.Exec(ctx, query, args...); err != nil {
|
||||
l.logger.Warn("Failed to insert function logs batch", zap.Error(err))
|
||||
// Continue with remaining chunks — telemetry is best-effort.
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// maxLogRowsPerInsert caps how many function_logs rows go into a single
|
||||
// multi-row INSERT statement, bounding placeholder count and statement size
|
||||
// while still collapsing the per-line N+1 into a handful of writes.
|
||||
const maxLogRowsPerInsert = 100
|
||||
|
||||
// chunkLogData splits entries into slices of at most size. Returns no chunks
|
||||
// for an empty input.
|
||||
func chunkLogData(entries []LogData, size int) [][]LogData {
|
||||
if len(entries) == 0 {
|
||||
return nil
|
||||
}
|
||||
var chunks [][]LogData
|
||||
for i := 0; i < len(entries); i += size {
|
||||
end := i + size
|
||||
if end > len(entries) {
|
||||
end = len(entries)
|
||||
}
|
||||
chunks = append(chunks, entries[i:end])
|
||||
}
|
||||
return chunks
|
||||
}
|
||||
|
||||
// buildFunctionLogsInsert constructs a single multi-row INSERT for the given
|
||||
// log entries: one VALUES tuple per entry, args flattened in column order
|
||||
// (id, function_id, invocation_id, level, message, timestamp). Each row gets a
|
||||
// fresh UUID id, matching the per-row behavior of the old loop.
|
||||
func buildFunctionLogsInsert(functionID, invocationID string, entries []LogData) (string, []interface{}) {
|
||||
var sb strings.Builder
|
||||
sb.WriteString("INSERT INTO function_logs (id, function_id, invocation_id, level, message, timestamp) VALUES ")
|
||||
args := make([]interface{}, 0, len(entries)*6)
|
||||
for i, entry := range entries {
|
||||
if i > 0 {
|
||||
sb.WriteString(", ")
|
||||
}
|
||||
sb.WriteString("(?, ?, ?, ?, ?, ?)")
|
||||
args = append(args, uuid.New().String(), functionID, invocationID, entry.Level, entry.Message, entry.Timestamp)
|
||||
}
|
||||
return sb.String(), args
|
||||
}
|
||||
|
||||
// GetLogs retrieves WASM-emitted log entries for a function (rows in
|
||||
// function_logs). Functions that don't call log_info / log_error from
|
||||
// their WASM code will return an empty slice here — that's expected.
|
||||
@ -235,4 +270,3 @@ func (l *InvocationLogger) fetchLogsForInvocations(ctx context.Context, invocati
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
|
||||
126
core/pkg/serverless/registry/invocation_logger_batch_test.go
Normal file
126
core/pkg/serverless/registry/invocation_logger_batch_test.go
Normal file
@ -0,0 +1,126 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// recordingExecDB records Exec calls. It embeds rqlite.Client so only Exec is
|
||||
// implemented — Log must not call any other method.
|
||||
type recordingExecDB struct {
|
||||
rqlite.Client
|
||||
mu sync.Mutex
|
||||
execs []recordedExec
|
||||
}
|
||||
|
||||
type recordedExec struct {
|
||||
query string
|
||||
args []interface{}
|
||||
}
|
||||
|
||||
func (d *recordingExecDB) Exec(_ context.Context, query string, args ...any) (sql.Result, error) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
d.execs = append(d.execs, recordedExec{query: query, args: args})
|
||||
return recordingResult{}, nil
|
||||
}
|
||||
|
||||
type recordingResult struct{}
|
||||
|
||||
func (recordingResult) LastInsertId() (int64, error) { return 0, nil }
|
||||
func (recordingResult) RowsAffected() (int64, error) { return 1, nil }
|
||||
|
||||
func TestBuildFunctionLogsInsert_shape(t *testing.T) {
|
||||
ts := time.Unix(1700000000, 0).UTC()
|
||||
entries := []LogData{
|
||||
{Level: "info", Message: "a", Timestamp: ts},
|
||||
{Level: "error", Message: "b", Timestamp: ts},
|
||||
}
|
||||
query, args := buildFunctionLogsInsert("fn-1", "inv-1", entries)
|
||||
|
||||
wantPrefix := "INSERT INTO function_logs (id, function_id, invocation_id, level, message, timestamp) VALUES "
|
||||
if !strings.HasPrefix(query, wantPrefix) {
|
||||
t.Fatalf("unexpected query prefix: %q", query)
|
||||
}
|
||||
if got, want := strings.Count(query, "(?, ?, ?, ?, ?, ?)"), 2; got != want {
|
||||
t.Errorf("expected %d value tuples, got %d", want, got)
|
||||
}
|
||||
if got, want := len(args), 12; got != want {
|
||||
t.Fatalf("expected %d args, got %d", want, got)
|
||||
}
|
||||
if args[1] != "fn-1" || args[2] != "inv-1" || args[3] != "info" || args[4] != "a" || args[5] != ts {
|
||||
t.Errorf("row 0 args wrong: %#v", args[0:6])
|
||||
}
|
||||
if args[9] != "error" || args[10] != "b" {
|
||||
t.Errorf("row 1 args wrong: %#v", args[6:12])
|
||||
}
|
||||
}
|
||||
|
||||
func TestChunkLogData(t *testing.T) {
|
||||
if got := chunkLogData(nil, 100); got != nil {
|
||||
t.Errorf("expected nil for empty input, got %v", got)
|
||||
}
|
||||
entries := make([]LogData, 250)
|
||||
chunks := chunkLogData(entries, 100)
|
||||
if len(chunks) != 3 {
|
||||
t.Fatalf("expected 3 chunks, got %d", len(chunks))
|
||||
}
|
||||
if len(chunks[2]) != 50 {
|
||||
t.Errorf("expected last chunk of 50, got %d", len(chunks[2]))
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvocationLoggerLog_batches_logs(t *testing.T) {
|
||||
db := &recordingExecDB{}
|
||||
il := NewInvocationLogger(db, zap.NewNop())
|
||||
|
||||
logs := make([]LogData, 5)
|
||||
for i := range logs {
|
||||
logs[i] = LogData{Level: "info", Message: "x", Timestamp: time.Now()}
|
||||
}
|
||||
inv := &InvocationRecordData{ID: "inv-1", FunctionID: "fn-1", Logs: logs}
|
||||
|
||||
if err := il.Log(context.Background(), inv); err != nil {
|
||||
t.Fatalf("Log returned error: %v", err)
|
||||
}
|
||||
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
if len(db.execs) != 2 {
|
||||
t.Fatalf("expected 2 Exec calls (invocation + 1 batched logs), got %d", len(db.execs))
|
||||
}
|
||||
if !strings.Contains(db.execs[1].query, "INSERT INTO function_logs") {
|
||||
t.Errorf("second Exec should be batched logs, got %q", db.execs[1].query)
|
||||
}
|
||||
if got := strings.Count(db.execs[1].query, "(?, ?, ?, ?, ?, ?)"); got != 5 {
|
||||
t.Errorf("expected 5 value tuples, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvocationLoggerLog_chunks_over_cap(t *testing.T) {
|
||||
db := &recordingExecDB{}
|
||||
il := NewInvocationLogger(db, zap.NewNop())
|
||||
|
||||
logs := make([]LogData, maxLogRowsPerInsert+1)
|
||||
for i := range logs {
|
||||
logs[i] = LogData{Level: "info", Message: "x", Timestamp: time.Now()}
|
||||
}
|
||||
inv := &InvocationRecordData{ID: "inv-1", FunctionID: "fn-1", Logs: logs}
|
||||
|
||||
if err := il.Log(context.Background(), inv); err != nil {
|
||||
t.Fatalf("Log returned error: %v", err)
|
||||
}
|
||||
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
if len(db.execs) != 3 {
|
||||
t.Fatalf("expected 3 Exec calls (invocation + 2 chunked logs), got %d", len(db.execs))
|
||||
}
|
||||
}
|
||||
154
core/pkg/serverless/registry_log_batch_test.go
Normal file
154
core/pkg/serverless/registry_log_batch_test.go
Normal file
@ -0,0 +1,154 @@
|
||||
package serverless
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// recordingExecClient is an rqlite.Client that records every Exec call. It
|
||||
// embeds the interface so we only override Exec; calling any other method is a
|
||||
// test bug (will nil-panic), which is what we want — Log must only Exec.
|
||||
type recordingExecClient struct {
|
||||
rqlite.Client
|
||||
mu sync.Mutex
|
||||
execs []recordedExec
|
||||
}
|
||||
|
||||
type recordedExec struct {
|
||||
query string
|
||||
args []interface{}
|
||||
}
|
||||
|
||||
func (c *recordingExecClient) Exec(_ context.Context, query string, args ...any) (sql.Result, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.execs = append(c.execs, recordedExec{query: query, args: args})
|
||||
return &recordingResult{}, nil
|
||||
}
|
||||
|
||||
type recordingResult struct{}
|
||||
|
||||
func (recordingResult) LastInsertId() (int64, error) { return 0, nil }
|
||||
func (recordingResult) RowsAffected() (int64, error) { return 1, nil }
|
||||
|
||||
func TestBuildFunctionLogsInsert_multi_row_shape(t *testing.T) {
|
||||
ts := time.Unix(1700000000, 0).UTC()
|
||||
entries := []LogEntry{
|
||||
{Level: "info", Message: "a", Timestamp: ts},
|
||||
{Level: "error", Message: "b", Timestamp: ts},
|
||||
}
|
||||
query, args := buildFunctionLogsInsert("fn-1", "inv-1", entries)
|
||||
|
||||
wantPrefix := "INSERT INTO function_logs (id, function_id, invocation_id, level, message, timestamp) VALUES "
|
||||
if !strings.HasPrefix(query, wantPrefix) {
|
||||
t.Fatalf("unexpected query prefix: %q", query)
|
||||
}
|
||||
if got, want := strings.Count(query, "(?, ?, ?, ?, ?, ?)"), 2; got != want {
|
||||
t.Errorf("expected %d value tuples, got %d in %q", want, got, query)
|
||||
}
|
||||
if got, want := len(args), 2*6; got != want {
|
||||
t.Fatalf("expected %d args, got %d", want, got)
|
||||
}
|
||||
// Row 0: id (generated), function_id, invocation_id, level, message, timestamp.
|
||||
if args[1] != "fn-1" || args[2] != "inv-1" || args[3] != "info" || args[4] != "a" || args[5] != ts {
|
||||
t.Errorf("row 0 args wrong: %#v", args[0:6])
|
||||
}
|
||||
if args[7] != "fn-1" || args[8] != "inv-1" || args[9] != "error" || args[10] != "b" || args[11] != ts {
|
||||
t.Errorf("row 1 args wrong: %#v", args[6:12])
|
||||
}
|
||||
// Generated IDs must be present and distinct.
|
||||
if args[0] == "" || args[6] == "" || args[0] == args[6] {
|
||||
t.Errorf("expected distinct non-empty generated IDs, got %v and %v", args[0], args[6])
|
||||
}
|
||||
}
|
||||
|
||||
func TestChunkLogEntries(t *testing.T) {
|
||||
if got := chunkLogEntries(nil, 100); got != nil {
|
||||
t.Errorf("expected nil for empty input, got %v", got)
|
||||
}
|
||||
entries := make([]LogEntry, 250)
|
||||
chunks := chunkLogEntries(entries, 100)
|
||||
if len(chunks) != 3 {
|
||||
t.Fatalf("expected ceil(250/100)=3 chunks, got %d", len(chunks))
|
||||
}
|
||||
if len(chunks[0]) != 100 || len(chunks[1]) != 100 || len(chunks[2]) != 50 {
|
||||
t.Errorf("unexpected chunk sizes: %d %d %d", len(chunks[0]), len(chunks[1]), len(chunks[2]))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryLog_batches_logs_into_ceil_div_exec_calls(t *testing.T) {
|
||||
db := &recordingExecClient{}
|
||||
r := NewRegistry(db, nil, RegistryConfig{}, zap.NewNop())
|
||||
|
||||
// 5 log lines should collapse to: 1 invocation INSERT + 1 logs INSERT = 2 Execs.
|
||||
logs := make([]LogEntry, 5)
|
||||
for i := range logs {
|
||||
logs[i] = LogEntry{Level: "info", Message: "x", Timestamp: time.Now()}
|
||||
}
|
||||
inv := &InvocationRecord{ID: "inv-1", FunctionID: "fn-1", Logs: logs}
|
||||
|
||||
if err := r.Log(context.Background(), inv); err != nil {
|
||||
t.Fatalf("Log returned error: %v", err)
|
||||
}
|
||||
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
if len(db.execs) != 2 {
|
||||
t.Fatalf("expected 2 Exec calls (1 invocation + 1 batched logs), got %d", len(db.execs))
|
||||
}
|
||||
if !strings.HasPrefix(db.execs[0].query, "\n\t\tINSERT INTO function_invocations") &&
|
||||
!strings.Contains(db.execs[0].query, "function_invocations") {
|
||||
t.Errorf("first Exec should be the invocation insert, got %q", db.execs[0].query)
|
||||
}
|
||||
if !strings.Contains(db.execs[1].query, "INSERT INTO function_logs") {
|
||||
t.Errorf("second Exec should be the batched logs insert, got %q", db.execs[1].query)
|
||||
}
|
||||
if got := strings.Count(db.execs[1].query, "(?, ?, ?, ?, ?, ?)"); got != 5 {
|
||||
t.Errorf("expected 5 value tuples in the batched logs insert, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryLog_chunks_logs_over_cap(t *testing.T) {
|
||||
db := &recordingExecClient{}
|
||||
r := NewRegistry(db, nil, RegistryConfig{}, zap.NewNop())
|
||||
|
||||
// maxLogRowsPerInsert+1 lines => ceil((cap+1)/cap)=2 logs INSERTs, plus
|
||||
// the single invocation INSERT = 3 Execs total.
|
||||
n := maxLogRowsPerInsert + 1
|
||||
logs := make([]LogEntry, n)
|
||||
for i := range logs {
|
||||
logs[i] = LogEntry{Level: "info", Message: "x", Timestamp: time.Now()}
|
||||
}
|
||||
inv := &InvocationRecord{ID: "inv-1", FunctionID: "fn-1", Logs: logs}
|
||||
|
||||
if err := r.Log(context.Background(), inv); err != nil {
|
||||
t.Fatalf("Log returned error: %v", err)
|
||||
}
|
||||
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
if len(db.execs) != 3 {
|
||||
t.Fatalf("expected 3 Exec calls (1 invocation + 2 chunked logs), got %d", len(db.execs))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryLog_no_logs_single_exec(t *testing.T) {
|
||||
db := &recordingExecClient{}
|
||||
r := NewRegistry(db, nil, RegistryConfig{}, zap.NewNop())
|
||||
|
||||
if err := r.Log(context.Background(), &InvocationRecord{ID: "inv-1", FunctionID: "fn-1"}); err != nil {
|
||||
t.Fatalf("Log returned error: %v", err)
|
||||
}
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
if len(db.execs) != 1 {
|
||||
t.Fatalf("expected only the invocation Exec, got %d", len(db.execs))
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user