package push // manager.go — top-level entry point for sending push notifications, // with per-namespace provider configuration that tenants self-serve. // // The Manager wraps: // - a ConfigStore (per-namespace overrides in RQLite) // - a fallback Defaults (gateway YAML — the cluster-wide default if // a namespace hasn't set its own) // - an LRU cache of built dispatchers keyed by namespace // // Every send-path (HTTP, WASM hostfunc) goes through Manager.SendToUser // or Manager.Send. The cache eliminates per-call config decryption + // provider construction overhead — only the FIRST send for a namespace // pays that cost; subsequent sends are zero-allocation lookups. // // Cache invalidation: HTTP config-change handlers call Invalidate after // PUT/DELETE so the next send rebuilds with fresh config. // // See bug #220 follow-up. Generic by design — same pattern works for // any future "tenant should self-serve this knob" feature. import ( "container/list" "context" "errors" "fmt" "sync" "go.uber.org/zap" ) // ProviderFactory builds the set of providers for a resolved Config. // Injected by the gateway so the push package itself doesn't import // the provider sub-packages (avoids a circular dependency: provider // sub-packages already depend on push for the PushProvider interface). // // The factory is called once per fresh dispatcher build (cache miss). // Empty slice is allowed and means "this config produces no providers"; // Manager treats that as ErrPushNotConfigured. type ProviderFactory func(cfg Config) []PushProvider // ErrPushNotConfigured is returned by Send when the namespace has no // per-namespace config AND the gateway has no fallback defaults — i.e. // nothing to send through. Distinguish from ErrNoDevices (different // failure mode). var ErrPushNotConfigured = errors.New("push not configured for namespace; set ntfy_base_url or expo_access_token via PUT /v1/push/config") // Defaults are the gateway-YAML fallback when a namespace hasn't set its // own config. Any field set here applies to every namespace that doesn't // override it. Empty Defaults means "no global default — namespace must // set their own". type Defaults struct { NtfyBaseURL string NtfyAuthToken string ExpoAccessToken string } // IsEmpty returns true when no fallback provider is set. func (d Defaults) IsEmpty() bool { return d.NtfyBaseURL == "" && d.ExpoAccessToken == "" } // Manager is the top-level push entry point. Build with NewManager and // hand out via the gateway's dependencies. Safe for concurrent use. type Manager struct { store ConfigStore devices PushDeviceStore defaults Defaults factory ProviderFactory logger *zap.Logger // cache LRU of namespace → built dispatcher. mu sync.Mutex cache map[string]*list.Element lru *list.List cacheCap int } // cacheEntry is the doubly-linked-list node + dispatcher payload. type cacheEntry struct { namespace string dispatcher *PushDispatcher } // defaultCacheCap caps how many namespaces' dispatchers we hold in memory. // Each entry is small (~few hundred bytes); 256 is generous and bounds // memory under abuse. const defaultCacheCap = 256 // NewManager constructs a Manager with the given device store, config // store, fallback Defaults, and ProviderFactory. // // The ProviderFactory is REQUIRED — it tells the manager how to turn a // resolved Config into PushProvider instances. The gateway's // dependencies wires this up (it imports the provider sub-packages and // returns the right providers for each config field that's populated). func NewManager(devices PushDeviceStore, store ConfigStore, defaults Defaults, factory ProviderFactory, logger *zap.Logger) *Manager { if logger == nil { logger = zap.NewNop() } return &Manager{ store: store, devices: devices, defaults: defaults, factory: factory, logger: logger, cache: make(map[string]*list.Element, defaultCacheCap), lru: list.New(), cacheCap: defaultCacheCap, } } // SendToUser dispatches a push to every device registered for the user // in the given namespace. Looks up per-namespace config (or falls back // to defaults), builds the appropriate dispatcher, and sends. // // Returns: // - nil on success or "no devices" (per existing PushDispatcher // contract — sending push to a user with zero devices is a no-op) // - ErrPushNotConfigured when neither namespace config nor gateway // defaults provide a working provider func (m *Manager) SendToUser(ctx context.Context, namespace, userID string, msg PushMessage) error { d, err := m.dispatcherFor(ctx, namespace) if err != nil { return err } return d.SendToUser(ctx, namespace, userID, msg) } // DeviceStore exposes the underlying device store so HTTP handlers // (register/list/delete) can use it directly without going through the // dispatcher path. func (m *Manager) DeviceStore() PushDeviceStore { return m.devices } // IsConfigured returns true when the namespace has per-namespace config // OR usable fallback defaults — i.e. SendToUser would not fail with // ErrPushNotConfigured. Cheap path-check used by HTTP 503 responses. func (m *Manager) IsConfigured(ctx context.Context, namespace string) bool { if !m.defaults.IsEmpty() { return true } if m.store == nil { return false } cfg, err := m.store.Get(ctx, namespace) if err != nil || cfg == nil { return false } return !cfg.IsEmpty() } // Invalidate evicts the cached dispatcher for a namespace. Call after a // successful PUT/DELETE on /v1/push/config so the next SendToUser uses // fresh config. func (m *Manager) Invalidate(namespace string) { m.mu.Lock() defer m.mu.Unlock() if elem, ok := m.cache[namespace]; ok { m.lru.Remove(elem) delete(m.cache, namespace) } } // dispatcherFor returns a (cached or freshly built) dispatcher with the // providers configured for the given namespace. func (m *Manager) dispatcherFor(ctx context.Context, namespace string) (*PushDispatcher, error) { // Fast path — already cached. m.mu.Lock() if elem, ok := m.cache[namespace]; ok { m.lru.MoveToFront(elem) entry := elem.Value.(*cacheEntry) m.mu.Unlock() return entry.dispatcher, nil } m.mu.Unlock() // Slow path — load config + build dispatcher. d, err := m.buildDispatcher(ctx, namespace) if err != nil { return nil, err } // Insert into cache (eviction if at capacity). m.mu.Lock() defer m.mu.Unlock() // Recheck under lock — another goroutine may have built one. if elem, ok := m.cache[namespace]; ok { m.lru.MoveToFront(elem) return elem.Value.(*cacheEntry).dispatcher, nil } if m.lru.Len() >= m.cacheCap { oldest := m.lru.Back() if oldest != nil { old := oldest.Value.(*cacheEntry) m.lru.Remove(oldest) delete(m.cache, old.namespace) } } entry := &cacheEntry{namespace: namespace, dispatcher: d} m.cache[namespace] = m.lru.PushFront(entry) return d, nil } // buildDispatcher resolves the effective config for the namespace and // constructs a fresh PushDispatcher with the right providers registered. // // Effective-config resolution: // 1. Start with gateway YAML defaults // 2. If a per-namespace row exists, override field-by-field // 3. If the result has zero providers, return ErrPushNotConfigured func (m *Manager) buildDispatcher(ctx context.Context, namespace string) (*PushDispatcher, error) { eff := Config{Namespace: namespace} // Start from defaults. eff.NtfyBaseURL = m.defaults.NtfyBaseURL eff.NtfyAuthToken = m.defaults.NtfyAuthToken eff.ExpoAccessToken = m.defaults.ExpoAccessToken // Layer namespace overrides if any. if m.store != nil { nc, err := m.store.Get(ctx, namespace) if err != nil && !errors.Is(err, ErrConfigNotFound) { return nil, fmt.Errorf("manager: load namespace config: %w", err) } if nc != nil { // Each field overrides the default if non-empty. An explicit // empty value via PUT clears the namespace's row entirely // (DELETE) — there's no "set this field to empty to clear" // half-state, by design. if nc.NtfyBaseURL != "" { eff.NtfyBaseURL = nc.NtfyBaseURL } if nc.NtfyAuthToken != "" { eff.NtfyAuthToken = nc.NtfyAuthToken } if nc.ExpoAccessToken != "" { eff.ExpoAccessToken = nc.ExpoAccessToken } } } // Refuse to build a dispatcher with no providers — caller gets a // clear error instead of a silent no-op. if eff.NtfyBaseURL == "" && eff.ExpoAccessToken == "" { return nil, ErrPushNotConfigured } if m.factory == nil { // Defensive: a Manager built without a factory can't produce // providers. Programmer error; surface explicitly. return nil, fmt.Errorf("manager: no provider factory configured") } providers := m.factory(eff) if len(providers) == 0 { return nil, ErrPushNotConfigured } d := New(m.devices, m.logger) for _, p := range providers { d.Register(p) } return d, nil }