From c499b2d76ed41ed4877e610e7066d1007dbfcd44 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Fri, 20 Feb 2026 09:44:07 +0200 Subject: [PATCH] Enhance health checker and deployment handling - Added support for "degraded" deployment status in types. - Updated health checker initialization to include process manager and node ID. - Refactored health checker tests to accommodate new process manager functionality. - Implemented logic to handle unhealthy deployments, including restart and failure marking. - Enhanced deployment reconciliation to manage under-replicated scenarios. - Updated gateway handlers and middleware to consider "degraded" status in deployment queries. --- pkg/deployments/health/checker.go | 501 +++++++++++--- pkg/deployments/health/checker_test.go | 685 +++++++++++++++++--- pkg/deployments/types.go | 1 + pkg/gateway/gateway.go | 3 +- pkg/gateway/handlers/deployments/service.go | 6 +- pkg/gateway/middleware.go | 6 +- 6 files changed, 1016 insertions(+), 186 deletions(-) diff --git a/pkg/deployments/health/checker.go b/pkg/deployments/health/checker.go index 51cef1b..b4fab79 100644 --- a/pkg/deployments/health/checker.go +++ b/pkg/deployments/health/checker.go @@ -2,16 +2,41 @@ package health import ( "context" + "encoding/json" "fmt" "net/http" "sync" "time" "github.com/DeBrosOfficial/network/pkg/database" + "github.com/DeBrosOfficial/network/pkg/deployments" "go.uber.org/zap" ) -// deploymentRow represents a deployment record for health checking +// Tuning constants. +const ( + consecutiveFailuresThreshold = 3 + defaultDesiredReplicas = deployments.DefaultReplicaCount +) + +// ProcessManager is the subset of process.Manager needed by the health checker. +type ProcessManager interface { + Restart(ctx context.Context, deployment *deployments.Deployment) error + Stop(ctx context.Context, deployment *deployments.Deployment) error +} + +// ReplicaReconciler provides replica management for the reconciliation loop. +type ReplicaReconciler interface { + SelectReplicaNodes(ctx context.Context, primaryNodeID string, count int) ([]string, error) + UpdateReplicaStatus(ctx context.Context, deploymentID, nodeID string, status deployments.ReplicaStatus) error +} + +// ReplicaProvisioner provisions new replicas on remote nodes. +type ReplicaProvisioner interface { + SetupDynamicReplica(ctx context.Context, deployment *deployments.Deployment, nodeID string) +} + +// deploymentRow represents a deployment record for health checking. type deploymentRow struct { ID string `db:"id"` Namespace string `db:"namespace"` @@ -20,57 +45,101 @@ type deploymentRow struct { Port int `db:"port"` HealthCheckPath string `db:"health_check_path"` HomeNodeID string `db:"home_node_id"` + RestartPolicy string `db:"restart_policy"` + MaxRestartCount int `db:"max_restart_count"` + ReplicaStatus string `db:"replica_status"` } -// HealthChecker monitors deployment health +// replicaState tracks in-memory health state for a deployment on this node. +type replicaState struct { + consecutiveMisses int + restartCount int +} + +// HealthChecker monitors deployment health on the local node. type HealthChecker struct { - db database.Database - logger *zap.Logger - workers int - mu sync.RWMutex - active map[string]bool // deployment_id -> is_active + db database.Database + logger *zap.Logger + workers int + nodeID string + processManager ProcessManager + + // In-memory per-replica state (keyed by deployment ID) + stateMu sync.Mutex + states map[string]*replicaState + + // Reconciliation (optional, set via SetReconciler) + rqliteDSN string + reconciler ReplicaReconciler + provisioner ReplicaProvisioner } -// NewHealthChecker creates a new health checker -func NewHealthChecker(db database.Database, logger *zap.Logger) *HealthChecker { +// NewHealthChecker creates a new health checker. +func NewHealthChecker(db database.Database, logger *zap.Logger, nodeID string, pm ProcessManager) *HealthChecker { return &HealthChecker{ - db: db, - logger: logger, - workers: 10, - active: make(map[string]bool), + db: db, + logger: logger, + workers: 10, + nodeID: nodeID, + processManager: pm, + states: make(map[string]*replicaState), } } -// Start begins health monitoring -func (hc *HealthChecker) Start(ctx context.Context) error { - hc.logger.Info("Starting health checker", zap.Int("workers", hc.workers)) +// SetReconciler configures the reconciliation loop dependencies (optional). +// Must be called before Start() if re-replication is desired. +func (hc *HealthChecker) SetReconciler(rqliteDSN string, rc ReplicaReconciler, rp ReplicaProvisioner) { + hc.rqliteDSN = rqliteDSN + hc.reconciler = rc + hc.provisioner = rp +} - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() +// Start begins health monitoring with two periodic tasks: +// 1. Every 30s: probe local replicas +// 2. Every 5m: (leader-only) reconcile under-replicated deployments +func (hc *HealthChecker) Start(ctx context.Context) error { + hc.logger.Info("Starting health checker", + zap.Int("workers", hc.workers), + zap.String("node_id", hc.nodeID), + ) + + probeTicker := time.NewTicker(30 * time.Second) + reconcileTicker := time.NewTicker(5 * time.Minute) + defer probeTicker.Stop() + defer reconcileTicker.Stop() for { select { case <-ctx.Done(): hc.logger.Info("Health checker stopped") return ctx.Err() - case <-ticker.C: + case <-probeTicker.C: if err := hc.checkAllDeployments(ctx); err != nil { hc.logger.Error("Health check cycle failed", zap.Error(err)) } + case <-reconcileTicker.C: + hc.reconcileDeployments(ctx) } } } -// checkAllDeployments checks all active deployments +// checkAllDeployments checks all deployments with active or failed replicas on this node. func (hc *HealthChecker) checkAllDeployments(ctx context.Context) error { var rows []deploymentRow query := ` - SELECT id, namespace, name, type, port, health_check_path, home_node_id - FROM deployments - WHERE status = 'active' AND type IN ('nextjs', 'nodejs-backend', 'go-backend') + SELECT d.id, d.namespace, d.name, d.type, dr.port, + d.health_check_path, d.home_node_id, + d.restart_policy, d.max_restart_count, + dr.status as replica_status + FROM deployments d + JOIN deployment_replicas dr ON d.id = dr.deployment_id + WHERE d.status IN ('active', 'degraded') + AND dr.node_id = ? + AND dr.status IN ('active', 'failed') + AND d.type IN ('nextjs', 'nodejs-backend', 'go-backend') ` - err := hc.db.Query(ctx, &rows, query) + err := hc.db.Query(ctx, &rows, query, hc.nodeID) if err != nil { return fmt.Errorf("failed to query deployments: %w", err) } @@ -90,6 +159,12 @@ func (hc *HealthChecker) checkAllDeployments(ctx context.Context) error { healthy := hc.checkDeployment(ctx, r) hc.recordHealthCheck(ctx, r.ID, healthy) + + if healthy { + hc.handleHealthy(ctx, r) + } else { + hc.handleUnhealthy(ctx, r) + } }(row) } @@ -97,7 +172,7 @@ func (hc *HealthChecker) checkAllDeployments(ctx context.Context) error { return nil } -// checkDeployment checks a single deployment +// checkDeployment checks a single deployment's health via HTTP. func (hc *HealthChecker) checkDeployment(ctx context.Context, dep deploymentRow) bool { if dep.Port == 0 { // Static deployments are always healthy @@ -144,7 +219,208 @@ func (hc *HealthChecker) checkDeployment(ctx context.Context, dep deploymentRow) return healthy } -// recordHealthCheck records the health check result +// handleHealthy processes a healthy check result. +func (hc *HealthChecker) handleHealthy(ctx context.Context, dep deploymentRow) { + hc.stateMu.Lock() + delete(hc.states, dep.ID) + hc.stateMu.Unlock() + + // If the replica was in 'failed' state but is now healthy, decide: + // recover it or stop it (if a replacement already exists). + if dep.ReplicaStatus == "failed" { + // Count how many active replicas this deployment already has. + type countRow struct { + Count int `db:"c"` + } + var rows []countRow + countQuery := `SELECT COUNT(*) as c FROM deployment_replicas WHERE deployment_id = ? AND status = 'active'` + if err := hc.db.Query(ctx, &rows, countQuery, dep.ID); err != nil { + hc.logger.Error("Failed to count active replicas for recovery check", zap.Error(err)) + return + } + activeCount := 0 + if len(rows) > 0 { + activeCount = rows[0].Count + } + + if activeCount >= defaultDesiredReplicas { + // This replica was replaced while the node was down. + // Stop the zombie process and remove the stale replica row. + hc.logger.Warn("Zombie replica detected — node was replaced, stopping process", + zap.String("deployment", dep.Name), + zap.String("node_id", hc.nodeID), + zap.Int("active_replicas", activeCount), + ) + + if hc.processManager != nil { + d := &deployments.Deployment{ + ID: dep.ID, + Namespace: dep.Namespace, + Name: dep.Name, + Type: deployments.DeploymentType(dep.Type), + Port: dep.Port, + } + if err := hc.processManager.Stop(ctx, d); err != nil { + hc.logger.Error("Failed to stop zombie deployment process", zap.Error(err)) + } + } + + deleteQuery := `DELETE FROM deployment_replicas WHERE deployment_id = ? AND node_id = ? AND status = 'failed'` + hc.db.Exec(ctx, deleteQuery, dep.ID, hc.nodeID) + + eventQuery := `INSERT INTO deployment_events (deployment_id, event_type, message, created_at) VALUES (?, 'zombie_replica_stopped', ?, ?)` + msg := fmt.Sprintf("Zombie replica on node %s stopped and removed (already at %d active replicas)", hc.nodeID, activeCount) + hc.db.Exec(ctx, eventQuery, dep.ID, msg, time.Now()) + return + } + + // Under-replicated — genuine recovery. Bring this replica back. + hc.logger.Info("Failed replica recovered, marking active", + zap.String("deployment", dep.Name), + zap.String("node_id", hc.nodeID), + ) + + replicaQuery := `UPDATE deployment_replicas SET status = 'active', updated_at = ? WHERE deployment_id = ? AND node_id = ?` + if _, err := hc.db.Exec(ctx, replicaQuery, time.Now(), dep.ID, hc.nodeID); err != nil { + hc.logger.Error("Failed to recover replica status", zap.Error(err)) + return + } + + // Recalculate deployment status — may go from 'degraded' back to 'active' + hc.recalculateDeploymentStatus(ctx, dep.ID) + + eventQuery := `INSERT INTO deployment_events (deployment_id, event_type, message, created_at) VALUES (?, 'replica_recovered', ?, ?)` + msg := fmt.Sprintf("Replica on node %s recovered and marked active", hc.nodeID) + hc.db.Exec(ctx, eventQuery, dep.ID, msg, time.Now()) + } +} + +// handleUnhealthy processes an unhealthy check result. +func (hc *HealthChecker) handleUnhealthy(ctx context.Context, dep deploymentRow) { + // Don't take action on already-failed replicas + if dep.ReplicaStatus == "failed" { + return + } + + hc.stateMu.Lock() + st, exists := hc.states[dep.ID] + if !exists { + st = &replicaState{} + hc.states[dep.ID] = st + } + st.consecutiveMisses++ + misses := st.consecutiveMisses + restarts := st.restartCount + hc.stateMu.Unlock() + + if misses < consecutiveFailuresThreshold { + return + } + + // Reached threshold — decide: restart or mark failed + maxRestarts := dep.MaxRestartCount + if maxRestarts == 0 { + maxRestarts = deployments.DefaultMaxRestartCount + } + + canRestart := dep.RestartPolicy != string(deployments.RestartPolicyNever) && + restarts < maxRestarts && + hc.processManager != nil + + if canRestart { + hc.logger.Info("Attempting restart for unhealthy deployment", + zap.String("deployment", dep.Name), + zap.Int("restart_attempt", restarts+1), + zap.Int("max_restarts", maxRestarts), + ) + + // Build minimal Deployment struct for process manager + d := &deployments.Deployment{ + ID: dep.ID, + Namespace: dep.Namespace, + Name: dep.Name, + Type: deployments.DeploymentType(dep.Type), + Port: dep.Port, + } + + if err := hc.processManager.Restart(ctx, d); err != nil { + hc.logger.Error("Failed to restart deployment", + zap.String("deployment", dep.Name), + zap.Error(err), + ) + } + + hc.stateMu.Lock() + st.restartCount++ + st.consecutiveMisses = 0 + hc.stateMu.Unlock() + + eventQuery := `INSERT INTO deployment_events (deployment_id, event_type, message, created_at) VALUES (?, 'health_restart', ?, ?)` + msg := fmt.Sprintf("Process restarted on node %s (attempt %d/%d)", hc.nodeID, restarts+1, maxRestarts) + hc.db.Exec(ctx, eventQuery, dep.ID, msg, time.Now()) + return + } + + // Restart limit exhausted (or policy is "never") — mark THIS replica as failed + hc.logger.Error("Marking replica as failed after exhausting restarts", + zap.String("deployment", dep.Name), + zap.String("node_id", hc.nodeID), + zap.Int("restarts_attempted", restarts), + ) + + replicaQuery := `UPDATE deployment_replicas SET status = 'failed', updated_at = ? WHERE deployment_id = ? AND node_id = ?` + if _, err := hc.db.Exec(ctx, replicaQuery, time.Now(), dep.ID, hc.nodeID); err != nil { + hc.logger.Error("Failed to mark replica as failed", zap.Error(err)) + } + + // Recalculate deployment status based on remaining active replicas + hc.recalculateDeploymentStatus(ctx, dep.ID) + + eventQuery := `INSERT INTO deployment_events (deployment_id, event_type, message, created_at) VALUES (?, 'replica_failed', ?, ?)` + msg := fmt.Sprintf("Replica on node %s marked failed after %d restart attempts", hc.nodeID, restarts) + hc.db.Exec(ctx, eventQuery, dep.ID, msg, time.Now()) +} + +// recalculateDeploymentStatus sets the deployment to 'active', 'degraded', or 'failed' +// based on the number of remaining active replicas. +func (hc *HealthChecker) recalculateDeploymentStatus(ctx context.Context, deploymentID string) { + type countRow struct { + Count int `db:"c"` + } + + var rows []countRow + countQuery := `SELECT COUNT(*) as c FROM deployment_replicas WHERE deployment_id = ? AND status = 'active'` + if err := hc.db.Query(ctx, &rows, countQuery, deploymentID); err != nil { + hc.logger.Error("Failed to count active replicas", zap.Error(err)) + return + } + + activeCount := 0 + if len(rows) > 0 { + activeCount = rows[0].Count + } + + var newStatus string + switch { + case activeCount == 0: + newStatus = "failed" + case activeCount < defaultDesiredReplicas: + newStatus = "degraded" + default: + newStatus = "active" + } + + updateQuery := `UPDATE deployments SET status = ?, updated_at = ? WHERE id = ?` + if _, err := hc.db.Exec(ctx, updateQuery, newStatus, time.Now(), deploymentID); err != nil { + hc.logger.Error("Failed to update deployment status", + zap.String("deployment", deploymentID), + zap.String("new_status", newStatus), + zap.Error(err), + ) + } +} + +// recordHealthCheck records the health check result in the database. func (hc *HealthChecker) recordHealthCheck(ctx context.Context, deploymentID string, healthy bool) { status := "healthy" if !healthy { @@ -152,89 +428,154 @@ func (hc *HealthChecker) recordHealthCheck(ctx context.Context, deploymentID str } query := ` - INSERT INTO deployment_health_checks (deployment_id, status, checked_at, response_time_ms) - VALUES (?, ?, ?, ?) + INSERT INTO deployment_health_checks (deployment_id, node_id, status, checked_at, response_time_ms) + VALUES (?, ?, ?, ?, ?) ` - _, err := hc.db.Exec(ctx, query, deploymentID, status, time.Now(), 0) + _, err := hc.db.Exec(ctx, query, deploymentID, hc.nodeID, status, time.Now(), 0) if err != nil { hc.logger.Error("Failed to record health check", zap.String("deployment", deploymentID), zap.Error(err), ) } - - // Track consecutive failures - hc.checkConsecutiveFailures(ctx, deploymentID, healthy) } -// checkConsecutiveFailures marks deployment as failed after 3 consecutive failures -func (hc *HealthChecker) checkConsecutiveFailures(ctx context.Context, deploymentID string, currentHealthy bool) { - if currentHealthy { +// reconcileDeployments checks for under-replicated deployments and triggers re-replication. +// Only runs on the RQLite leader to avoid duplicate repairs. +func (hc *HealthChecker) reconcileDeployments(ctx context.Context) { + if hc.reconciler == nil || hc.provisioner == nil { return } - type healthRow struct { - Status string `db:"status"` + if !hc.isRQLiteLeader(ctx) { + return } - var rows []healthRow + hc.logger.Info("Running deployment reconciliation check") + + type reconcileRow struct { + ID string `db:"id"` + Namespace string `db:"namespace"` + Name string `db:"name"` + Type string `db:"type"` + HomeNodeID string `db:"home_node_id"` + ContentCID string `db:"content_cid"` + BuildCID string `db:"build_cid"` + Environment string `db:"environment"` + Port int `db:"port"` + HealthCheckPath string `db:"health_check_path"` + MemoryLimitMB int `db:"memory_limit_mb"` + CPULimitPercent int `db:"cpu_limit_percent"` + RestartPolicy string `db:"restart_policy"` + MaxRestartCount int `db:"max_restart_count"` + ActiveReplicas int `db:"active_replicas"` + } + + var rows []reconcileRow query := ` - SELECT status - FROM deployment_health_checks - WHERE deployment_id = ? - ORDER BY checked_at DESC - LIMIT 3 + SELECT d.id, d.namespace, d.name, d.type, d.home_node_id, + d.content_cid, d.build_cid, d.environment, d.port, + d.health_check_path, d.memory_limit_mb, d.cpu_limit_percent, + d.restart_policy, d.max_restart_count, + (SELECT COUNT(*) FROM deployment_replicas dr + WHERE dr.deployment_id = d.id AND dr.status = 'active') AS active_replicas + FROM deployments d + WHERE d.status IN ('active', 'degraded') + AND d.type IN ('nextjs', 'nodejs-backend', 'go-backend') ` - err := hc.db.Query(ctx, &rows, query, deploymentID) - if err != nil { - hc.logger.Error("Failed to query health history", zap.Error(err)) + if err := hc.db.Query(ctx, &rows, query); err != nil { + hc.logger.Error("Failed to query deployments for reconciliation", zap.Error(err)) return } - // Check if last 3 checks all failed - if len(rows) >= 3 { - allFailed := true - for _, row := range rows { - if row.Status != "unhealthy" { - allFailed = false - break - } + for _, row := range rows { + if row.ActiveReplicas >= defaultDesiredReplicas { + continue } - if allFailed { - hc.logger.Error("Deployment has 3 consecutive failures, marking as failed", - zap.String("deployment", deploymentID), + needed := defaultDesiredReplicas - row.ActiveReplicas + hc.logger.Warn("Deployment under-replicated, triggering re-replication", + zap.String("deployment", row.Name), + zap.String("namespace", row.Namespace), + zap.Int("active_replicas", row.ActiveReplicas), + zap.Int("desired", defaultDesiredReplicas), + zap.Int("needed", needed), + ) + + newNodes, err := hc.reconciler.SelectReplicaNodes(ctx, row.HomeNodeID, needed) + if err != nil || len(newNodes) == 0 { + hc.logger.Warn("No nodes available for re-replication", + zap.String("deployment", row.Name), + zap.Error(err), ) + continue + } - updateQuery := ` - UPDATE deployments - SET status = 'failed', updated_at = ? - WHERE id = ? - ` + dep := &deployments.Deployment{ + ID: row.ID, + Namespace: row.Namespace, + Name: row.Name, + Type: deployments.DeploymentType(row.Type), + HomeNodeID: row.HomeNodeID, + ContentCID: row.ContentCID, + BuildCID: row.BuildCID, + Port: row.Port, + HealthCheckPath: row.HealthCheckPath, + MemoryLimitMB: row.MemoryLimitMB, + CPULimitPercent: row.CPULimitPercent, + RestartPolicy: deployments.RestartPolicy(row.RestartPolicy), + MaxRestartCount: row.MaxRestartCount, + } + if row.Environment != "" { + json.Unmarshal([]byte(row.Environment), &dep.Environment) + } - _, err := hc.db.Exec(ctx, updateQuery, time.Now(), deploymentID) - if err != nil { - hc.logger.Error("Failed to mark deployment as failed", zap.Error(err)) - } - - // Record event - eventQuery := ` - INSERT INTO deployment_events (deployment_id, event_type, message, created_at) - VALUES (?, 'health_failed', 'Deployment marked as failed after 3 consecutive health check failures', ?) - ` - if _, err := hc.db.Exec(ctx, eventQuery, deploymentID, time.Now()); err != nil { - hc.logger.Error("Failed to record health_failed event", - zap.String("deployment", deploymentID), - zap.Error(err), - ) - } + for _, nodeID := range newNodes { + hc.logger.Info("Provisioning replacement replica", + zap.String("deployment", row.Name), + zap.String("target_node", nodeID), + ) + go hc.provisioner.SetupDynamicReplica(ctx, dep, nodeID) } } } -// GetHealthStatus gets recent health checks for a deployment +// isRQLiteLeader checks whether this node is the current Raft leader. +func (hc *HealthChecker) isRQLiteLeader(ctx context.Context) bool { + dsn := hc.rqliteDSN + if dsn == "" { + dsn = "http://localhost:5001" + } + + client := &http.Client{Timeout: 5 * time.Second} + req, err := http.NewRequestWithContext(ctx, http.MethodGet, dsn+"/status", nil) + if err != nil { + return false + } + + resp, err := client.Do(req) + if err != nil { + return false + } + defer resp.Body.Close() + + var status struct { + Store struct { + Raft struct { + State string `json:"state"` + } `json:"raft"` + } `json:"store"` + } + if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { + return false + } + + return status.Store.Raft.State == "Leader" +} + +// GetHealthStatus gets recent health checks for a deployment. func (hc *HealthChecker) GetHealthStatus(ctx context.Context, deploymentID string, limit int) ([]HealthCheck, error) { type healthRow struct { Status string `db:"status"` @@ -268,7 +609,7 @@ func (hc *HealthChecker) GetHealthStatus(ctx context.Context, deploymentID strin return checks, nil } -// HealthCheck represents a health check result +// HealthCheck represents a health check result. type HealthCheck struct { Status string `json:"status"` CheckedAt time.Time `json:"checked_at"` diff --git a/pkg/deployments/health/checker_test.go b/pkg/deployments/health/checker_test.go index cfae5bb..32c94af 100644 --- a/pkg/deployments/health/checker_test.go +++ b/pkg/deployments/health/checker_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/DeBrosOfficial/network/pkg/deployments" "go.uber.org/zap" ) @@ -35,10 +36,7 @@ type mockDB struct { mu sync.Mutex // Query handling --------------------------------------------------- - // queryFunc is called when Query is invoked. It receives the dest - // pointer and the query string + args. The implementation should - // populate dest (via reflection) and return an error if desired. - queryFunc func(dest interface{}, query string, args ...interface{}) error + queryFunc func(dest interface{}, query string, args ...interface{}) error queryCalls []queryCall // Exec handling ---------------------------------------------------- @@ -95,15 +93,54 @@ func (m *mockDB) getQueryCalls() []queryCall { return out } +// --------------------------------------------------------------------------- +// Mock process manager +// --------------------------------------------------------------------------- + +type mockProcessManager struct { + mu sync.Mutex + restartCalls []string // deployment IDs + restartErr error + stopCalls []string // deployment IDs + stopErr error +} + +func (m *mockProcessManager) Restart(_ context.Context, dep *deployments.Deployment) error { + m.mu.Lock() + m.restartCalls = append(m.restartCalls, dep.ID) + m.mu.Unlock() + return m.restartErr +} + +func (m *mockProcessManager) Stop(_ context.Context, dep *deployments.Deployment) error { + m.mu.Lock() + m.stopCalls = append(m.stopCalls, dep.ID) + m.mu.Unlock() + return m.stopErr +} + +func (m *mockProcessManager) getRestartCalls() []string { + m.mu.Lock() + defer m.mu.Unlock() + out := make([]string, len(m.restartCalls)) + copy(out, m.restartCalls) + return out +} + +func (m *mockProcessManager) getStopCalls() []string { + m.mu.Lock() + defer m.mu.Unlock() + out := make([]string, len(m.stopCalls)) + copy(out, m.stopCalls) + return out +} + // --------------------------------------------------------------------------- // Helper: populate a *[]T dest via reflection so the mock can return rows. // --------------------------------------------------------------------------- // appendRows appends rows to dest (a *[]SomeStruct) by creating new elements // of the destination's element type and copying field values by name. -// Each row is a map[string]interface{} keyed by field name (Go name, not db tag). -// This sidesteps the type-identity problem where the mock and the caller -// define structurally identical but distinct local types. func appendRows(dest interface{}, rows []map[string]interface{}) { dv := reflect.ValueOf(dest).Elem() // []T elemType := dv.Type().Elem() // T @@ -130,8 +167,9 @@ func appendRows(dest interface{}, rows []map[string]interface{}) { func TestNewHealthChecker_NonNil(t *testing.T) { db := &mockDB{} logger := zap.NewNop() + pm := &mockProcessManager{} - hc := NewHealthChecker(db, logger) + hc := NewHealthChecker(db, logger, "node-1", pm) if hc == nil { t.Fatal("expected non-nil HealthChecker") @@ -145,11 +183,14 @@ func TestNewHealthChecker_NonNil(t *testing.T) { if hc.workers != 10 { t.Errorf("expected default workers=10, got %d", hc.workers) } - if hc.active == nil { - t.Error("expected active map to be initialized") + if hc.nodeID != "node-1" { + t.Errorf("expected nodeID='node-1', got %q", hc.nodeID) } - if len(hc.active) != 0 { - t.Errorf("expected active map to be empty, got %d entries", len(hc.active)) + if hc.processManager != pm { + t.Error("expected processManager to be stored") + } + if hc.states == nil { + t.Error("expected states map to be initialized") } } @@ -157,7 +198,7 @@ func TestNewHealthChecker_NonNil(t *testing.T) { func TestCheckDeployment_StaticDeployment(t *testing.T) { db := &mockDB{} - hc := NewHealthChecker(db, zap.NewNop()) + hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil) dep := deploymentRow{ ID: "dep-1", @@ -180,11 +221,10 @@ func TestCheckDeployment_HealthyEndpoint(t *testing.T) { })) defer srv.Close() - // Extract the port from the test server. port := serverPort(t, srv) db := &mockDB{} - hc := NewHealthChecker(db, zap.NewNop()) + hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil) dep := deploymentRow{ ID: "dep-2", @@ -207,7 +247,7 @@ func TestCheckDeployment_UnhealthyEndpoint(t *testing.T) { port := serverPort(t, srv) db := &mockDB{} - hc := NewHealthChecker(db, zap.NewNop()) + hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil) dep := deploymentRow{ ID: "dep-3", @@ -223,7 +263,7 @@ func TestCheckDeployment_UnhealthyEndpoint(t *testing.T) { func TestCheckDeployment_UnreachableEndpoint(t *testing.T) { db := &mockDB{} - hc := NewHealthChecker(db, zap.NewNop()) + hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil) dep := deploymentRow{ ID: "dep-4", @@ -237,123 +277,383 @@ func TestCheckDeployment_UnreachableEndpoint(t *testing.T) { } } -// ---- c) checkConsecutiveFailures ------------------------------------------ +// ---- c) checkAllDeployments query ----------------------------------------- -func TestCheckConsecutiveFailures_HealthyReturnsEarly(t *testing.T) { +func TestCheckAllDeployments_QueriesLocalReplicas(t *testing.T) { db := &mockDB{} - hc := NewHealthChecker(db, zap.NewNop()) + hc := NewHealthChecker(db, zap.NewNop(), "node-abc", nil) - // When the current check is healthy, the method returns immediately - // without querying the database. - hc.checkConsecutiveFailures(context.Background(), "dep-1", true) + hc.checkAllDeployments(context.Background()) calls := db.getQueryCalls() - if len(calls) != 0 { - t.Errorf("expected no query calls when healthy, got %d", len(calls)) + if len(calls) == 0 { + t.Fatal("expected at least one query call") + } + + q := calls[0].query + if !strings.Contains(q, "deployment_replicas") { + t.Errorf("expected query to join deployment_replicas, got: %s", q) + } + if !strings.Contains(q, "dr.node_id = ?") { + t.Errorf("expected query to filter by dr.node_id, got: %s", q) + } + if !strings.Contains(q, "'degraded'") { + t.Errorf("expected query to include 'degraded' status, got: %s", q) + } + + // Verify nodeID was passed as the bind parameter + if len(calls[0].args) == 0 { + t.Fatal("expected query args") + } + if nodeID, ok := calls[0].args[0].(string); !ok || nodeID != "node-abc" { + t.Errorf("expected nodeID arg 'node-abc', got %v", calls[0].args[0]) } } -func TestCheckConsecutiveFailures_LessThan3Failures(t *testing.T) { - db := &mockDB{ - queryFunc: func(dest interface{}, query string, args ...interface{}) error { - // Return only 2 unhealthy rows (fewer than 3). - appendRows(dest, []map[string]interface{}{ - {"Status": "unhealthy"}, - {"Status": "unhealthy"}, - }) - return nil - }, +// ---- d) handleUnhealthy --------------------------------------------------- + +func TestHandleUnhealthy_RestartsBeforeFailure(t *testing.T) { + db := &mockDB{} + pm := &mockProcessManager{} + hc := NewHealthChecker(db, zap.NewNop(), "node-1", pm) + + dep := deploymentRow{ + ID: "dep-restart", + Namespace: "test", + Name: "my-app", + Type: "nextjs", + Port: 10001, + RestartPolicy: "on-failure", + MaxRestartCount: 3, + ReplicaStatus: "active", } - hc := NewHealthChecker(db, zap.NewNop()) - hc.checkConsecutiveFailures(context.Background(), "dep-1", false) + ctx := context.Background() - // Should query the DB but NOT issue any UPDATE or event INSERT. + // Drive 3 consecutive unhealthy checks -> should trigger restart + for i := 0; i < consecutiveFailuresThreshold; i++ { + hc.handleUnhealthy(ctx, dep) + } + + // Verify restart was called + restarts := pm.getRestartCalls() + if len(restarts) != 1 { + t.Fatalf("expected 1 restart call, got %d", len(restarts)) + } + if restarts[0] != "dep-restart" { + t.Errorf("expected restart for 'dep-restart', got %q", restarts[0]) + } + + // Verify no replica status UPDATE was issued (only event INSERT) execCalls := db.getExecCalls() - if len(execCalls) != 0 { - t.Errorf("expected 0 exec calls with <3 failures, got %d", len(execCalls)) + for _, call := range execCalls { + if strings.Contains(call.query, "UPDATE deployment_replicas") { + t.Error("should not update replica status when restart succeeds") + } } } -func TestCheckConsecutiveFailures_ThreeConsecutive(t *testing.T) { +func TestHandleUnhealthy_MarksReplicaFailedAfterRestartLimit(t *testing.T) { db := &mockDB{ queryFunc: func(dest interface{}, query string, args ...interface{}) error { - appendRows(dest, []map[string]interface{}{ - {"Status": "unhealthy"}, - {"Status": "unhealthy"}, - {"Status": "unhealthy"}, - }) + // Return count of 1 active replica (so deployment becomes degraded, not failed) + if strings.Contains(query, "COUNT(*)") { + appendRows(dest, []map[string]interface{}{ + {"Count": 1}, + }) + } return nil }, - execFunc: func(query string, args ...interface{}) (interface{}, error) { - return nil, nil + } + pm := &mockProcessManager{} + hc := NewHealthChecker(db, zap.NewNop(), "node-1", pm) + + dep := deploymentRow{ + ID: "dep-limited", + Namespace: "test", + Name: "my-app", + Type: "nextjs", + Port: 10001, + RestartPolicy: "on-failure", + MaxRestartCount: 1, // Only 1 restart allowed + ReplicaStatus: "active", + } + + ctx := context.Background() + + // First 3 misses -> restart (limit=1, attempt 1) + for i := 0; i < consecutiveFailuresThreshold; i++ { + hc.handleUnhealthy(ctx, dep) + } + + // Should have restarted once + if len(pm.getRestartCalls()) != 1 { + t.Fatalf("expected 1 restart call, got %d", len(pm.getRestartCalls())) + } + + // Next 3 misses -> restart limit exhausted, mark replica failed + for i := 0; i < consecutiveFailuresThreshold; i++ { + hc.handleUnhealthy(ctx, dep) + } + + // Verify replica was marked failed + execCalls := db.getExecCalls() + foundReplicaUpdate := false + foundDeploymentUpdate := false + for _, call := range execCalls { + if strings.Contains(call.query, "UPDATE deployment_replicas") && strings.Contains(call.query, "'failed'") { + foundReplicaUpdate = true + } + if strings.Contains(call.query, "UPDATE deployments") { + foundDeploymentUpdate = true + } + } + + if !foundReplicaUpdate { + t.Error("expected UPDATE deployment_replicas SET status = 'failed'") + } + if !foundDeploymentUpdate { + t.Error("expected UPDATE deployments to recalculate status") + } + + // Should NOT have restarted again (limit was 1) + if len(pm.getRestartCalls()) != 1 { + t.Errorf("expected still 1 restart call, got %d", len(pm.getRestartCalls())) + } +} + +func TestHandleUnhealthy_NeverRestart(t *testing.T) { + db := &mockDB{ + queryFunc: func(dest interface{}, query string, args ...interface{}) error { + if strings.Contains(query, "COUNT(*)") { + appendRows(dest, []map[string]interface{}{ + {"Count": 0}, + }) + } + return nil }, } + pm := &mockProcessManager{} + hc := NewHealthChecker(db, zap.NewNop(), "node-1", pm) - hc := NewHealthChecker(db, zap.NewNop()) - hc.checkConsecutiveFailures(context.Background(), "dep-99", false) + dep := deploymentRow{ + ID: "dep-never", + Namespace: "test", + Name: "no-restart-app", + Type: "nextjs", + Port: 10001, + RestartPolicy: "never", + MaxRestartCount: 10, + ReplicaStatus: "active", + } + ctx := context.Background() + + // 3 misses should immediately mark failed without restart + for i := 0; i < consecutiveFailuresThreshold; i++ { + hc.handleUnhealthy(ctx, dep) + } + + // No restart calls + if len(pm.getRestartCalls()) != 0 { + t.Errorf("expected 0 restart calls with policy=never, got %d", len(pm.getRestartCalls())) + } + + // Verify replica was marked failed execCalls := db.getExecCalls() + foundReplicaUpdate := false + for _, call := range execCalls { + if strings.Contains(call.query, "UPDATE deployment_replicas") && strings.Contains(call.query, "'failed'") { + foundReplicaUpdate = true + } + } + if !foundReplicaUpdate { + t.Error("expected replica to be marked failed immediately") + } +} - // Expect 2 exec calls: one UPDATE (mark failed) + one INSERT (event). - if len(execCalls) != 2 { - t.Fatalf("expected 2 exec calls (update + event), got %d", len(execCalls)) +// ---- e) handleHealthy ----------------------------------------------------- + +func TestHandleHealthy_ResetsCounters(t *testing.T) { + db := &mockDB{} + pm := &mockProcessManager{} + hc := NewHealthChecker(db, zap.NewNop(), "node-1", pm) + + dep := deploymentRow{ + ID: "dep-reset", + Namespace: "test", + Name: "flaky-app", + Type: "nextjs", + Port: 10001, + RestartPolicy: "on-failure", + MaxRestartCount: 3, + ReplicaStatus: "active", } - // First call: UPDATE deployments SET status = 'failed' - if !strings.Contains(execCalls[0].query, "UPDATE deployments") { - t.Errorf("expected UPDATE deployments query, got: %s", execCalls[0].query) + ctx := context.Background() + + // 2 misses (below threshold) + hc.handleUnhealthy(ctx, dep) + hc.handleUnhealthy(ctx, dep) + + // Health recovered + hc.handleHealthy(ctx, dep) + + // 2 more misses — should NOT trigger restart (counters were reset) + hc.handleUnhealthy(ctx, dep) + hc.handleUnhealthy(ctx, dep) + + if len(pm.getRestartCalls()) != 0 { + t.Errorf("expected 0 restart calls after counter reset, got %d", len(pm.getRestartCalls())) } - if !strings.Contains(execCalls[0].query, "status = 'failed'") { - t.Errorf("expected status='failed' in query, got: %s", execCalls[0].query) +} + +func TestHandleHealthy_RecoversFailedReplica(t *testing.T) { + callCount := 0 + db := &mockDB{ + queryFunc: func(dest interface{}, query string, args ...interface{}) error { + if strings.Contains(query, "COUNT(*)") { + callCount++ + if callCount == 1 { + // First COUNT: over-replication check — 1 active (under-replicated, allow recovery) + appendRows(dest, []map[string]interface{}{{"Count": 1}}) + } else { + // Second COUNT: recalculateDeploymentStatus — now 2 active after recovery + appendRows(dest, []map[string]interface{}{{"Count": 2}}) + } + } + return nil + }, + } + hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil) + + dep := deploymentRow{ + ID: "dep-recover", + Namespace: "test", + Name: "recovered-app", + ReplicaStatus: "failed", // Was failed, now passing health check } - // Second call: INSERT INTO deployment_events - if !strings.Contains(execCalls[1].query, "INSERT INTO deployment_events") { - t.Errorf("expected INSERT INTO deployment_events, got: %s", execCalls[1].query) + ctx := context.Background() + hc.handleHealthy(ctx, dep) + + // Verify replica was updated back to 'active' + execCalls := db.getExecCalls() + foundReplicaRecovery := false + foundEvent := false + for _, call := range execCalls { + if strings.Contains(call.query, "UPDATE deployment_replicas") && strings.Contains(call.query, "'active'") { + foundReplicaRecovery = true + } + if strings.Contains(call.query, "replica_recovered") { + foundEvent = true + } } - if !strings.Contains(execCalls[1].query, "health_failed") { - t.Errorf("expected health_failed event_type, got: %s", execCalls[1].query) + if !foundReplicaRecovery { + t.Error("expected UPDATE deployment_replicas SET status = 'active'") + } + if !foundEvent { + t.Error("expected replica_recovered event") + } +} + +func TestHandleHealthy_StopsZombieReplicaWhenAlreadyReplaced(t *testing.T) { + db := &mockDB{ + queryFunc: func(dest interface{}, query string, args ...interface{}) error { + if strings.Contains(query, "COUNT(*)") { + // 2 active replicas already exist — this replica was replaced + appendRows(dest, []map[string]interface{}{{"Count": 2}}) + } + return nil + }, + } + pm := &mockProcessManager{} + hc := NewHealthChecker(db, zap.NewNop(), "node-zombie", pm) + + dep := deploymentRow{ + ID: "dep-zombie", + Namespace: "test", + Name: "zombie-app", + Type: "nextjs", + Port: 10001, + ReplicaStatus: "failed", // Was failed, but process is running (systemd Restart=always) } - // Verify the deployment ID was passed to both queries. - for i, call := range execCalls { - found := false - for _, arg := range call.args { - if arg == "dep-99" { - found = true - break + ctx := context.Background() + hc.handleHealthy(ctx, dep) + + // Verify Stop was called (not Restart) + stopCalls := pm.getStopCalls() + if len(stopCalls) != 1 { + t.Fatalf("expected 1 Stop call, got %d", len(stopCalls)) + } + if stopCalls[0] != "dep-zombie" { + t.Errorf("expected Stop for 'dep-zombie', got %q", stopCalls[0]) + } + + // Verify replica row was DELETED (not updated to active) + execCalls := db.getExecCalls() + foundDelete := false + foundZombieEvent := false + for _, call := range execCalls { + if strings.Contains(call.query, "DELETE FROM deployment_replicas") { + foundDelete = true + // Verify the right deployment and node + if len(call.args) >= 2 { + if call.args[0] != "dep-zombie" || call.args[1] != "node-zombie" { + t.Errorf("DELETE args: got (%v, %v), want (dep-zombie, node-zombie)", call.args[0], call.args[1]) + } } } - if !found { - t.Errorf("exec call %d: expected deployment id 'dep-99' in args %v", i, call.args) + if strings.Contains(call.query, "zombie_replica_stopped") { + foundZombieEvent = true + } + // Should NOT recover to active + if strings.Contains(call.query, "UPDATE deployment_replicas") && strings.Contains(call.query, "'active'") { + t.Error("should NOT update replica to active when it's a zombie") } } -} - -func TestCheckConsecutiveFailures_MixedResults(t *testing.T) { - db := &mockDB{ - queryFunc: func(dest interface{}, query string, args ...interface{}) error { - // 3 rows but NOT all unhealthy — no action should be taken. - appendRows(dest, []map[string]interface{}{ - {"Status": "unhealthy"}, - {"Status": "healthy"}, - {"Status": "unhealthy"}, - }) - return nil - }, + if !foundDelete { + t.Error("expected DELETE FROM deployment_replicas for zombie replica") + } + if !foundZombieEvent { + t.Error("expected zombie_replica_stopped event") } - hc := NewHealthChecker(db, zap.NewNop()) - hc.checkConsecutiveFailures(context.Background(), "dep-mixed", false) + // Verify no Restart calls + if len(pm.getRestartCalls()) != 0 { + t.Errorf("expected 0 restart calls, got %d", len(pm.getRestartCalls())) + } +} + +// ---- f) recordHealthCheck ------------------------------------------------- + +func TestRecordHealthCheck_IncludesNodeID(t *testing.T) { + db := &mockDB{} + hc := NewHealthChecker(db, zap.NewNop(), "node-xyz", nil) + + hc.recordHealthCheck(context.Background(), "dep-1", true) execCalls := db.getExecCalls() - if len(execCalls) != 0 { - t.Errorf("expected 0 exec calls with mixed results, got %d", len(execCalls)) + if len(execCalls) != 1 { + t.Fatalf("expected 1 exec call, got %d", len(execCalls)) + } + + q := execCalls[0].query + if !strings.Contains(q, "node_id") { + t.Errorf("expected INSERT to include node_id column, got: %s", q) + } + + // Verify node_id is the second arg (after deployment_id) + if len(execCalls[0].args) < 2 { + t.Fatal("expected at least 2 args") + } + if nodeID, ok := execCalls[0].args[1].(string); !ok || nodeID != "node-xyz" { + t.Errorf("expected node_id arg 'node-xyz', got %v", execCalls[0].args[1]) } } -// ---- d) GetHealthStatus --------------------------------------------------- +// ---- g) GetHealthStatus --------------------------------------------------- func TestGetHealthStatus_ReturnsChecks(t *testing.T) { now := time.Now().Truncate(time.Second) @@ -368,7 +668,7 @@ func TestGetHealthStatus_ReturnsChecks(t *testing.T) { }, } - hc := NewHealthChecker(db, zap.NewNop()) + hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil) checks, err := hc.GetHealthStatus(context.Background(), "dep-1", 10) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -391,20 +691,16 @@ func TestGetHealthStatus_ReturnsChecks(t *testing.T) { if checks[1].Status != "unhealthy" { t.Errorf("checks[1].Status = %q, want %q", checks[1].Status, "unhealthy") } - if checks[1].ResponseTimeMs != 5001 { - t.Errorf("checks[1].ResponseTimeMs = %d, want 5001", checks[1].ResponseTimeMs) - } } func TestGetHealthStatus_EmptyList(t *testing.T) { db := &mockDB{ queryFunc: func(dest interface{}, query string, args ...interface{}) error { - // Don't populate dest — leave the slice empty. return nil }, } - hc := NewHealthChecker(db, zap.NewNop()) + hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil) checks, err := hc.GetHealthStatus(context.Background(), "dep-empty", 10) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -422,7 +718,7 @@ func TestGetHealthStatus_DatabaseError(t *testing.T) { }, } - hc := NewHealthChecker(db, zap.NewNop()) + hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil) _, err := hc.GetHealthStatus(context.Background(), "dep-err", 10) if err == nil { t.Fatal("expected error from GetHealthStatus") @@ -432,6 +728,199 @@ func TestGetHealthStatus_DatabaseError(t *testing.T) { } } +// ---- h) reconcileDeployments ---------------------------------------------- + +type mockReconciler struct { + mu sync.Mutex + selectCalls []string // primaryNodeIDs + selectResult []string + selectErr error + updateStatusCalls []struct { + deploymentID string + nodeID string + status deployments.ReplicaStatus + } +} + +func (m *mockReconciler) SelectReplicaNodes(_ context.Context, primaryNodeID string, _ int) ([]string, error) { + m.mu.Lock() + m.selectCalls = append(m.selectCalls, primaryNodeID) + m.mu.Unlock() + return m.selectResult, m.selectErr +} + +func (m *mockReconciler) UpdateReplicaStatus(_ context.Context, deploymentID, nodeID string, status deployments.ReplicaStatus) error { + m.mu.Lock() + m.updateStatusCalls = append(m.updateStatusCalls, struct { + deploymentID string + nodeID string + status deployments.ReplicaStatus + }{deploymentID, nodeID, status}) + m.mu.Unlock() + return nil +} + +type mockProvisioner struct { + mu sync.Mutex + setupCalls []struct { + deploymentID string + nodeID string + } +} + +func (m *mockProvisioner) SetupDynamicReplica(_ context.Context, dep *deployments.Deployment, nodeID string) { + m.mu.Lock() + m.setupCalls = append(m.setupCalls, struct { + deploymentID string + nodeID string + }{dep.ID, nodeID}) + m.mu.Unlock() +} + +func TestReconcileDeployments_UnderReplicated(t *testing.T) { + // Start a mock RQLite status endpoint that reports Leader + leaderSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Write([]byte(`{"store":{"raft":{"state":"Leader"}}}`)) + })) + defer leaderSrv.Close() + + db := &mockDB{ + queryFunc: func(dest interface{}, query string, args ...interface{}) error { + if strings.Contains(query, "active_replicas") { + appendRows(dest, []map[string]interface{}{ + { + "ID": "dep-under", + "Namespace": "test", + "Name": "under-app", + "Type": "nextjs", + "HomeNodeID": "node-home", + "ContentCID": "cid-123", + "BuildCID": "", + "Environment": "", + "Port": 10001, + "HealthCheckPath": "/health", + "MemoryLimitMB": 256, + "CPULimitPercent": 50, + "RestartPolicy": "on-failure", + "MaxRestartCount": 10, + "ActiveReplicas": 1, // Under-replicated (desired=2) + }, + }) + } + return nil + }, + } + + rc := &mockReconciler{selectResult: []string{"node-new"}} + rp := &mockProvisioner{} + + hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil) + hc.SetReconciler(leaderSrv.URL, rc, rp) + + hc.reconcileDeployments(context.Background()) + + // Wait briefly for the goroutine to fire + time.Sleep(50 * time.Millisecond) + + // Verify SelectReplicaNodes was called + rc.mu.Lock() + selectCount := len(rc.selectCalls) + rc.mu.Unlock() + if selectCount != 1 { + t.Fatalf("expected 1 SelectReplicaNodes call, got %d", selectCount) + } + + // Verify SetupDynamicReplica was called + rp.mu.Lock() + setupCount := len(rp.setupCalls) + rp.mu.Unlock() + if setupCount != 1 { + t.Fatalf("expected 1 SetupDynamicReplica call, got %d", setupCount) + } + rp.mu.Lock() + if rp.setupCalls[0].deploymentID != "dep-under" { + t.Errorf("expected deployment 'dep-under', got %q", rp.setupCalls[0].deploymentID) + } + if rp.setupCalls[0].nodeID != "node-new" { + t.Errorf("expected node 'node-new', got %q", rp.setupCalls[0].nodeID) + } + rp.mu.Unlock() +} + +func TestReconcileDeployments_FullyReplicated(t *testing.T) { + leaderSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Write([]byte(`{"store":{"raft":{"state":"Leader"}}}`)) + })) + defer leaderSrv.Close() + + db := &mockDB{ + queryFunc: func(dest interface{}, query string, args ...interface{}) error { + if strings.Contains(query, "active_replicas") { + appendRows(dest, []map[string]interface{}{ + { + "ID": "dep-full", + "Namespace": "test", + "Name": "full-app", + "Type": "nextjs", + "HomeNodeID": "node-home", + "ContentCID": "cid-456", + "BuildCID": "", + "Environment": "", + "Port": 10002, + "HealthCheckPath": "/health", + "MemoryLimitMB": 256, + "CPULimitPercent": 50, + "RestartPolicy": "on-failure", + "MaxRestartCount": 10, + "ActiveReplicas": 2, // Fully replicated + }, + }) + } + return nil + }, + } + + rc := &mockReconciler{selectResult: []string{"node-new"}} + rp := &mockProvisioner{} + + hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil) + hc.SetReconciler(leaderSrv.URL, rc, rp) + + hc.reconcileDeployments(context.Background()) + + time.Sleep(50 * time.Millisecond) + + // Should NOT trigger re-replication + rc.mu.Lock() + if len(rc.selectCalls) != 0 { + t.Errorf("expected 0 SelectReplicaNodes calls for fully replicated deployment, got %d", len(rc.selectCalls)) + } + rc.mu.Unlock() +} + +func TestReconcileDeployments_NotLeader(t *testing.T) { + // Not-leader RQLite status + followerSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Write([]byte(`{"store":{"raft":{"state":"Follower"}}}`)) + })) + defer followerSrv.Close() + + db := &mockDB{} + rc := &mockReconciler{} + rp := &mockProvisioner{} + + hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil) + hc.SetReconciler(followerSrv.URL, rc, rp) + + hc.reconcileDeployments(context.Background()) + + // Should not query deployments at all + calls := db.getQueryCalls() + if len(calls) != 0 { + t.Errorf("expected 0 query calls on follower, got %d", len(calls)) + } +} + // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- @@ -439,10 +928,8 @@ func TestGetHealthStatus_DatabaseError(t *testing.T) { // serverPort extracts the port number from an httptest.Server. func serverPort(t *testing.T, srv *httptest.Server) int { t.Helper() - // URL is http://127.0.0.1: addr := srv.Listener.Addr().String() var port int - // addr is "127.0.0.1:PORT" _, err := fmt.Sscanf(addr[strings.LastIndex(addr, ":")+1:], "%d", &port) if err != nil { t.Fatalf("failed to parse port from %q: %v", addr, err) diff --git a/pkg/deployments/types.go b/pkg/deployments/types.go index c34fbd9..f4768e9 100644 --- a/pkg/deployments/types.go +++ b/pkg/deployments/types.go @@ -25,6 +25,7 @@ const ( DeploymentStatusDeploying DeploymentStatus = "deploying" DeploymentStatusActive DeploymentStatus = "active" DeploymentStatusFailed DeploymentStatus = "failed" + DeploymentStatusDegraded DeploymentStatus = "degraded" DeploymentStatusStopped DeploymentStatus = "stopped" DeploymentStatusUpdating DeploymentStatus = "updating" ) diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 8d85382..5ef414e 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -500,7 +500,8 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { ) // Start health checker - gw.healthChecker = health.NewHealthChecker(dbAdapter, logger.Logger) + gw.healthChecker = health.NewHealthChecker(dbAdapter, logger.Logger, cfg.NodePeerID, gw.processManager) + gw.healthChecker.SetReconciler(cfg.RQLiteDSN, gw.replicaManager, gw.deploymentService) go gw.healthChecker.Start(context.Background()) logger.ComponentInfo(logging.ComponentGeneral, "Deployment system initialized") diff --git a/pkg/gateway/handlers/deployments/service.go b/pkg/gateway/handlers/deployments/service.go index 906c482..3d10dd4 100644 --- a/pkg/gateway/handlers/deployments/service.go +++ b/pkg/gateway/handlers/deployments/service.go @@ -333,13 +333,13 @@ func (s *DeploymentService) createDeploymentReplicas(ctx context.Context, deploy } } else { // Dynamic deployments: fan out to the secondary node to set up the process - go s.setupDynamicReplica(ctx, deployment, nodeID) + go s.SetupDynamicReplica(ctx, deployment, nodeID) } } } -// setupDynamicReplica calls the secondary node's internal API to set up a deployment replica. -func (s *DeploymentService) setupDynamicReplica(ctx context.Context, deployment *deployments.Deployment, nodeID string) { +// SetupDynamicReplica calls the secondary node's internal API to set up a deployment replica. +func (s *DeploymentService) SetupDynamicReplica(ctx context.Context, deployment *deployments.Deployment, nodeID string) { nodeIP, err := s.replicaManager.GetNodeIP(ctx, nodeID) if err != nil { s.logger.Error("Failed to get node IP for replica setup", diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index b442737..0b3be80 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -1125,7 +1125,7 @@ func (g *Gateway) getDeploymentByDomain(ctx context.Context, domain string) (*de SELECT id, namespace, name, type, port, content_cid, status, home_node_id, subdomain FROM deployments WHERE subdomain = ? - AND status = 'active' + AND status IN ('active', 'degraded') LIMIT 1 ` result, err := db.Query(internalCtx, query, subdomainOrName) @@ -1149,7 +1149,7 @@ func (g *Gateway) getDeploymentByDomain(ctx context.Context, domain string) (*de SELECT id, namespace, name, type, port, content_cid, status, home_node_id, subdomain FROM deployments WHERE name = ? - AND status = 'active' + AND status IN ('active', 'degraded') LIMIT 1 ` result, err = db.Query(internalCtx, query, subdomainOrName) @@ -1177,7 +1177,7 @@ func (g *Gateway) getDeploymentByDomain(ctx context.Context, domain string) (*de FROM deployments d JOIN deployment_domains dd ON d.id = dd.deployment_id WHERE dd.domain = ? AND dd.verified_at IS NOT NULL - AND d.status = 'active' + AND d.status IN ('active', 'degraded') LIMIT 1 ` result, err := db.Query(internalCtx, query, domain)