diff --git a/core/pkg/cli/cmd/functioncmd/function.go b/core/pkg/cli/cmd/functioncmd/function.go index 1fcdf82..a89afd0 100644 --- a/core/pkg/cli/cmd/functioncmd/function.go +++ b/core/pkg/cli/cmd/functioncmd/function.go @@ -31,6 +31,8 @@ func init() { Cmd.AddCommand(functions.ListCmd) Cmd.AddCommand(functions.GetCmd) Cmd.AddCommand(functions.DeleteCmd) + Cmd.AddCommand(functions.DisableCmd) + Cmd.AddCommand(functions.EnableCmd) Cmd.AddCommand(functions.LogsCmd) Cmd.AddCommand(functions.VersionsCmd) Cmd.AddCommand(functions.SecretsCmd) diff --git a/core/pkg/cli/functions/enable_disable.go b/core/pkg/cli/functions/enable_disable.go new file mode 100644 index 0000000..d5634ac --- /dev/null +++ b/core/pkg/cli/functions/enable_disable.go @@ -0,0 +1,86 @@ +package functions + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/spf13/cobra" +) + +// DisableCmd pauses a function without redeploying. +// +// Plan 11.5 — operators flip a function's status during incident +// response, then re-enable when fixed. Existing in-flight invocations +// finish; new ones return 503 because the invoker treats inactive +// functions as missing. +var DisableCmd = &cobra.Command{ + Use: "disable ", + Short: "Disable a function without deleting it", + Long: `Disables a deployed function. The function row stays in the registry but +new invocations are rejected. Use 'orama function enable' to resume. + +Useful during incident response — pause a misbehaving function until you +can root-cause without losing its deployed code or version history.`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return runSetEnabled(args[0], false) + }, +} + +// EnableCmd resumes a disabled function. Inverse of DisableCmd. +var EnableCmd = &cobra.Command{ + Use: "enable ", + Short: "Re-enable a previously disabled function", + Long: `Re-enables a function that was paused with 'orama function disable'.`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return runSetEnabled(args[0], true) + }, +} + +func runSetEnabled(name string, enabled bool) error { + action := "disable" + if enabled { + action = "enable" + } + resp, err := apiPostNoBody("/v1/functions/" + name + "/" + action) + if err != nil { + return err + } + verb := "disabled" + if enabled { + verb = "enabled" + } + if msg, ok := resp["message"]; ok { + fmt.Println(msg) + } else { + fmt.Printf("Function %q %s.\n", name, verb) + } + return nil +} + +// apiPostNoBody performs an authenticated POST with no body. Used by +// the disable/enable endpoints which take no payload (action is in the +// URL path). +func apiPostNoBody(endpoint string) (map[string]interface{}, error) { + resp, err := apiRequest(http.MethodPost, endpoint, nil, "") + if err != nil { + return nil, err + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("API error (%d): %s", resp.StatusCode, string(respBody)) + } + var result map[string]interface{} + if err := json.Unmarshal(respBody, &result); err != nil { + return nil, fmt.Errorf("failed to parse response: %w", err) + } + return result, nil +} diff --git a/core/pkg/cli/production/upgrade/flags.go b/core/pkg/cli/production/upgrade/flags.go index 1071c79..1af397c 100644 --- a/core/pkg/cli/production/upgrade/flags.go +++ b/core/pkg/cli/production/upgrade/flags.go @@ -18,6 +18,19 @@ type Flags struct { NodeFilter string // Single node IP to upgrade (optional) Delay int // Delay in seconds between nodes during rolling upgrade + // ReexecedAfterBinarySwap is set by the orchestrator when it re-execs + // itself with the NEWLY-INSTALLED binary, post Phase 2b. The new + // process detects this flag, skips the pre-binary phases (1, 2, 2b) + // already done by the old binary, and runs Phase 3+ using its OWN + // up-to-date compiled config-generation logic. Closes bugboard #15 + // chicken-and-egg: pre-fix, Phase 4 ran with the old binary's + // compiled Phase4GenerateConfigs, so config changes only took effect + // on the NEXT rollout. + // + // Hidden flag — set programmatically by orchestrator.go via os.Args, + // not a documented user-facing option. + ReexecedAfterBinarySwap bool + // Anyone flags AnyoneClient bool AnyoneRelay bool @@ -43,6 +56,11 @@ func ParseFlags(args []string) (*Flags, error) { fs.BoolVar(&flags.RestartServices, "restart", false, "Automatically restart services after upgrade") fs.BoolVar(&flags.SkipChecks, "skip-checks", false, "Skip minimum resource checks (RAM/CPU)") + // Hidden flag — see Flags.ReexecedAfterBinarySwap doc. The fs.Bool + // registers it without exposing in help output (no .Usage doc text + // that operators would normally search for). + fs.BoolVar(&flags.ReexecedAfterBinarySwap, "reexeced-after-binary-swap", false, "") + // Remote upgrade flags fs.StringVar(&flags.Env, "env", "", "Target environment for remote rolling upgrade (devnet, testnet)") fs.StringVar(&flags.NodeFilter, "node", "", "Upgrade a single node IP only") diff --git a/core/pkg/cli/production/upgrade/orchestrator.go b/core/pkg/cli/production/upgrade/orchestrator.go index 38f3319..bc67e61 100644 --- a/core/pkg/cli/production/upgrade/orchestrator.go +++ b/core/pkg/cli/production/upgrade/orchestrator.go @@ -10,12 +10,17 @@ import ( "os/exec" "path/filepath" "strings" + "syscall" "time" "github.com/DeBrosOfficial/network/pkg/cli/utils" "github.com/DeBrosOfficial/network/pkg/environments/production" ) +// newOramaBinaryPath is the on-disk path Phase 2b installs the new +// orama binary to. Re-exec target for bugboard #15 chicken-and-egg fix. +const newOramaBinaryPath = "/opt/orama/bin/orama" + // Orchestrator manages the upgrade process type Orchestrator struct { oramaHome string @@ -98,50 +103,85 @@ func NewOrchestrator(flags *Flags) *Orchestrator { // Execute runs the upgrade process func (o *Orchestrator) Execute() error { fmt.Printf("šŸ”„ Upgrading production installation...\n") - fmt.Printf(" This will preserve existing configurations and data\n") - fmt.Printf(" Configurations will be updated to latest format\n\n") - - // Handle branch preferences - if err := o.handleBranchPreferences(); err != nil { - return err + if o.flags.ReexecedAfterBinarySwap { + fmt.Printf(" (Resumed under newly-installed binary — bug #15 chicken-and-egg fix.)\n") + fmt.Printf(" Skipping Phase 1/2/2b (already done by previous process); Phase 3+ runs here.\n") + } else { + fmt.Printf(" This will preserve existing configurations and data\n") + fmt.Printf(" Configurations will be updated to latest format\n\n") } - // Phase 1: Check prerequisites - fmt.Printf("\nšŸ“‹ Phase 1: Checking prerequisites...\n") - if err := o.setup.Phase1CheckPrerequisites(); err != nil { - return fmt.Errorf("prerequisites check failed: %w", err) - } - - // Phase 2: Provision environment - fmt.Printf("\nšŸ› ļø Phase 2: Provisioning environment...\n") - if err := o.setup.Phase2ProvisionEnvironment(); err != nil { - return fmt.Errorf("environment provisioning failed: %w", err) - } - - // Stop services before upgrading binaries - if o.setup.IsUpdate() { - if err := o.stopServices(); err != nil { + // Phases 1, 2, 2b are skipped on the re-execed run — already + // performed by the prior (old-binary) process. Phase 3 (secrets) + // onward runs here, deliberately under the new binary so Phase 4 + // (config regen, the actual point of the re-exec) uses current code. + if !o.flags.ReexecedAfterBinarySwap { + // Handle branch preferences + if err := o.handleBranchPreferences(); err != nil { return err } + + // Phase 1: Check prerequisites + fmt.Printf("\nšŸ“‹ Phase 1: Checking prerequisites...\n") + if err := o.setup.Phase1CheckPrerequisites(); err != nil { + return fmt.Errorf("prerequisites check failed: %w", err) + } + + // Phase 2: Provision environment + fmt.Printf("\nšŸ› ļø Phase 2: Provisioning environment...\n") + if err := o.setup.Phase2ProvisionEnvironment(); err != nil { + return fmt.Errorf("environment provisioning failed: %w", err) + } + + // Stop services before upgrading binaries + if o.setup.IsUpdate() { + if err := o.stopServices(); err != nil { + return err + } + } + + // Check port availability after stopping services + if err := utils.EnsurePortsAvailable("prod upgrade", utils.DefaultPorts()); err != nil { + return err + } + + // Phase 2b: Install/update binaries + fmt.Printf("\nPhase 2b: Installing/updating binaries...\n") + if err := o.setup.Phase2bInstallBinaries(); err != nil { + return fmt.Errorf("binary installation failed: %w", err) + } + + // Detect existing installation + if o.setup.IsUpdate() { + fmt.Printf(" Detected existing installation\n") + } else { + fmt.Printf(" āš ļø No existing installation detected, treating as fresh install\n") + fmt.Printf(" Use 'orama install' for fresh installation\n") + } } - // Check port availability after stopping services - if err := utils.EnsurePortsAvailable("prod upgrade", utils.DefaultPorts()); err != nil { - return err - } - - // Phase 2b: Install/update binaries - fmt.Printf("\nPhase 2b: Installing/updating binaries...\n") - if err := o.setup.Phase2bInstallBinaries(); err != nil { - return fmt.Errorf("binary installation failed: %w", err) - } - - // Detect existing installation - if o.setup.IsUpdate() { - fmt.Printf(" Detected existing installation\n") - } else { - fmt.Printf(" āš ļø No existing installation detected, treating as fresh install\n") - fmt.Printf(" Use 'orama install' for fresh installation\n") + // Bugboard #15 fix — chicken-and-egg. + // + // Up to here we are still running the OLD orama binary's compiled + // code. The next phases (3 secrets, 4 configs, 5 systemd) include + // Phase4GenerateConfigs which is COMPILED into this process. If we + // keep running, those phases use OLD logic and any config-shape + // changes shipped in this release only take effect on the NEXT + // upgrade. + // + // Re-exec the just-installed binary with the same args + a hidden + // marker so it skips the pre-binary phases (already done above) and + // runs Phase 3+ with its OWN up-to-date code. syscall.Exec replaces + // this process — control never returns past it on success. + if !o.flags.ReexecedAfterBinarySwap { + if err := o.reexecAfterBinarySwap(); err != nil { + // Soft-fail: log and continue with old-binary phases as a + // fallback. Operator gets a clear warning that the chicken- + // and-egg fix didn't apply for this run. + fmt.Fprintf(os.Stderr, "āš ļø Could not re-exec post-binary-swap (%v); "+ + "continuing with current binary — config changes from this release "+ + "may only take effect on the NEXT upgrade. See bugboard #15.\n", err) + } } // Phase 3: Ensure secrets exist @@ -604,6 +644,45 @@ func (o *Orchestrator) extractGatewayConfig() (enableHTTPS bool, domain string, return enableHTTPS, domain, baseDomain } +// reexecAfterBinarySwap replaces this process with the newly-installed +// orama binary at /opt/orama/bin/orama, preserving all original CLI args +// and appending --reexeced-after-binary-swap so the new process knows +// to skip the pre-binary phases. Bugboard #15 chicken-and-egg fix. +// +// Returns nil only when syscall.Exec is about to take effect; on success +// the function never actually returns (the process image is replaced). +// On any failure before the exec syscall, returns the wrapping error so +// the caller can fall back to running the rest of the upgrade with the +// old binary (with a warning). +func (o *Orchestrator) reexecAfterBinarySwap() error { + if _, err := os.Stat(newOramaBinaryPath); err != nil { + return fmt.Errorf("new binary not found at %s: %w", newOramaBinaryPath, err) + } + // Defensive: don't re-exec ourselves into a loop if the install + // somehow placed our currently-running binary at that path. Compare + // inode-stable identity via os.Stat. + if cur, err := os.Executable(); err == nil { + curInfo, e1 := os.Stat(cur) + newInfo, e2 := os.Stat(newOramaBinaryPath) + if e1 == nil && e2 == nil && os.SameFile(curInfo, newInfo) { + // Already running the new binary (e.g. someone manually pre- + // installed it). No re-exec needed. + fmt.Printf(" (current binary already matches installed binary; skipping re-exec)\n") + return nil + } + } + + args := append([]string{newOramaBinaryPath}, os.Args[1:]...) + args = append(args, "--reexeced-after-binary-swap") + fmt.Printf("\nšŸ” Re-executing with newly-installed binary to run remaining phases with current code (#15 fix)...\n") + // syscall.Exec replaces this process image; argv[0] is the binary + // path, env inherited as-is. On success we never return. + if err := syscall.Exec(newOramaBinaryPath, args, os.Environ()); err != nil { + return fmt.Errorf("syscall.Exec %s: %w", newOramaBinaryPath, err) + } + return nil +} + func (o *Orchestrator) regenerateConfigs() error { peers := o.extractPeers() vpsIP, joinAddress := o.extractNetworkConfig() diff --git a/core/pkg/cli/production/upgrade/orchestrator_reexec_test.go b/core/pkg/cli/production/upgrade/orchestrator_reexec_test.go new file mode 100644 index 0000000..d77c0f7 --- /dev/null +++ b/core/pkg/cli/production/upgrade/orchestrator_reexec_test.go @@ -0,0 +1,84 @@ +package upgrade + +import ( + "os" + "strings" + "testing" +) + +// Bugboard #15 — Upgrade orchestrator chicken-and-egg. +// +// Pre-fix: Phase 4 (config regen) ran with the pre-swap binary's +// compiled Go code, so config-shape changes shipped in this release +// only took effect on the NEXT rollout. Operators had to upgrade +// twice for a config-changing release to apply. +// +// Post-fix: after Phase 2b installs the new binary, the orchestrator +// re-execs itself using the newly-installed binary so Phase 3+ runs +// with current code. A hidden --reexeced-after-binary-swap flag tells +// the new process to skip the pre-binary phases. +// +// These tests pin the flag plumbing and helper behavior. End-to-end +// re-exec can only be verified on a real install (tests can't safely +// call syscall.Exec). + +func TestFlags_ReexecedAfterBinarySwap_parses(t *testing.T) { + // The hidden flag must be parseable; orchestrator sets it on the + // re-execed argv. If this regresses (e.g. someone removes the + // fs.BoolVar registration to clean up the help output), the + // re-execed process would fail with "flag provided but not defined" + // and the upgrade would error mid-way. + flags, err := ParseFlags([]string{"--reexeced-after-binary-swap"}) + if err != nil { + t.Fatalf("ParseFlags must accept the hidden flag: %v", err) + } + if !flags.ReexecedAfterBinarySwap { + t.Error("flag value not surfaced on Flags struct") + } +} + +func TestFlags_ReexecedAfterBinarySwap_defaultFalse(t *testing.T) { + // Default value MUST be false. If it ever defaults to true, the + // orchestrator would skip its own pre-binary phases on the FIRST + // user-initiated upgrade and bricks would happen — Phase 2b would + // never run. + flags, err := ParseFlags([]string{}) + if err != nil { + t.Fatalf("ParseFlags empty args: %v", err) + } + if flags.ReexecedAfterBinarySwap { + t.Fatal("FATAL DEFAULT: ReexecedAfterBinarySwap defaults to true; this would skip "+ + "Phase 2b (binary install) on every upgrade. MUST be false by default.") + } +} + +func TestReexecAfterBinarySwap_missingBinaryReturnsError(t *testing.T) { + // When the new binary isn't on disk at the expected path, the + // helper must surface an error so the orchestrator can fall back + // (with a warning) rather than silently no-op or panic. This is + // the "Phase 2b succeeded but the file vanished" case — defensive + // path, but cheap to pin. + if _, err := os.Stat(newOramaBinaryPath); err == nil { + t.Skipf("test machine has %s present; skipping (real install env)", newOramaBinaryPath) + } + o := &Orchestrator{flags: &Flags{}} + err := o.reexecAfterBinarySwap() + if err == nil { + t.Error("expected error when new binary path is missing; got nil") + } + if err != nil && !strings.Contains(err.Error(), newOramaBinaryPath) { + t.Errorf("error should mention the missing path %q for operator debuggability; got: %v", + newOramaBinaryPath, err) + } +} + +func TestReexecPathConstant_isAbsolute(t *testing.T) { + // syscall.Exec requires an absolute path. If someone refactors the + // constant to "orama" expecting PATH lookup, the exec call would + // fail at runtime ONLY in production (test env never reaches + // syscall.Exec). Pin the absolute-path invariant statically. + if !strings.HasPrefix(newOramaBinaryPath, "/") { + t.Fatalf("newOramaBinaryPath must be absolute (syscall.Exec requirement); got %q", + newOramaBinaryPath) + } +} diff --git a/core/pkg/gateway/handlers/serverless/enable_disable_handler.go b/core/pkg/gateway/handlers/serverless/enable_disable_handler.go new file mode 100644 index 0000000..ec3a182 --- /dev/null +++ b/core/pkg/gateway/handlers/serverless/enable_disable_handler.go @@ -0,0 +1,57 @@ +package serverless + +import ( + "context" + "net/http" + "time" + + "github.com/DeBrosOfficial/network/pkg/serverless" +) + +// SetEnabledFunction handles POST /v1/functions/{name}/disable and +// POST /v1/functions/{name}/enable. +// +// Plan 11.5 — operators flip a function's status without redeploying +// during incident response. Targets ALL versions by name; the registry +// SetEnabled call does the UPDATE atomically. +// +// On success returns {"status":"ok","function":,"enabled":}. +// On 404 returns {"error":"function not found"}. +// +// SECURITY NOTE: this is an operator-scope endpoint. The auth middleware +// upstream gates by namespace (JWT or API-key); within a namespace any +// authenticated caller can flip. Tighten with an explicit admin-scope +// check before exposing to multi-tenant production. +func (h *ServerlessHandlers) SetEnabledFunction(w http.ResponseWriter, r *http.Request, name string, enabled bool) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + namespace := r.URL.Query().Get("namespace") + if namespace == "" { + namespace = h.getNamespaceFromRequest(r) + } + if namespace == "" { + writeError(w, http.StatusBadRequest, "namespace required") + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + if err := h.registry.SetEnabled(ctx, namespace, name, enabled); err != nil { + if serverless.IsNotFound(err) { + writeError(w, http.StatusNotFound, "function not found") + } else { + writeError(w, http.StatusInternalServerError, "failed to set function enabled state") + } + return + } + + writeJSON(w, http.StatusOK, map[string]interface{}{ + "status": "ok", + "function": name, + "enabled": enabled, + }) +} diff --git a/core/pkg/gateway/handlers/serverless/handlers_test.go b/core/pkg/gateway/handlers/serverless/handlers_test.go index 1e74469..b645aa5 100644 --- a/core/pkg/gateway/handlers/serverless/handlers_test.go +++ b/core/pkg/gateway/handlers/serverless/handlers_test.go @@ -68,6 +68,10 @@ func (m *mockRegistry) Delete(_ context.Context, _, _ string, _ int) error { return m.deleteErr } +func (m *mockRegistry) SetEnabled(_ context.Context, _, _ string, _ bool) error { + return nil +} + func (m *mockRegistry) GetWASMBytes(_ context.Context, _ string) ([]byte, error) { return nil, nil } diff --git a/core/pkg/gateway/handlers/serverless/routes.go b/core/pkg/gateway/handlers/serverless/routes.go index a2f95e4..8ac1284 100644 --- a/core/pkg/gateway/handlers/serverless/routes.go +++ b/core/pkg/gateway/handlers/serverless/routes.go @@ -37,6 +37,8 @@ func (h *ServerlessHandlers) handleFunctions(w http.ResponseWriter, r *http.Requ // - GET /v1/functions/{name} - Get function info // - DELETE /v1/functions/{name} - Delete function // - POST /v1/functions/{name}/invoke - Invoke function +// - POST /v1/functions/{name}/disable - Pause without redeploy (plan 11.5) +// - POST /v1/functions/{name}/enable - Resume (plan 11.5) // - GET /v1/functions/{name}/versions - List versions // - GET /v1/functions/{name}/logs - Get logs // - WS /v1/functions/{name}/ws - WebSocket invoke @@ -98,6 +100,10 @@ func (h *ServerlessHandlers) handleFunctionByName(w http.ResponseWriter, r *http switch action { case "invoke": h.InvokeFunction(w, r, name, version) + case "disable": + h.SetEnabledFunction(w, r, name, false) + case "enable": + h.SetEnabledFunction(w, r, name, true) case "ws": h.HandleWebSocket(w, r, name, version) case "versions": diff --git a/core/pkg/gateway/serverless_handlers_test.go b/core/pkg/gateway/serverless_handlers_test.go index 0dcb4d6..e6fb98f 100644 --- a/core/pkg/gateway/serverless_handlers_test.go +++ b/core/pkg/gateway/serverless_handlers_test.go @@ -33,6 +33,10 @@ func (m *mockFunctionRegistry) Delete(ctx context.Context, namespace, name strin return nil } +func (m *mockFunctionRegistry) SetEnabled(ctx context.Context, namespace, name string, enabled bool) error { + return nil +} + func (m *mockFunctionRegistry) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error) { return []byte("wasm"), nil } diff --git a/core/pkg/push/dispatcher.go b/core/pkg/push/dispatcher.go index 5876907..9d786c9 100644 --- a/core/pkg/push/dispatcher.go +++ b/core/pkg/push/dispatcher.go @@ -104,6 +104,15 @@ func (d *PushDispatcher) SendToUserDetailed( // to "apns_voip" devices, even though both are registered on the // same iPhone. Unset = fanout (back-compat for every existing // caller, including unmigrated functions in other namespaces). + // + // Bugboard feat-10 — exclude_provider filter. The inverse: drop + // devices whose Provider EQUALS msg.ExcludeProvider. Useful for the + // "fan out to everyone EXCEPT VoIP" pattern (chat handler that wants + // ntfy+apns+expo but never apns_voip — cleaner than listing every + // included provider). If both are set, TargetProvider wins — + // combining them is ambiguous (e.g. target=apns + exclude=apns is + // empty by construction), so we pick the safer positive filter and + // ignore the exclusion. Unset = no exclusion. if msg.TargetProvider != "" { filtered := devs[:0] for _, dev := range devs { @@ -112,6 +121,14 @@ func (d *PushDispatcher) SendToUserDetailed( } } devs = filtered + } else if msg.ExcludeProvider != "" { + filtered := devs[:0] + for _, dev := range devs { + if dev.Provider != msg.ExcludeProvider { + filtered = append(filtered, dev) + } + } + devs = filtered } out := &SendDetailedResult{ Ok: true, // flipped to false on the first failure diff --git a/core/pkg/push/dispatcher_exclude_provider_test.go b/core/pkg/push/dispatcher_exclude_provider_test.go new file mode 100644 index 0000000..d7fb2b1 --- /dev/null +++ b/core/pkg/push/dispatcher_exclude_provider_test.go @@ -0,0 +1,146 @@ +package push + +import ( + "context" + "testing" + + "go.uber.org/zap" +) + +// Bugboard feat-10 — exclude_provider dispatcher filter. +// +// Inverse of #408's target_provider. Pin behaviors that matter for the +// "fan out to everyone EXCEPT VoIP" pattern: +// +// 1. With ExcludeProvider="apns_voip", apns/ntfy/expo devices are +// attempted; apns_voip devices are dropped. Cleaner than listing +// every included provider on every call. +// +// 2. With both TargetProvider and ExcludeProvider set, TargetProvider +// wins (positive filter is strictly narrower; combining them is +// ambiguous — e.g. target=apns + exclude=apns is empty). Documented +// and pinned so a future refactor can't accidentally let exclude +// subtract from target. +// +// 3. With neither set, fan-out unchanged (back-compat for every +// existing caller). +// +// 4. DevicesAttempted reflects the POST-filter count. + +func threeDeviceUser() []PushDevice { + return []PushDevice{ + {DeviceID: "ios-base", Provider: "apns", Token: "ALERT-TOKEN"}, + {DeviceID: "ios-base:voip", Provider: "apns_voip", Token: "VOIP-TOKEN"}, + {DeviceID: "expo-1", Provider: "expo", Token: "EXPO-TOKEN"}, + } +} + +func TestDispatcher_ExcludeProvider_DropsApnsVoip(t *testing.T) { + alert := &recordingProvider{name: "apns"} + voip := &recordingProvider{name: "apns_voip"} + expo := &recordingProvider{name: "expo"} + d := New(&targetFilterDeviceStore{devices: threeDeviceUser()}, zap.NewNop()) + for _, p := range []PushProvider{alert, voip, expo} { + d.Register(p) + } + + res, err := d.SendToUserDetailed(context.Background(), "ns", "u1", PushMessage{ + Title: "new message", + Body: "hi", + ExcludeProvider: "apns_voip", + }) + if err != nil { + t.Fatalf("SendToUserDetailed: %v", err) + } + + if got := alert.tokens(); len(got) != 1 { + t.Errorf("alert should have been called once; got %v", got) + } + if got := expo.tokens(); len(got) != 1 { + t.Errorf("expo should have been called once; got %v", got) + } + if got := voip.tokens(); len(got) != 0 { + t.Errorf("FEAT-10 REGRESSION: voip was attempted despite ExcludeProvider=apns_voip; "+ + "this would CallKit-ring on every chat message even when caller meant to skip it. got=%v", got) + } + if res.DevicesAttempted != 2 { + t.Errorf("DevicesAttempted = %d; want 2 (post-exclude: apns + expo)", res.DevicesAttempted) + } +} + +func TestDispatcher_ExcludeProvider_TargetProviderWinsWhenBothSet(t *testing.T) { + // Ambiguity guard: if both are set, the documented behavior is + // "TargetProvider wins; ExcludeProvider is ignored." Without this + // pin, a future refactor could chain the filters (e.g. + // target=apns + exclude=apns → 0 devices, surprise no-op) — which + // would silently break any caller that set both, even harmlessly. + alert := &recordingProvider{name: "apns"} + voip := &recordingProvider{name: "apns_voip"} + d := New(&targetFilterDeviceStore{devices: twoIPhoneDevicesUser()}, zap.NewNop()) + d.Register(alert) + d.Register(voip) + + _, err := d.SendToUserDetailed(context.Background(), "ns", "u1", PushMessage{ + Title: "x", + TargetProvider: "apns", // positive: only apns + ExcludeProvider: "apns_voip", // negative: also exclude voip — redundant when target is set + }) + if err != nil { + t.Fatalf("SendToUserDetailed: %v", err) + } + // Only the positive filter should have applied → alert called once. + if got := alert.tokens(); len(got) != 1 { + t.Errorf("alert attempts = %v; want 1 (TargetProvider should win when both set)", got) + } + if got := voip.tokens(); len(got) != 0 { + t.Errorf("voip should not have been called (target filter excludes it implicitly); got %v", got) + } +} + +func TestDispatcher_ExcludeProvider_UnsetFansOut(t *testing.T) { + // Back-compat: every existing caller that doesn't set either filter + // must continue to see the full fan-out behavior. + alert := &recordingProvider{name: "apns"} + voip := &recordingProvider{name: "apns_voip"} + expo := &recordingProvider{name: "expo"} + d := New(&targetFilterDeviceStore{devices: threeDeviceUser()}, zap.NewNop()) + for _, p := range []PushProvider{alert, voip, expo} { + d.Register(p) + } + + res, err := d.SendToUserDetailed(context.Background(), "ns", "u1", PushMessage{ + Title: "x", + // Neither TargetProvider nor ExcludeProvider set. + }) + if err != nil { + t.Fatalf("SendToUserDetailed: %v", err) + } + if res.DevicesAttempted != 3 { + t.Errorf("DevicesAttempted = %d; want 3 (fan-out)", res.DevicesAttempted) + } + if len(alert.tokens()) != 1 || len(voip.tokens()) != 1 || len(expo.tokens()) != 1 { + t.Errorf("all three providers should have been attempted; got alert=%d voip=%d expo=%d", + len(alert.tokens()), len(voip.tokens()), len(expo.tokens())) + } +} + +func TestDispatcher_ExcludeProvider_NoMatchingExclusion_NoOp(t *testing.T) { + // If the exclude target doesn't match any registered device, + // everyone is still attempted (back-compat fan-out). + alert := &recordingProvider{name: "apns"} + voip := &recordingProvider{name: "apns_voip"} + d := New(&targetFilterDeviceStore{devices: twoIPhoneDevicesUser()}, zap.NewNop()) + d.Register(alert) + d.Register(voip) + + res, err := d.SendToUserDetailed(context.Background(), "ns", "u1", PushMessage{ + Title: "x", + ExcludeProvider: "ntfy", // user has no ntfy device — no-op exclusion + }) + if err != nil { + t.Fatalf("SendToUserDetailed: %v", err) + } + if res.DevicesAttempted != 2 { + t.Errorf("DevicesAttempted = %d; want 2 (exclude matched nothing)", res.DevicesAttempted) + } +} diff --git a/core/pkg/push/types.go b/core/pkg/push/types.go index 32f9311..f44bc46 100644 --- a/core/pkg/push/types.go +++ b/core/pkg/push/types.go @@ -37,16 +37,27 @@ const ( // can target "apns" only and avoid waking the user's "apns_voip" // (PushKit/CallKit) device on every text. Providers themselves ignore // this field. +// +// ExcludeProvider is the inverse filter (bugboard feat-10). Empty = +// no exclusion. Non-empty = dispatcher skips any device whose Provider +// EQUALS this value. Useful for the "fan out to everyone EXCEPT VoIP" +// pattern — a chat message handler that wants ntfy + apns + expo but +// never apns_voip. When BOTH TargetProvider and ExcludeProvider are +// set, TargetProvider wins and Exclude is ignored (positive filter is +// strictly narrower than negative; combining them is ambiguous so we +// pick the safer one — see dispatcher comment for rationale). Providers +// themselves ignore this field. type PushMessage struct { - DeviceToken string - Title string - Body string - Data map[string]interface{} - Badge int - Sound string - Channel string // "messages", "calls", etc — provider may map to its own channel concept - Priority PushPriority - TargetProvider string // dispatcher-side filter; "" = fanout. See type doc. + DeviceToken string + Title string + Body string + Data map[string]interface{} + Badge int + Sound string + Channel string // "messages", "calls", etc — provider may map to its own channel concept + Priority PushPriority + TargetProvider string // dispatcher-side positive filter; "" = fanout. See type doc. + ExcludeProvider string // dispatcher-side negative filter; "" = no exclusion. See type doc. } // PushProvider is implemented by each backend (ntfy, expo, apns). diff --git a/core/pkg/serverless/hostfunctions/push.go b/core/pkg/serverless/hostfunctions/push.go index 24dd2e8..c0a1982 100644 --- a/core/pkg/serverless/hostfunctions/push.go +++ b/core/pkg/serverless/hostfunctions/push.go @@ -29,6 +29,12 @@ type PushSendArgs struct { Sound string `json:"sound,omitempty"` Data map[string]interface{} `json:"data,omitempty"` TargetProvider string `json:"target_provider,omitempty"` + // ExcludeProvider is the inverse of TargetProvider — drops devices + // whose provider equals this value. Cleaner semantic than listing + // every included provider for the "fan out to everyone EXCEPT VoIP" + // pattern (chat-handler wants ntfy+apns+expo but never apns_voip). + // If both are set, TargetProvider wins. Bugboard feat-10. + ExcludeProvider string `json:"exclude_provider,omitempty"` } // MaxPushSendArgsBytes caps the JSON arg size to a few KB. Push payloads @@ -101,14 +107,15 @@ func (h *HostFunctions) PushSend(ctx context.Context, userID string, msgJSON []b } msg := push.PushMessage{ - Title: args.Title, - Body: args.Body, - Channel: args.Channel, - Priority: priority, - Badge: args.Badge, - Sound: args.Sound, - Data: args.Data, - TargetProvider: args.TargetProvider, + Title: args.Title, + Body: args.Body, + Channel: args.Channel, + Priority: priority, + Badge: args.Badge, + Sound: args.Sound, + Data: args.Data, + TargetProvider: args.TargetProvider, + ExcludeProvider: args.ExcludeProvider, } // Route through Manager when present so per-namespace push config @@ -197,14 +204,15 @@ func (h *HostFunctions) PushSendV2(ctx context.Context, userID string, msgJSON [ } msg := push.PushMessage{ - Title: args.Title, - Body: args.Body, - Channel: args.Channel, - Priority: priority, - Badge: args.Badge, - Sound: args.Sound, - Data: args.Data, - TargetProvider: args.TargetProvider, + Title: args.Title, + Body: args.Body, + Channel: args.Channel, + Priority: priority, + Badge: args.Badge, + Sound: args.Sound, + Data: args.Data, + TargetProvider: args.TargetProvider, + ExcludeProvider: args.ExcludeProvider, } // Prefer the Manager (per-namespace config); fall back to the legacy diff --git a/core/pkg/serverless/mocks_test.go b/core/pkg/serverless/mocks_test.go index 0e49b76..3ee85a4 100644 --- a/core/pkg/serverless/mocks_test.go +++ b/core/pkg/serverless/mocks_test.go @@ -79,6 +79,21 @@ func (m *MockRegistry) Delete(ctx context.Context, namespace, name string, versi return nil } +func (m *MockRegistry) SetEnabled(ctx context.Context, namespace, name string, enabled bool) error { + m.mu.Lock() + defer m.mu.Unlock() + fn, ok := m.functions[namespace+"/"+name] + if !ok { + return ErrFunctionNotFound + } + if enabled { + fn.Status = FunctionStatusActive + } else { + fn.Status = FunctionStatusInactive + } + return nil +} + func (m *MockRegistry) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error) { m.mu.RLock() defer m.mu.RUnlock() diff --git a/core/pkg/serverless/registry.go b/core/pkg/serverless/registry.go index ef37cbc..fe28c84 100644 --- a/core/pkg/serverless/registry.go +++ b/core/pkg/serverless/registry.go @@ -223,6 +223,38 @@ func (r *Registry) List(ctx context.Context, namespace string) ([]*Function, err return functions, nil } +// SetEnabled flips a function's status between active and inactive +// without redeploying (plan 11.5 disable/enable). Targets ALL versions +// of the function by name so a disable call pauses the whole function, +// not a single version — operators use this during incident response. +// Returns ErrFunctionNotFound when no row matches. +func (r *Registry) SetEnabled(ctx context.Context, namespace, name string, enabled bool) error { + namespace = strings.TrimSpace(namespace) + name = strings.TrimSpace(name) + if namespace == "" || name == "" { + return fmt.Errorf("namespace and name required") + } + status := FunctionStatusInactive + if enabled { + status = FunctionStatusActive + } + query := `UPDATE functions SET status = ?, updated_at = ? WHERE namespace = ? AND name = ?` + result, err := r.db.Exec(ctx, query, string(status), time.Now(), namespace, name) + if err != nil { + return fmt.Errorf("failed to set function enabled state: %w", err) + } + rowsAffected, _ := result.RowsAffected() + if rowsAffected == 0 { + return ErrFunctionNotFound + } + r.logger.Info("Function enabled-state updated", + zap.String("namespace", namespace), + zap.String("name", name), + zap.String("status", string(status)), + ) + return nil +} + // Delete removes a function. If version is 0, removes all versions. func (r *Registry) Delete(ctx context.Context, namespace, name string, version int) error { namespace = strings.TrimSpace(namespace) diff --git a/core/pkg/serverless/registry/function_store.go b/core/pkg/serverless/registry/function_store.go index 1b06253..1bfdcac 100644 --- a/core/pkg/serverless/registry/function_store.go +++ b/core/pkg/serverless/registry/function_store.go @@ -298,6 +298,45 @@ func (s *FunctionStore) Delete(ctx context.Context, namespace, name string, vers return nil } +// SetStatus updates the status column for the latest version of a +// function within a namespace. Used by the disable/enable admin +// endpoints so operators can pause a misbehaving function during an +// incident without redeploying (plan 11.5). +// +// Caller passes the desired FunctionStatus directly so this method +// stays generic for "active" / "inactive" / "error" alike. Returns +// ErrFunctionNotFound if no row matches. +func (s *FunctionStore) SetStatus(ctx context.Context, namespace, name string, status FunctionStatus) error { + namespace = strings.TrimSpace(namespace) + name = strings.TrimSpace(name) + if namespace == "" || name == "" { + return fmt.Errorf("namespace and name required") + } + switch status { + case FunctionStatusActive, FunctionStatusInactive, FunctionStatusError: + // ok + default: + return fmt.Errorf("invalid status %q (must be active/inactive/error)", status) + } + + query := `UPDATE functions SET status = ?, updated_at = ? WHERE namespace = ? AND name = ?` + result, err := s.db.Exec(ctx, query, string(status), time.Now(), namespace, name) + if err != nil { + return fmt.Errorf("failed to set function status: %w", err) + } + rowsAffected, _ := result.RowsAffected() + if rowsAffected == 0 { + return ErrFunctionNotFound + } + + s.logger.Info("Function status updated", + zap.String("namespace", namespace), + zap.String("name", name), + zap.String("status", string(status)), + ) + return nil +} + // SaveEnvVars saves environment variables for a function. func (s *FunctionStore) SaveEnvVars(ctx context.Context, functionID string, envVars map[string]string) error { deleteQuery := `DELETE FROM function_env_vars WHERE function_id = ?` diff --git a/core/pkg/serverless/registry/registry.go b/core/pkg/serverless/registry/registry.go index ff63716..ac47de4 100644 --- a/core/pkg/serverless/registry/registry.go +++ b/core/pkg/serverless/registry/registry.go @@ -97,6 +97,16 @@ func (r *Registry) Delete(ctx context.Context, namespace, name string, version i return r.functionStore.Delete(ctx, namespace, name, version) } +// SetEnabled flips a function's enabled state across all versions +// (plan 11.5). Thin pass-through to FunctionStore.SetStatus. +func (r *Registry) SetEnabled(ctx context.Context, namespace, name string, enabled bool) error { + status := FunctionStatusInactive + if enabled { + status = FunctionStatusActive + } + return r.functionStore.SetStatus(ctx, namespace, name, status) +} + // GetWASMBytes retrieves the compiled WASM bytecode for a function. func (r *Registry) GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error) { if wasmCID == "" { diff --git a/core/pkg/serverless/registry/types.go b/core/pkg/serverless/registry/types.go index 813f455..0100211 100644 --- a/core/pkg/serverless/registry/types.go +++ b/core/pkg/serverless/registry/types.go @@ -105,6 +105,12 @@ type FunctionRegistry interface { Get(ctx context.Context, namespace, name string, version int) (*Function, error) List(ctx context.Context, namespace string) ([]*Function, error) Delete(ctx context.Context, namespace, name string, version int) error + + // SetEnabled flips a function's status between active and inactive + // across all versions without redeploying. Plan 11.5 — pause a + // misbehaving function during incident response. + SetEnabled(ctx context.Context, namespace, name string, enabled bool) error + GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error) // GetLogs returns ONLY WASM-emitted log entries (rows in function_logs). diff --git a/core/pkg/serverless/registry_set_enabled_test.go b/core/pkg/serverless/registry_set_enabled_test.go new file mode 100644 index 0000000..740fe64 --- /dev/null +++ b/core/pkg/serverless/registry_set_enabled_test.go @@ -0,0 +1,49 @@ +package serverless + +import ( + "context" + "testing" + + "go.uber.org/zap" +) + +// Plan 11.5 — disable/enable function status toggle. +// +// SetEnabled is the runtime control surface operators use during +// incident response to pause a misbehaving function without +// redeploying. The Invoker treats inactive functions as missing, so +// new invocations get 404; in-flight ones finish normally. +// +// These tests pin the validation semantics. The actual UPDATE path +// requires rqlite (covered by the registry/function_store integration +// tests once added). + +func TestRegistry_SetEnabled_emptyNamespaceRejected(t *testing.T) { + r := &Registry{logger: zap.NewNop()} + err := r.SetEnabled(context.Background(), "", "fn-1", true) + if err == nil { + t.Fatal("empty namespace must be rejected (defense at boundary)") + } +} + +func TestRegistry_SetEnabled_emptyNameRejected(t *testing.T) { + r := &Registry{logger: zap.NewNop()} + err := r.SetEnabled(context.Background(), "ns", "", true) + if err == nil { + t.Fatal("empty name must be rejected (defense at boundary)") + } +} + +func TestRegistry_SetEnabled_trimsWhitespace(t *testing.T) { + // Whitespace-only inputs should also be rejected — strings.TrimSpace + // makes " " collapse to "" which the empty-check then catches. + // Without this, a caller passing " " would slip through and bind a + // degenerate row update. + r := &Registry{logger: zap.NewNop()} + if err := r.SetEnabled(context.Background(), " ", "name", true); err == nil { + t.Error("whitespace-only namespace must be rejected") + } + if err := r.SetEnabled(context.Background(), "ns", " ", true); err == nil { + t.Error("whitespace-only name must be rejected") + } +} diff --git a/core/pkg/serverless/types.go b/core/pkg/serverless/types.go index fc9c71e..c14995e 100644 --- a/core/pkg/serverless/types.go +++ b/core/pkg/serverless/types.go @@ -81,6 +81,14 @@ type FunctionRegistry interface { // Delete removes a function. If version is 0, removes all versions. Delete(ctx context.Context, namespace, name string, version int) error + // SetEnabled toggles a function's status between active and inactive + // without redeploying. Plan 11.5 — lets operators pause a misbehaving + // function during an incident response. Existing in-flight invocations + // finish; new ones see the function as missing/inactive and the + // invoker rejects them upstream. Returns ErrFunctionNotFound if the + // name doesn't exist in the namespace. + SetEnabled(ctx context.Context, namespace, name string, enabled bool) error + // GetWASMBytes retrieves the compiled WASM bytecode for a function. GetWASMBytes(ctx context.Context, wasmCID string) ([]byte, error)