Enhance health checker and deployment handling

- Added support for "degraded" deployment status in types.
- Updated health checker initialization to include process manager and node ID.
- Refactored health checker tests to accommodate new process manager functionality.
- Implemented logic to handle unhealthy deployments, including restart and failure marking.
- Enhanced deployment reconciliation to manage under-replicated scenarios.
- Updated gateway handlers and middleware to consider "degraded" status in deployment queries.
This commit is contained in:
anonpenguin23 2026-02-20 09:44:07 +02:00
parent 4ebf558719
commit c499b2d76e
6 changed files with 1016 additions and 186 deletions

View File

@ -2,16 +2,41 @@ package health
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"sync" "sync"
"time" "time"
"github.com/DeBrosOfficial/network/pkg/database" "github.com/DeBrosOfficial/network/pkg/database"
"github.com/DeBrosOfficial/network/pkg/deployments"
"go.uber.org/zap" "go.uber.org/zap"
) )
// deploymentRow represents a deployment record for health checking // Tuning constants.
const (
	// consecutiveFailuresThreshold is how many back-to-back failed probes a
	// replica accumulates before the checker restarts it or marks it failed.
	consecutiveFailuresThreshold = 3
	// defaultDesiredReplicas is the replica count reconciliation targets.
	defaultDesiredReplicas = deployments.DefaultReplicaCount
)

// ProcessManager is the subset of process.Manager needed by the health checker.
type ProcessManager interface {
	// Restart restarts the local process backing the given deployment.
	Restart(ctx context.Context, deployment *deployments.Deployment) error
	// Stop terminates the local process backing the given deployment.
	Stop(ctx context.Context, deployment *deployments.Deployment) error
}

// ReplicaReconciler provides replica management for the reconciliation loop.
type ReplicaReconciler interface {
	// SelectReplicaNodes picks up to count nodes (excluding primaryNodeID)
	// suitable for hosting new replicas.
	SelectReplicaNodes(ctx context.Context, primaryNodeID string, count int) ([]string, error)
	// UpdateReplicaStatus persists a replica's status for a deployment/node pair.
	UpdateReplicaStatus(ctx context.Context, deploymentID, nodeID string, status deployments.ReplicaStatus) error
}

// ReplicaProvisioner provisions new replicas on remote nodes.
type ReplicaProvisioner interface {
	// SetupDynamicReplica asynchronously provisions a replica of deployment
	// on the target node. Note: no error return — failures are presumably
	// reported out-of-band (TODO confirm against implementation).
	SetupDynamicReplica(ctx context.Context, deployment *deployments.Deployment, nodeID string)
}
// deploymentRow represents a deployment record for health checking.
type deploymentRow struct { type deploymentRow struct {
ID string `db:"id"` ID string `db:"id"`
Namespace string `db:"namespace"` Namespace string `db:"namespace"`
@ -20,57 +45,101 @@ type deploymentRow struct {
Port int `db:"port"` Port int `db:"port"`
HealthCheckPath string `db:"health_check_path"` HealthCheckPath string `db:"health_check_path"`
HomeNodeID string `db:"home_node_id"` HomeNodeID string `db:"home_node_id"`
RestartPolicy string `db:"restart_policy"`
MaxRestartCount int `db:"max_restart_count"`
ReplicaStatus string `db:"replica_status"`
} }
// HealthChecker monitors deployment health // replicaState tracks in-memory health state for a deployment on this node.
type replicaState struct {
	// consecutiveMisses counts back-to-back failed health probes; reset to
	// zero on a healthy probe or after a restart attempt.
	consecutiveMisses int
	// restartCount counts restart attempts issued for this deployment on
	// this node; compared against the deployment's max restart count.
	restartCount int
}
// HealthChecker monitors deployment health on the local node.
type HealthChecker struct { type HealthChecker struct {
db database.Database db database.Database
logger *zap.Logger logger *zap.Logger
workers int workers int
mu sync.RWMutex nodeID string
active map[string]bool // deployment_id -> is_active processManager ProcessManager
// In-memory per-replica state (keyed by deployment ID)
stateMu sync.Mutex
states map[string]*replicaState
// Reconciliation (optional, set via SetReconciler)
rqliteDSN string
reconciler ReplicaReconciler
provisioner ReplicaProvisioner
} }
// NewHealthChecker creates a new health checker // NewHealthChecker creates a new health checker.
func NewHealthChecker(db database.Database, logger *zap.Logger) *HealthChecker { func NewHealthChecker(db database.Database, logger *zap.Logger, nodeID string, pm ProcessManager) *HealthChecker {
return &HealthChecker{ return &HealthChecker{
db: db, db: db,
logger: logger, logger: logger,
workers: 10, workers: 10,
active: make(map[string]bool), nodeID: nodeID,
processManager: pm,
states: make(map[string]*replicaState),
} }
} }
// Start begins health monitoring // SetReconciler configures the reconciliation loop dependencies (optional).
// Must be called before Start() if re-replication is desired.
func (hc *HealthChecker) SetReconciler(rqliteDSN string, rc ReplicaReconciler, rp ReplicaProvisioner) {
hc.rqliteDSN = rqliteDSN
hc.reconciler = rc
hc.provisioner = rp
}
// Start begins health monitoring with two periodic tasks:
// 1. Every 30s: probe local replicas
// 2. Every 5m: (leader-only) reconcile under-replicated deployments
func (hc *HealthChecker) Start(ctx context.Context) error { func (hc *HealthChecker) Start(ctx context.Context) error {
hc.logger.Info("Starting health checker", zap.Int("workers", hc.workers)) hc.logger.Info("Starting health checker",
zap.Int("workers", hc.workers),
zap.String("node_id", hc.nodeID),
)
ticker := time.NewTicker(30 * time.Second) probeTicker := time.NewTicker(30 * time.Second)
defer ticker.Stop() reconcileTicker := time.NewTicker(5 * time.Minute)
defer probeTicker.Stop()
defer reconcileTicker.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
hc.logger.Info("Health checker stopped") hc.logger.Info("Health checker stopped")
return ctx.Err() return ctx.Err()
case <-ticker.C: case <-probeTicker.C:
if err := hc.checkAllDeployments(ctx); err != nil { if err := hc.checkAllDeployments(ctx); err != nil {
hc.logger.Error("Health check cycle failed", zap.Error(err)) hc.logger.Error("Health check cycle failed", zap.Error(err))
} }
case <-reconcileTicker.C:
hc.reconcileDeployments(ctx)
} }
} }
} }
// checkAllDeployments checks all active deployments // checkAllDeployments checks all deployments with active or failed replicas on this node.
func (hc *HealthChecker) checkAllDeployments(ctx context.Context) error { func (hc *HealthChecker) checkAllDeployments(ctx context.Context) error {
var rows []deploymentRow var rows []deploymentRow
query := ` query := `
SELECT id, namespace, name, type, port, health_check_path, home_node_id SELECT d.id, d.namespace, d.name, d.type, dr.port,
FROM deployments d.health_check_path, d.home_node_id,
WHERE status = 'active' AND type IN ('nextjs', 'nodejs-backend', 'go-backend') d.restart_policy, d.max_restart_count,
dr.status as replica_status
FROM deployments d
JOIN deployment_replicas dr ON d.id = dr.deployment_id
WHERE d.status IN ('active', 'degraded')
AND dr.node_id = ?
AND dr.status IN ('active', 'failed')
AND d.type IN ('nextjs', 'nodejs-backend', 'go-backend')
` `
err := hc.db.Query(ctx, &rows, query) err := hc.db.Query(ctx, &rows, query, hc.nodeID)
if err != nil { if err != nil {
return fmt.Errorf("failed to query deployments: %w", err) return fmt.Errorf("failed to query deployments: %w", err)
} }
@ -90,6 +159,12 @@ func (hc *HealthChecker) checkAllDeployments(ctx context.Context) error {
healthy := hc.checkDeployment(ctx, r) healthy := hc.checkDeployment(ctx, r)
hc.recordHealthCheck(ctx, r.ID, healthy) hc.recordHealthCheck(ctx, r.ID, healthy)
if healthy {
hc.handleHealthy(ctx, r)
} else {
hc.handleUnhealthy(ctx, r)
}
}(row) }(row)
} }
@ -97,7 +172,7 @@ func (hc *HealthChecker) checkAllDeployments(ctx context.Context) error {
return nil return nil
} }
// checkDeployment checks a single deployment // checkDeployment checks a single deployment's health via HTTP.
func (hc *HealthChecker) checkDeployment(ctx context.Context, dep deploymentRow) bool { func (hc *HealthChecker) checkDeployment(ctx context.Context, dep deploymentRow) bool {
if dep.Port == 0 { if dep.Port == 0 {
// Static deployments are always healthy // Static deployments are always healthy
@ -144,7 +219,208 @@ func (hc *HealthChecker) checkDeployment(ctx context.Context, dep deploymentRow)
return healthy return healthy
} }
// recordHealthCheck records the health check result // handleHealthy processes a healthy check result.
// handleHealthy processes a healthy check result for a local replica.
//
// In-memory failure bookkeeping for the deployment is always cleared. If the
// replica row was previously marked 'failed', one of two things happens:
//   - the deployment is already at (or above) the desired replica count,
//     meaning this node was replaced while down — the zombie process is
//     stopped and the stale replica row removed; or
//   - the deployment is under-replicated — the replica is recovered and the
//     deployment status recalculated (possibly 'degraded' -> 'active').
func (hc *HealthChecker) handleHealthy(ctx context.Context, dep deploymentRow) {
	// A healthy probe wipes the consecutive-miss / restart counters.
	hc.stateMu.Lock()
	delete(hc.states, dep.ID)
	hc.stateMu.Unlock()

	if dep.ReplicaStatus != "failed" {
		return
	}

	// The replica was in 'failed' state but is now healthy — decide whether
	// to recover it or stop it (if a replacement already exists).
	type countRow struct {
		Count int `db:"c"`
	}
	var rows []countRow
	countQuery := `SELECT COUNT(*) as c FROM deployment_replicas WHERE deployment_id = ? AND status = 'active'`
	if err := hc.db.Query(ctx, &rows, countQuery, dep.ID); err != nil {
		hc.logger.Error("Failed to count active replicas for recovery check", zap.Error(err))
		return
	}
	activeCount := 0
	if len(rows) > 0 {
		activeCount = rows[0].Count
	}

	if activeCount >= defaultDesiredReplicas {
		// This replica was replaced while the node was down.
		// Stop the zombie process and remove the stale replica row.
		hc.logger.Warn("Zombie replica detected — node was replaced, stopping process",
			zap.String("deployment", dep.Name),
			zap.String("node_id", hc.nodeID),
			zap.Int("active_replicas", activeCount),
		)
		if hc.processManager != nil {
			d := &deployments.Deployment{
				ID:        dep.ID,
				Namespace: dep.Namespace,
				Name:      dep.Name,
				Type:      deployments.DeploymentType(dep.Type),
				Port:      dep.Port,
			}
			if err := hc.processManager.Stop(ctx, d); err != nil {
				hc.logger.Error("Failed to stop zombie deployment process", zap.Error(err))
			}
		}
		deleteQuery := `DELETE FROM deployment_replicas WHERE deployment_id = ? AND node_id = ? AND status = 'failed'`
		if _, err := hc.db.Exec(ctx, deleteQuery, dep.ID, hc.nodeID); err != nil {
			hc.logger.Error("Failed to delete stale replica row", zap.Error(err))
		}
		eventQuery := `INSERT INTO deployment_events (deployment_id, event_type, message, created_at) VALUES (?, 'zombie_replica_stopped', ?, ?)`
		msg := fmt.Sprintf("Zombie replica on node %s stopped and removed (already at %d active replicas)", hc.nodeID, activeCount)
		if _, err := hc.db.Exec(ctx, eventQuery, dep.ID, msg, time.Now()); err != nil {
			hc.logger.Error("Failed to record zombie_replica_stopped event", zap.Error(err))
		}
		return
	}

	// Under-replicated — genuine recovery. Bring this replica back.
	hc.logger.Info("Failed replica recovered, marking active",
		zap.String("deployment", dep.Name),
		zap.String("node_id", hc.nodeID),
	)
	replicaQuery := `UPDATE deployment_replicas SET status = 'active', updated_at = ? WHERE deployment_id = ? AND node_id = ?`
	if _, err := hc.db.Exec(ctx, replicaQuery, time.Now(), dep.ID, hc.nodeID); err != nil {
		hc.logger.Error("Failed to recover replica status", zap.Error(err))
		return
	}
	// Recalculate deployment status — may go from 'degraded' back to 'active'.
	hc.recalculateDeploymentStatus(ctx, dep.ID)
	eventQuery := `INSERT INTO deployment_events (deployment_id, event_type, message, created_at) VALUES (?, 'replica_recovered', ?, ?)`
	msg := fmt.Sprintf("Replica on node %s recovered and marked active", hc.nodeID)
	if _, err := hc.db.Exec(ctx, eventQuery, dep.ID, msg, time.Now()); err != nil {
		hc.logger.Error("Failed to record replica_recovered event", zap.Error(err))
	}
}
// handleUnhealthy processes an unhealthy check result for a local replica.
//
// Failures are counted in memory; once consecutiveFailuresThreshold misses
// accumulate, the checker either restarts the process (if the restart policy
// and remaining restart budget allow) or marks this replica 'failed' and
// recalculates the deployment-level status.
func (hc *HealthChecker) handleUnhealthy(ctx context.Context, dep deploymentRow) {
	// Don't take action on already-failed replicas; recovery is driven by
	// handleHealthy when they come back.
	if dep.ReplicaStatus == "failed" {
		return
	}

	hc.stateMu.Lock()
	st, exists := hc.states[dep.ID]
	if !exists {
		st = &replicaState{}
		hc.states[dep.ID] = st
	}
	st.consecutiveMisses++
	misses := st.consecutiveMisses
	restarts := st.restartCount
	hc.stateMu.Unlock()

	if misses < consecutiveFailuresThreshold {
		return
	}

	// Reached threshold — decide: restart or mark failed.
	maxRestarts := dep.MaxRestartCount
	if maxRestarts == 0 {
		maxRestarts = deployments.DefaultMaxRestartCount
	}
	canRestart := dep.RestartPolicy != string(deployments.RestartPolicyNever) &&
		restarts < maxRestarts &&
		hc.processManager != nil

	if canRestart {
		hc.logger.Info("Attempting restart for unhealthy deployment",
			zap.String("deployment", dep.Name),
			zap.Int("restart_attempt", restarts+1),
			zap.Int("max_restarts", maxRestarts),
		)
		// Build minimal Deployment struct for the process manager.
		d := &deployments.Deployment{
			ID:        dep.ID,
			Namespace: dep.Namespace,
			Name:      dep.Name,
			Type:      deployments.DeploymentType(dep.Type),
			Port:      dep.Port,
		}
		restartErr := hc.processManager.Restart(ctx, d)
		if restartErr != nil {
			hc.logger.Error("Failed to restart deployment",
				zap.String("deployment", dep.Name),
				zap.Error(restartErr),
			)
		}
		// The attempt consumes restart budget either way, and the miss
		// counter resets so the next threshold is counted afresh.
		hc.stateMu.Lock()
		st.restartCount++
		st.consecutiveMisses = 0
		hc.stateMu.Unlock()
		eventQuery := `INSERT INTO deployment_events (deployment_id, event_type, message, created_at) VALUES (?, 'health_restart', ?, ?)`
		msg := fmt.Sprintf("Process restarted on node %s (attempt %d/%d)", hc.nodeID, restarts+1, maxRestarts)
		if restartErr != nil {
			// Record the truth: the attempt was made but did not succeed.
			msg = fmt.Sprintf("Restart attempt %d/%d on node %s failed: %v", restarts+1, maxRestarts, hc.nodeID, restartErr)
		}
		if _, err := hc.db.Exec(ctx, eventQuery, dep.ID, msg, time.Now()); err != nil {
			hc.logger.Error("Failed to record health_restart event", zap.Error(err))
		}
		return
	}

	// Restart limit exhausted (or policy is "never") — mark THIS replica as failed.
	hc.logger.Error("Marking replica as failed after exhausting restarts",
		zap.String("deployment", dep.Name),
		zap.String("node_id", hc.nodeID),
		zap.Int("restarts_attempted", restarts),
	)
	replicaQuery := `UPDATE deployment_replicas SET status = 'failed', updated_at = ? WHERE deployment_id = ? AND node_id = ?`
	if _, err := hc.db.Exec(ctx, replicaQuery, time.Now(), dep.ID, hc.nodeID); err != nil {
		hc.logger.Error("Failed to mark replica as failed", zap.Error(err))
	}
	// Recalculate deployment status based on remaining active replicas.
	hc.recalculateDeploymentStatus(ctx, dep.ID)
	eventQuery := `INSERT INTO deployment_events (deployment_id, event_type, message, created_at) VALUES (?, 'replica_failed', ?, ?)`
	msg := fmt.Sprintf("Replica on node %s marked failed after %d restart attempts", hc.nodeID, restarts)
	if _, err := hc.db.Exec(ctx, eventQuery, dep.ID, msg, time.Now()); err != nil {
		hc.logger.Error("Failed to record replica_failed event", zap.Error(err))
	}
}
// recalculateDeploymentStatus derives the deployment-level status from the
// number of replicas still marked 'active': zero -> 'failed', fewer than the
// desired count -> 'degraded', otherwise 'active'. Errors are logged and the
// deployment row is left untouched.
func (hc *HealthChecker) recalculateDeploymentStatus(ctx context.Context, deploymentID string) {
	type countRow struct {
		Count int `db:"c"`
	}
	countQuery := `SELECT COUNT(*) as c FROM deployment_replicas WHERE deployment_id = ? AND status = 'active'`
	var counts []countRow
	if err := hc.db.Query(ctx, &counts, countQuery, deploymentID); err != nil {
		hc.logger.Error("Failed to count active replicas", zap.Error(err))
		return
	}

	active := 0
	if len(counts) > 0 {
		active = counts[0].Count
	}

	// Map the active-replica count onto a deployment status.
	newStatus := "active"
	if active == 0 {
		newStatus = "failed"
	} else if active < defaultDesiredReplicas {
		newStatus = "degraded"
	}

	updateQuery := `UPDATE deployments SET status = ?, updated_at = ? WHERE id = ?`
	if _, err := hc.db.Exec(ctx, updateQuery, newStatus, time.Now(), deploymentID); err != nil {
		hc.logger.Error("Failed to update deployment status",
			zap.String("deployment", deploymentID),
			zap.String("new_status", newStatus),
			zap.Error(err),
		)
	}
}
// recordHealthCheck records the health check result in the database.
func (hc *HealthChecker) recordHealthCheck(ctx context.Context, deploymentID string, healthy bool) { func (hc *HealthChecker) recordHealthCheck(ctx context.Context, deploymentID string, healthy bool) {
status := "healthy" status := "healthy"
if !healthy { if !healthy {
@ -152,89 +428,154 @@ func (hc *HealthChecker) recordHealthCheck(ctx context.Context, deploymentID str
} }
query := ` query := `
INSERT INTO deployment_health_checks (deployment_id, status, checked_at, response_time_ms) INSERT INTO deployment_health_checks (deployment_id, node_id, status, checked_at, response_time_ms)
VALUES (?, ?, ?, ?) VALUES (?, ?, ?, ?, ?)
` `
_, err := hc.db.Exec(ctx, query, deploymentID, status, time.Now(), 0) _, err := hc.db.Exec(ctx, query, deploymentID, hc.nodeID, status, time.Now(), 0)
if err != nil { if err != nil {
hc.logger.Error("Failed to record health check", hc.logger.Error("Failed to record health check",
zap.String("deployment", deploymentID), zap.String("deployment", deploymentID),
zap.Error(err), zap.Error(err),
) )
} }
// Track consecutive failures
hc.checkConsecutiveFailures(ctx, deploymentID, healthy)
} }
// checkConsecutiveFailures marks deployment as failed after 3 consecutive failures // reconcileDeployments checks for under-replicated deployments and triggers re-replication.
func (hc *HealthChecker) checkConsecutiveFailures(ctx context.Context, deploymentID string, currentHealthy bool) { // Only runs on the RQLite leader to avoid duplicate repairs.
if currentHealthy { func (hc *HealthChecker) reconcileDeployments(ctx context.Context) {
if hc.reconciler == nil || hc.provisioner == nil {
return return
} }
type healthRow struct { if !hc.isRQLiteLeader(ctx) {
Status string `db:"status"` return
} }
var rows []healthRow hc.logger.Info("Running deployment reconciliation check")
type reconcileRow struct {
ID string `db:"id"`
Namespace string `db:"namespace"`
Name string `db:"name"`
Type string `db:"type"`
HomeNodeID string `db:"home_node_id"`
ContentCID string `db:"content_cid"`
BuildCID string `db:"build_cid"`
Environment string `db:"environment"`
Port int `db:"port"`
HealthCheckPath string `db:"health_check_path"`
MemoryLimitMB int `db:"memory_limit_mb"`
CPULimitPercent int `db:"cpu_limit_percent"`
RestartPolicy string `db:"restart_policy"`
MaxRestartCount int `db:"max_restart_count"`
ActiveReplicas int `db:"active_replicas"`
}
var rows []reconcileRow
query := ` query := `
SELECT status SELECT d.id, d.namespace, d.name, d.type, d.home_node_id,
FROM deployment_health_checks d.content_cid, d.build_cid, d.environment, d.port,
WHERE deployment_id = ? d.health_check_path, d.memory_limit_mb, d.cpu_limit_percent,
ORDER BY checked_at DESC d.restart_policy, d.max_restart_count,
LIMIT 3 (SELECT COUNT(*) FROM deployment_replicas dr
WHERE dr.deployment_id = d.id AND dr.status = 'active') AS active_replicas
FROM deployments d
WHERE d.status IN ('active', 'degraded')
AND d.type IN ('nextjs', 'nodejs-backend', 'go-backend')
` `
err := hc.db.Query(ctx, &rows, query, deploymentID) if err := hc.db.Query(ctx, &rows, query); err != nil {
if err != nil { hc.logger.Error("Failed to query deployments for reconciliation", zap.Error(err))
hc.logger.Error("Failed to query health history", zap.Error(err))
return return
} }
// Check if last 3 checks all failed
if len(rows) >= 3 {
allFailed := true
for _, row := range rows { for _, row := range rows {
if row.Status != "unhealthy" { if row.ActiveReplicas >= defaultDesiredReplicas {
allFailed = false continue
break
}
} }
if allFailed { needed := defaultDesiredReplicas - row.ActiveReplicas
hc.logger.Error("Deployment has 3 consecutive failures, marking as failed", hc.logger.Warn("Deployment under-replicated, triggering re-replication",
zap.String("deployment", deploymentID), zap.String("deployment", row.Name),
zap.String("namespace", row.Namespace),
zap.Int("active_replicas", row.ActiveReplicas),
zap.Int("desired", defaultDesiredReplicas),
zap.Int("needed", needed),
) )
updateQuery := ` newNodes, err := hc.reconciler.SelectReplicaNodes(ctx, row.HomeNodeID, needed)
UPDATE deployments if err != nil || len(newNodes) == 0 {
SET status = 'failed', updated_at = ? hc.logger.Warn("No nodes available for re-replication",
WHERE id = ? zap.String("deployment", row.Name),
`
_, err := hc.db.Exec(ctx, updateQuery, time.Now(), deploymentID)
if err != nil {
hc.logger.Error("Failed to mark deployment as failed", zap.Error(err))
}
// Record event
eventQuery := `
INSERT INTO deployment_events (deployment_id, event_type, message, created_at)
VALUES (?, 'health_failed', 'Deployment marked as failed after 3 consecutive health check failures', ?)
`
if _, err := hc.db.Exec(ctx, eventQuery, deploymentID, time.Now()); err != nil {
hc.logger.Error("Failed to record health_failed event",
zap.String("deployment", deploymentID),
zap.Error(err), zap.Error(err),
) )
continue
} }
dep := &deployments.Deployment{
ID: row.ID,
Namespace: row.Namespace,
Name: row.Name,
Type: deployments.DeploymentType(row.Type),
HomeNodeID: row.HomeNodeID,
ContentCID: row.ContentCID,
BuildCID: row.BuildCID,
Port: row.Port,
HealthCheckPath: row.HealthCheckPath,
MemoryLimitMB: row.MemoryLimitMB,
CPULimitPercent: row.CPULimitPercent,
RestartPolicy: deployments.RestartPolicy(row.RestartPolicy),
MaxRestartCount: row.MaxRestartCount,
}
if row.Environment != "" {
json.Unmarshal([]byte(row.Environment), &dep.Environment)
}
for _, nodeID := range newNodes {
hc.logger.Info("Provisioning replacement replica",
zap.String("deployment", row.Name),
zap.String("target_node", nodeID),
)
go hc.provisioner.SetupDynamicReplica(ctx, dep, nodeID)
} }
} }
} }
// GetHealthStatus gets recent health checks for a deployment // isRQLiteLeader checks whether this node is the current Raft leader.
// isRQLiteLeader reports whether this node is the current Raft leader, by
// querying the local rqlite /status endpoint and inspecting the reported
// raft state. Any transport, HTTP, or decode failure conservatively returns
// false so reconciliation is simply skipped this cycle.
func (hc *HealthChecker) isRQLiteLeader(ctx context.Context) bool {
	dsn := hc.rqliteDSN
	if dsn == "" {
		// Default local rqlite HTTP API address when no DSN was configured.
		dsn = "http://localhost:5001"
	}
	client := &http.Client{Timeout: 5 * time.Second}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, dsn+"/status", nil)
	if err != nil {
		return false
	}
	resp, err := client.Do(req)
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	// A non-200 reply is not a valid status document; don't attempt to
	// decode an error page into the status struct.
	if resp.StatusCode != http.StatusOK {
		return false
	}
	var status struct {
		Store struct {
			Raft struct {
				State string `json:"state"`
			} `json:"raft"`
		} `json:"store"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
		return false
	}
	return status.Store.Raft.State == "Leader"
}
// GetHealthStatus gets recent health checks for a deployment.
func (hc *HealthChecker) GetHealthStatus(ctx context.Context, deploymentID string, limit int) ([]HealthCheck, error) { func (hc *HealthChecker) GetHealthStatus(ctx context.Context, deploymentID string, limit int) ([]HealthCheck, error) {
type healthRow struct { type healthRow struct {
Status string `db:"status"` Status string `db:"status"`
@ -268,7 +609,7 @@ func (hc *HealthChecker) GetHealthStatus(ctx context.Context, deploymentID strin
return checks, nil return checks, nil
} }
// HealthCheck represents a health check result // HealthCheck represents a health check result.
type HealthCheck struct { type HealthCheck struct {
Status string `json:"status"` Status string `json:"status"`
CheckedAt time.Time `json:"checked_at"` CheckedAt time.Time `json:"checked_at"`

View File

@ -11,6 +11,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/DeBrosOfficial/network/pkg/deployments"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -35,9 +36,6 @@ type mockDB struct {
mu sync.Mutex mu sync.Mutex
// Query handling --------------------------------------------------- // Query handling ---------------------------------------------------
// queryFunc is called when Query is invoked. It receives the dest
// pointer and the query string + args. The implementation should
// populate dest (via reflection) and return an error if desired.
queryFunc func(dest interface{}, query string, args ...interface{}) error queryFunc func(dest interface{}, query string, args ...interface{}) error
queryCalls []queryCall queryCalls []queryCall
@ -95,15 +93,54 @@ func (m *mockDB) getQueryCalls() []queryCall {
return out return out
} }
// ---------------------------------------------------------------------------
// Mock process manager
// ---------------------------------------------------------------------------
// mockProcessManager is a thread-safe test double for ProcessManager that
// records which deployment IDs were restarted/stopped and returns the
// configured errors.
type mockProcessManager struct {
	mu           sync.Mutex
	restartCalls []string // deployment IDs
	restartErr   error
	stopCalls    []string // deployment IDs
	stopErr      error
}

// Restart records the deployment ID and returns the configured restartErr.
func (m *mockProcessManager) Restart(_ context.Context, dep *deployments.Deployment) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.restartCalls = append(m.restartCalls, dep.ID)
	return m.restartErr
}

// Stop records the deployment ID and returns the configured stopErr.
func (m *mockProcessManager) Stop(_ context.Context, dep *deployments.Deployment) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.stopCalls = append(m.stopCalls, dep.ID)
	return m.stopErr
}

// getRestartCalls returns a snapshot copy of the recorded restart calls.
func (m *mockProcessManager) getRestartCalls() []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	return append([]string(nil), m.restartCalls...)
}

// getStopCalls returns a snapshot copy of the recorded stop calls.
func (m *mockProcessManager) getStopCalls() []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	return append([]string(nil), m.stopCalls...)
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Helper: populate a *[]T dest via reflection so the mock can return rows. // Helper: populate a *[]T dest via reflection so the mock can return rows.
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// appendRows appends rows to dest (a *[]SomeStruct) by creating new elements // appendRows appends rows to dest (a *[]SomeStruct) by creating new elements
// of the destination's element type and copying field values by name. // of the destination's element type and copying field values by name.
// Each row is a map[string]interface{} keyed by field name (Go name, not db tag).
// This sidesteps the type-identity problem where the mock and the caller
// define structurally identical but distinct local types.
func appendRows(dest interface{}, rows []map[string]interface{}) { func appendRows(dest interface{}, rows []map[string]interface{}) {
dv := reflect.ValueOf(dest).Elem() // []T dv := reflect.ValueOf(dest).Elem() // []T
elemType := dv.Type().Elem() // T elemType := dv.Type().Elem() // T
@ -130,8 +167,9 @@ func appendRows(dest interface{}, rows []map[string]interface{}) {
func TestNewHealthChecker_NonNil(t *testing.T) { func TestNewHealthChecker_NonNil(t *testing.T) {
db := &mockDB{} db := &mockDB{}
logger := zap.NewNop() logger := zap.NewNop()
pm := &mockProcessManager{}
hc := NewHealthChecker(db, logger) hc := NewHealthChecker(db, logger, "node-1", pm)
if hc == nil { if hc == nil {
t.Fatal("expected non-nil HealthChecker") t.Fatal("expected non-nil HealthChecker")
@ -145,11 +183,14 @@ func TestNewHealthChecker_NonNil(t *testing.T) {
if hc.workers != 10 { if hc.workers != 10 {
t.Errorf("expected default workers=10, got %d", hc.workers) t.Errorf("expected default workers=10, got %d", hc.workers)
} }
if hc.active == nil { if hc.nodeID != "node-1" {
t.Error("expected active map to be initialized") t.Errorf("expected nodeID='node-1', got %q", hc.nodeID)
} }
if len(hc.active) != 0 { if hc.processManager != pm {
t.Errorf("expected active map to be empty, got %d entries", len(hc.active)) t.Error("expected processManager to be stored")
}
if hc.states == nil {
t.Error("expected states map to be initialized")
} }
} }
@ -157,7 +198,7 @@ func TestNewHealthChecker_NonNil(t *testing.T) {
func TestCheckDeployment_StaticDeployment(t *testing.T) { func TestCheckDeployment_StaticDeployment(t *testing.T) {
db := &mockDB{} db := &mockDB{}
hc := NewHealthChecker(db, zap.NewNop()) hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil)
dep := deploymentRow{ dep := deploymentRow{
ID: "dep-1", ID: "dep-1",
@ -180,11 +221,10 @@ func TestCheckDeployment_HealthyEndpoint(t *testing.T) {
})) }))
defer srv.Close() defer srv.Close()
// Extract the port from the test server.
port := serverPort(t, srv) port := serverPort(t, srv)
db := &mockDB{} db := &mockDB{}
hc := NewHealthChecker(db, zap.NewNop()) hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil)
dep := deploymentRow{ dep := deploymentRow{
ID: "dep-2", ID: "dep-2",
@ -207,7 +247,7 @@ func TestCheckDeployment_UnhealthyEndpoint(t *testing.T) {
port := serverPort(t, srv) port := serverPort(t, srv)
db := &mockDB{} db := &mockDB{}
hc := NewHealthChecker(db, zap.NewNop()) hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil)
dep := deploymentRow{ dep := deploymentRow{
ID: "dep-3", ID: "dep-3",
@ -223,7 +263,7 @@ func TestCheckDeployment_UnhealthyEndpoint(t *testing.T) {
func TestCheckDeployment_UnreachableEndpoint(t *testing.T) { func TestCheckDeployment_UnreachableEndpoint(t *testing.T) {
db := &mockDB{} db := &mockDB{}
hc := NewHealthChecker(db, zap.NewNop()) hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil)
dep := deploymentRow{ dep := deploymentRow{
ID: "dep-4", ID: "dep-4",
@ -237,123 +277,383 @@ func TestCheckDeployment_UnreachableEndpoint(t *testing.T) {
} }
} }
// ---- c) checkConsecutiveFailures ------------------------------------------ // ---- c) checkAllDeployments query -----------------------------------------
func TestCheckConsecutiveFailures_HealthyReturnsEarly(t *testing.T) { func TestCheckAllDeployments_QueriesLocalReplicas(t *testing.T) {
db := &mockDB{} db := &mockDB{}
hc := NewHealthChecker(db, zap.NewNop()) hc := NewHealthChecker(db, zap.NewNop(), "node-abc", nil)
// When the current check is healthy, the method returns immediately hc.checkAllDeployments(context.Background())
// without querying the database.
hc.checkConsecutiveFailures(context.Background(), "dep-1", true)
calls := db.getQueryCalls() calls := db.getQueryCalls()
if len(calls) != 0 { if len(calls) == 0 {
t.Errorf("expected no query calls when healthy, got %d", len(calls)) t.Fatal("expected at least one query call")
}
q := calls[0].query
if !strings.Contains(q, "deployment_replicas") {
t.Errorf("expected query to join deployment_replicas, got: %s", q)
}
if !strings.Contains(q, "dr.node_id = ?") {
t.Errorf("expected query to filter by dr.node_id, got: %s", q)
}
if !strings.Contains(q, "'degraded'") {
t.Errorf("expected query to include 'degraded' status, got: %s", q)
}
// Verify nodeID was passed as the bind parameter
if len(calls[0].args) == 0 {
t.Fatal("expected query args")
}
if nodeID, ok := calls[0].args[0].(string); !ok || nodeID != "node-abc" {
t.Errorf("expected nodeID arg 'node-abc', got %v", calls[0].args[0])
} }
} }
func TestCheckConsecutiveFailures_LessThan3Failures(t *testing.T) { // ---- d) handleUnhealthy ---------------------------------------------------
func TestHandleUnhealthy_RestartsBeforeFailure(t *testing.T) {
db := &mockDB{}
pm := &mockProcessManager{}
hc := NewHealthChecker(db, zap.NewNop(), "node-1", pm)
dep := deploymentRow{
ID: "dep-restart",
Namespace: "test",
Name: "my-app",
Type: "nextjs",
Port: 10001,
RestartPolicy: "on-failure",
MaxRestartCount: 3,
ReplicaStatus: "active",
}
ctx := context.Background()
// Drive 3 consecutive unhealthy checks -> should trigger restart
for i := 0; i < consecutiveFailuresThreshold; i++ {
hc.handleUnhealthy(ctx, dep)
}
// Verify restart was called
restarts := pm.getRestartCalls()
if len(restarts) != 1 {
t.Fatalf("expected 1 restart call, got %d", len(restarts))
}
if restarts[0] != "dep-restart" {
t.Errorf("expected restart for 'dep-restart', got %q", restarts[0])
}
// Verify no replica status UPDATE was issued (only event INSERT)
execCalls := db.getExecCalls()
for _, call := range execCalls {
if strings.Contains(call.query, "UPDATE deployment_replicas") {
t.Error("should not update replica status when restart succeeds")
}
}
}
func TestHandleUnhealthy_MarksReplicaFailedAfterRestartLimit(t *testing.T) {
db := &mockDB{ db := &mockDB{
queryFunc: func(dest interface{}, query string, args ...interface{}) error { queryFunc: func(dest interface{}, query string, args ...interface{}) error {
// Return only 2 unhealthy rows (fewer than 3). // Return count of 1 active replica (so deployment becomes degraded, not failed)
if strings.Contains(query, "COUNT(*)") {
appendRows(dest, []map[string]interface{}{ appendRows(dest, []map[string]interface{}{
{"Status": "unhealthy"}, {"Count": 1},
{"Status": "unhealthy"},
}) })
}
return nil return nil
}, },
} }
pm := &mockProcessManager{}
hc := NewHealthChecker(db, zap.NewNop(), "node-1", pm)
hc := NewHealthChecker(db, zap.NewNop()) dep := deploymentRow{
hc.checkConsecutiveFailures(context.Background(), "dep-1", false) ID: "dep-limited",
Namespace: "test",
Name: "my-app",
Type: "nextjs",
Port: 10001,
RestartPolicy: "on-failure",
MaxRestartCount: 1, // Only 1 restart allowed
ReplicaStatus: "active",
}
// Should query the DB but NOT issue any UPDATE or event INSERT. ctx := context.Background()
// First 3 misses -> restart (limit=1, attempt 1)
for i := 0; i < consecutiveFailuresThreshold; i++ {
hc.handleUnhealthy(ctx, dep)
}
// Should have restarted once
if len(pm.getRestartCalls()) != 1 {
t.Fatalf("expected 1 restart call, got %d", len(pm.getRestartCalls()))
}
// Next 3 misses -> restart limit exhausted, mark replica failed
for i := 0; i < consecutiveFailuresThreshold; i++ {
hc.handleUnhealthy(ctx, dep)
}
// Verify replica was marked failed
execCalls := db.getExecCalls() execCalls := db.getExecCalls()
if len(execCalls) != 0 { foundReplicaUpdate := false
t.Errorf("expected 0 exec calls with <3 failures, got %d", len(execCalls)) foundDeploymentUpdate := false
for _, call := range execCalls {
if strings.Contains(call.query, "UPDATE deployment_replicas") && strings.Contains(call.query, "'failed'") {
foundReplicaUpdate = true
}
if strings.Contains(call.query, "UPDATE deployments") {
foundDeploymentUpdate = true
} }
} }
func TestCheckConsecutiveFailures_ThreeConsecutive(t *testing.T) { if !foundReplicaUpdate {
t.Error("expected UPDATE deployment_replicas SET status = 'failed'")
}
if !foundDeploymentUpdate {
t.Error("expected UPDATE deployments to recalculate status")
}
// Should NOT have restarted again (limit was 1)
if len(pm.getRestartCalls()) != 1 {
t.Errorf("expected still 1 restart call, got %d", len(pm.getRestartCalls()))
}
}
func TestHandleUnhealthy_NeverRestart(t *testing.T) {
db := &mockDB{ db := &mockDB{
queryFunc: func(dest interface{}, query string, args ...interface{}) error { queryFunc: func(dest interface{}, query string, args ...interface{}) error {
if strings.Contains(query, "COUNT(*)") {
appendRows(dest, []map[string]interface{}{ appendRows(dest, []map[string]interface{}{
{"Status": "unhealthy"}, {"Count": 0},
{"Status": "unhealthy"},
{"Status": "unhealthy"},
}) })
return nil
},
execFunc: func(query string, args ...interface{}) (interface{}, error) {
return nil, nil
},
} }
hc := NewHealthChecker(db, zap.NewNop())
hc.checkConsecutiveFailures(context.Background(), "dep-99", false)
execCalls := db.getExecCalls()
// Expect 2 exec calls: one UPDATE (mark failed) + one INSERT (event).
if len(execCalls) != 2 {
t.Fatalf("expected 2 exec calls (update + event), got %d", len(execCalls))
}
// First call: UPDATE deployments SET status = 'failed'
if !strings.Contains(execCalls[0].query, "UPDATE deployments") {
t.Errorf("expected UPDATE deployments query, got: %s", execCalls[0].query)
}
if !strings.Contains(execCalls[0].query, "status = 'failed'") {
t.Errorf("expected status='failed' in query, got: %s", execCalls[0].query)
}
// Second call: INSERT INTO deployment_events
if !strings.Contains(execCalls[1].query, "INSERT INTO deployment_events") {
t.Errorf("expected INSERT INTO deployment_events, got: %s", execCalls[1].query)
}
if !strings.Contains(execCalls[1].query, "health_failed") {
t.Errorf("expected health_failed event_type, got: %s", execCalls[1].query)
}
// Verify the deployment ID was passed to both queries.
for i, call := range execCalls {
found := false
for _, arg := range call.args {
if arg == "dep-99" {
found = true
break
}
}
if !found {
t.Errorf("exec call %d: expected deployment id 'dep-99' in args %v", i, call.args)
}
}
}
func TestCheckConsecutiveFailures_MixedResults(t *testing.T) {
db := &mockDB{
queryFunc: func(dest interface{}, query string, args ...interface{}) error {
// 3 rows but NOT all unhealthy — no action should be taken.
appendRows(dest, []map[string]interface{}{
{"Status": "unhealthy"},
{"Status": "healthy"},
{"Status": "unhealthy"},
})
return nil return nil
}, },
} }
pm := &mockProcessManager{}
hc := NewHealthChecker(db, zap.NewNop(), "node-1", pm)
hc := NewHealthChecker(db, zap.NewNop()) dep := deploymentRow{
hc.checkConsecutiveFailures(context.Background(), "dep-mixed", false) ID: "dep-never",
Namespace: "test",
Name: "no-restart-app",
Type: "nextjs",
Port: 10001,
RestartPolicy: "never",
MaxRestartCount: 10,
ReplicaStatus: "active",
}
ctx := context.Background()
// 3 misses should immediately mark failed without restart
for i := 0; i < consecutiveFailuresThreshold; i++ {
hc.handleUnhealthy(ctx, dep)
}
// No restart calls
if len(pm.getRestartCalls()) != 0 {
t.Errorf("expected 0 restart calls with policy=never, got %d", len(pm.getRestartCalls()))
}
// Verify replica was marked failed
execCalls := db.getExecCalls()
foundReplicaUpdate := false
for _, call := range execCalls {
if strings.Contains(call.query, "UPDATE deployment_replicas") && strings.Contains(call.query, "'failed'") {
foundReplicaUpdate = true
}
}
if !foundReplicaUpdate {
t.Error("expected replica to be marked failed immediately")
}
}
// ---- e) handleHealthy -----------------------------------------------------
// TestHandleHealthy_ResetsCounters verifies that a passing health check
// clears the consecutive-miss counter, so later misses start counting from
// zero and do not accumulate toward the restart threshold.
func TestHandleHealthy_ResetsCounters(t *testing.T) {
	db := &mockDB{}
	procMgr := &mockProcessManager{}
	hc := NewHealthChecker(db, zap.NewNop(), "node-1", procMgr)
	row := deploymentRow{
		ID:              "dep-reset",
		Namespace:       "test",
		Name:            "flaky-app",
		Type:            "nextjs",
		Port:            10001,
		RestartPolicy:   "on-failure",
		MaxRestartCount: 3,
		ReplicaStatus:   "active",
	}
	ctx := context.Background()

	// Two misses — one short of the threshold.
	for i := 0; i < 2; i++ {
		hc.handleUnhealthy(ctx, row)
	}
	// A healthy check resets the counter...
	hc.handleHealthy(ctx, row)
	// ...so two more misses still must not trigger a restart.
	for i := 0; i < 2; i++ {
		hc.handleUnhealthy(ctx, row)
	}

	if got := len(procMgr.getRestartCalls()); got != 0 {
		t.Errorf("expected 0 restart calls after counter reset, got %d", got)
	}
}
// TestHandleHealthy_RecoversFailedReplica verifies that a replica previously
// marked 'failed' is flipped back to 'active' — and a replica_recovered
// event is emitted — when its health check starts passing while the
// deployment is still under-replicated.
func TestHandleHealthy_RecoversFailedReplica(t *testing.T) {
	countQueries := 0
	db := &mockDB{
		queryFunc: func(dest interface{}, query string, args ...interface{}) error {
			if !strings.Contains(query, "COUNT(*)") {
				return nil
			}
			countQueries++
			// 1st COUNT: over-replication check sees 1 active replica
			// (under-replicated, so recovery is allowed).
			// 2nd COUNT: recalculateDeploymentStatus sees 2 active
			// replicas after the recovery.
			active := 1
			if countQueries > 1 {
				active = 2
			}
			appendRows(dest, []map[string]interface{}{{"Count": active}})
			return nil
		},
	}
	hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil)
	row := deploymentRow{
		ID:            "dep-recover",
		Namespace:     "test",
		Name:          "recovered-app",
		ReplicaStatus: "failed", // was failed, now passing health checks
	}

	hc.handleHealthy(context.Background(), row)

	// The replica must be set back to 'active' and the recovery recorded.
	var recovered, eventSeen bool
	for _, call := range db.getExecCalls() {
		if strings.Contains(call.query, "UPDATE deployment_replicas") && strings.Contains(call.query, "'active'") {
			recovered = true
		}
		if strings.Contains(call.query, "replica_recovered") {
			eventSeen = true
		}
	}
	if !recovered {
		t.Error("expected UPDATE deployment_replicas SET status = 'active'")
	}
	if !eventSeen {
		t.Error("expected replica_recovered event")
	}
}
// TestHandleHealthy_StopsZombieReplicaWhenAlreadyReplaced verifies the
// zombie path: a replica that was marked 'failed' but whose process is still
// running (e.g. systemd Restart=always brought it back) must be stopped and
// its replica row deleted — NOT recovered to 'active' — when the deployment
// already has a full set of active replicas elsewhere.
func TestHandleHealthy_StopsZombieReplicaWhenAlreadyReplaced(t *testing.T) {
	db := &mockDB{
		queryFunc: func(dest interface{}, query string, args ...interface{}) error {
			// 2 active replicas already exist — this replica was replaced.
			if strings.Contains(query, "COUNT(*)") {
				appendRows(dest, []map[string]interface{}{{"Count": 2}})
			}
			return nil
		},
	}
	procMgr := &mockProcessManager{}
	hc := NewHealthChecker(db, zap.NewNop(), "node-zombie", procMgr)
	row := deploymentRow{
		ID:            "dep-zombie",
		Namespace:     "test",
		Name:          "zombie-app",
		Type:          "nextjs",
		Port:          10001,
		ReplicaStatus: "failed", // failed, but process still running (systemd Restart=always)
	}

	hc.handleHealthy(context.Background(), row)

	// The zombie process is stopped, not restarted.
	stops := procMgr.getStopCalls()
	if len(stops) != 1 {
		t.Fatalf("expected 1 Stop call, got %d", len(stops))
	}
	if stops[0] != "dep-zombie" {
		t.Errorf("expected Stop for 'dep-zombie', got %q", stops[0])
	}

	var sawDelete, sawZombieEvent bool
	for _, call := range db.getExecCalls() {
		if strings.Contains(call.query, "DELETE FROM deployment_replicas") {
			sawDelete = true
			// The DELETE must target this deployment on this node.
			if len(call.args) >= 2 {
				if call.args[0] != "dep-zombie" || call.args[1] != "node-zombie" {
					t.Errorf("DELETE args: got (%v, %v), want (dep-zombie, node-zombie)", call.args[0], call.args[1])
				}
			}
		}
		if strings.Contains(call.query, "zombie_replica_stopped") {
			sawZombieEvent = true
		}
		// Should NOT recover to active
		if strings.Contains(call.query, "UPDATE deployment_replicas") && strings.Contains(call.query, "'active'") {
			t.Error("should NOT update replica to active when it's a zombie")
		}
	}
	if !sawDelete {
		t.Error("expected DELETE FROM deployment_replicas for zombie replica")
	}
	if !sawZombieEvent {
		t.Error("expected zombie_replica_stopped event")
	}
	if len(procMgr.getRestartCalls()) != 0 {
		t.Errorf("expected 0 restart calls, got %d", len(procMgr.getRestartCalls()))
	}
}
// ---- f) recordHealthCheck -------------------------------------------------
func TestRecordHealthCheck_IncludesNodeID(t *testing.T) {
db := &mockDB{}
hc := NewHealthChecker(db, zap.NewNop(), "node-xyz", nil)
hc.recordHealthCheck(context.Background(), "dep-1", true)
execCalls := db.getExecCalls() execCalls := db.getExecCalls()
if len(execCalls) != 0 { if len(execCalls) != 1 {
t.Errorf("expected 0 exec calls with mixed results, got %d", len(execCalls)) t.Fatalf("expected 1 exec call, got %d", len(execCalls))
}
q := execCalls[0].query
if !strings.Contains(q, "node_id") {
t.Errorf("expected INSERT to include node_id column, got: %s", q)
}
// Verify node_id is the second arg (after deployment_id)
if len(execCalls[0].args) < 2 {
t.Fatal("expected at least 2 args")
}
if nodeID, ok := execCalls[0].args[1].(string); !ok || nodeID != "node-xyz" {
t.Errorf("expected node_id arg 'node-xyz', got %v", execCalls[0].args[1])
} }
} }
// ---- d) GetHealthStatus --------------------------------------------------- // ---- g) GetHealthStatus ---------------------------------------------------
func TestGetHealthStatus_ReturnsChecks(t *testing.T) { func TestGetHealthStatus_ReturnsChecks(t *testing.T) {
now := time.Now().Truncate(time.Second) now := time.Now().Truncate(time.Second)
@ -368,7 +668,7 @@ func TestGetHealthStatus_ReturnsChecks(t *testing.T) {
}, },
} }
hc := NewHealthChecker(db, zap.NewNop()) hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil)
checks, err := hc.GetHealthStatus(context.Background(), "dep-1", 10) checks, err := hc.GetHealthStatus(context.Background(), "dep-1", 10)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
@ -391,20 +691,16 @@ func TestGetHealthStatus_ReturnsChecks(t *testing.T) {
if checks[1].Status != "unhealthy" { if checks[1].Status != "unhealthy" {
t.Errorf("checks[1].Status = %q, want %q", checks[1].Status, "unhealthy") t.Errorf("checks[1].Status = %q, want %q", checks[1].Status, "unhealthy")
} }
if checks[1].ResponseTimeMs != 5001 {
t.Errorf("checks[1].ResponseTimeMs = %d, want 5001", checks[1].ResponseTimeMs)
}
} }
func TestGetHealthStatus_EmptyList(t *testing.T) { func TestGetHealthStatus_EmptyList(t *testing.T) {
db := &mockDB{ db := &mockDB{
queryFunc: func(dest interface{}, query string, args ...interface{}) error { queryFunc: func(dest interface{}, query string, args ...interface{}) error {
// Don't populate dest — leave the slice empty.
return nil return nil
}, },
} }
hc := NewHealthChecker(db, zap.NewNop()) hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil)
checks, err := hc.GetHealthStatus(context.Background(), "dep-empty", 10) checks, err := hc.GetHealthStatus(context.Background(), "dep-empty", 10)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
@ -422,7 +718,7 @@ func TestGetHealthStatus_DatabaseError(t *testing.T) {
}, },
} }
hc := NewHealthChecker(db, zap.NewNop()) hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil)
_, err := hc.GetHealthStatus(context.Background(), "dep-err", 10) _, err := hc.GetHealthStatus(context.Background(), "dep-err", 10)
if err == nil { if err == nil {
t.Fatal("expected error from GetHealthStatus") t.Fatal("expected error from GetHealthStatus")
@ -432,6 +728,199 @@ func TestGetHealthStatus_DatabaseError(t *testing.T) {
} }
} }
// ---- h) reconcileDeployments ----------------------------------------------
// mockReconciler is a test double for the ReplicaReconciler interface. It
// records every call (guarded by mu, since the reconciliation loop invokes
// it from goroutines) and returns canned results.
type mockReconciler struct {
	mu           sync.Mutex
	selectCalls  []string // primaryNodeIDs passed to SelectReplicaNodes, in call order
	selectResult []string // canned return value for SelectReplicaNodes
	selectErr    error    // canned error for SelectReplicaNodes
	// updateStatusCalls records each UpdateReplicaStatus invocation.
	updateStatusCalls []struct {
		deploymentID string
		nodeID       string
		status       deployments.ReplicaStatus
	}
}
// SelectReplicaNodes records the primary node it was asked about and returns
// the canned node list and error configured on the mock.
func (m *mockReconciler) SelectReplicaNodes(_ context.Context, primaryNodeID string, _ int) ([]string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.selectCalls = append(m.selectCalls, primaryNodeID)
	return m.selectResult, m.selectErr
}
// UpdateReplicaStatus records the requested status transition and always
// reports success.
func (m *mockReconciler) UpdateReplicaStatus(_ context.Context, deploymentID, nodeID string, status deployments.ReplicaStatus) error {
	call := struct {
		deploymentID string
		nodeID       string
		status       deployments.ReplicaStatus
	}{deploymentID, nodeID, status}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.updateStatusCalls = append(m.updateStatusCalls, call)
	return nil
}
// mockProvisioner is a test double for the ReplicaProvisioner interface.
// It records which deployment/node pairs a replica was provisioned for
// (guarded by mu, since SetupDynamicReplica is called from goroutines).
type mockProvisioner struct {
	mu sync.Mutex
	// setupCalls records each SetupDynamicReplica invocation.
	setupCalls []struct {
		deploymentID string
		nodeID       string
	}
}
// SetupDynamicReplica records the deployment/node pair it was asked to
// provision; it performs no real work.
func (m *mockProvisioner) SetupDynamicReplica(_ context.Context, dep *deployments.Deployment, nodeID string) {
	call := struct {
		deploymentID string
		nodeID       string
	}{dep.ID, nodeID}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.setupCalls = append(m.setupCalls, call)
}
// TestReconcileDeployments_UnderReplicated verifies that when this node is
// the Raft leader and a deployment reports fewer active replicas than
// desired, the reconciler selects a new node and provisions a replica there.
//
// The fix over the previous version: re-replication runs in a goroutine, and
// a fixed 50 ms sleep is both flaky on loaded machines and slower than
// necessary — we now poll for the side effect with a bounded deadline.
func TestReconcileDeployments_UnderReplicated(t *testing.T) {
	// Mock RQLite status endpoint reporting this node as Raft leader, so
	// the reconciliation pass actually runs.
	leaderSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Write([]byte(`{"store":{"raft":{"state":"Leader"}}}`))
	}))
	defer leaderSrv.Close()

	db := &mockDB{
		queryFunc: func(dest interface{}, query string, args ...interface{}) error {
			if strings.Contains(query, "active_replicas") {
				appendRows(dest, []map[string]interface{}{
					{
						"ID":              "dep-under",
						"Namespace":       "test",
						"Name":            "under-app",
						"Type":            "nextjs",
						"HomeNodeID":      "node-home",
						"ContentCID":      "cid-123",
						"BuildCID":        "",
						"Environment":     "",
						"Port":            10001,
						"HealthCheckPath": "/health",
						"MemoryLimitMB":   256,
						"CPULimitPercent": 50,
						"RestartPolicy":   "on-failure",
						"MaxRestartCount": 10,
						"ActiveReplicas":  1, // Under-replicated (desired=2)
					},
				})
			}
			return nil
		},
	}
	rc := &mockReconciler{selectResult: []string{"node-new"}}
	rp := &mockProvisioner{}
	hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil)
	hc.SetReconciler(leaderSrv.URL, rc, rp)

	hc.reconcileDeployments(context.Background())

	// The provisioning happens in a goroutine; poll with a deadline instead
	// of a fixed sleep to avoid flakiness without slowing the happy path.
	deadline := time.Now().Add(2 * time.Second)
	for {
		rp.mu.Lock()
		provisioned := len(rp.setupCalls) > 0
		rp.mu.Unlock()
		if provisioned || time.Now().After(deadline) {
			break
		}
		time.Sleep(5 * time.Millisecond)
	}

	// Verify SelectReplicaNodes was called exactly once.
	rc.mu.Lock()
	selectCount := len(rc.selectCalls)
	rc.mu.Unlock()
	if selectCount != 1 {
		t.Fatalf("expected 1 SelectReplicaNodes call, got %d", selectCount)
	}

	// Verify SetupDynamicReplica targeted the right deployment and node.
	rp.mu.Lock()
	defer rp.mu.Unlock()
	if len(rp.setupCalls) != 1 {
		t.Fatalf("expected 1 SetupDynamicReplica call, got %d", len(rp.setupCalls))
	}
	if rp.setupCalls[0].deploymentID != "dep-under" {
		t.Errorf("expected deployment 'dep-under', got %q", rp.setupCalls[0].deploymentID)
	}
	if rp.setupCalls[0].nodeID != "node-new" {
		t.Errorf("expected node 'node-new', got %q", rp.setupCalls[0].nodeID)
	}
}
// TestReconcileDeployments_FullyReplicated verifies that a deployment which
// already has the desired number of active replicas is left untouched by the
// reconciliation loop (no replica-node selection is attempted).
func TestReconcileDeployments_FullyReplicated(t *testing.T) {
	leaderSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Write([]byte(`{"store":{"raft":{"state":"Leader"}}}`))
	}))
	defer leaderSrv.Close()

	db := &mockDB{
		queryFunc: func(dest interface{}, query string, args ...interface{}) error {
			if !strings.Contains(query, "active_replicas") {
				return nil
			}
			appendRows(dest, []map[string]interface{}{
				{
					"ID":              "dep-full",
					"Namespace":       "test",
					"Name":            "full-app",
					"Type":            "nextjs",
					"HomeNodeID":      "node-home",
					"ContentCID":      "cid-456",
					"BuildCID":        "",
					"Environment":     "",
					"Port":            10002,
					"HealthCheckPath": "/health",
					"MemoryLimitMB":   256,
					"CPULimitPercent": 50,
					"RestartPolicy":   "on-failure",
					"MaxRestartCount": 10,
					"ActiveReplicas":  2, // Fully replicated
				},
			})
			return nil
		},
	}
	rc := &mockReconciler{selectResult: []string{"node-new"}}
	rp := &mockProvisioner{}
	hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil)
	hc.SetReconciler(leaderSrv.URL, rc, rp)

	hc.reconcileDeployments(context.Background())
	// Give any (incorrect) re-replication goroutine a chance to fire.
	time.Sleep(50 * time.Millisecond)

	// No re-replication must have been triggered.
	rc.mu.Lock()
	defer rc.mu.Unlock()
	if n := len(rc.selectCalls); n != 0 {
		t.Errorf("expected 0 SelectReplicaNodes calls for fully replicated deployment, got %d", n)
	}
}
// TestReconcileDeployments_NotLeader verifies that a node which is not the
// Raft leader skips reconciliation entirely — it never even queries the
// deployments table.
func TestReconcileDeployments_NotLeader(t *testing.T) {
	// RQLite status endpoint reporting a non-leader Raft state.
	followerSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Write([]byte(`{"store":{"raft":{"state":"Follower"}}}`))
	}))
	defer followerSrv.Close()

	db := &mockDB{}
	hc := NewHealthChecker(db, zap.NewNop(), "node-1", nil)
	hc.SetReconciler(followerSrv.URL, &mockReconciler{}, &mockProvisioner{})

	hc.reconcileDeployments(context.Background())

	// A follower must not touch the database at all.
	if queries := db.getQueryCalls(); len(queries) != 0 {
		t.Errorf("expected 0 query calls on follower, got %d", len(queries))
	}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Helpers // Helpers
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -439,10 +928,8 @@ func TestGetHealthStatus_DatabaseError(t *testing.T) {
// serverPort extracts the port number from an httptest.Server. // serverPort extracts the port number from an httptest.Server.
func serverPort(t *testing.T, srv *httptest.Server) int { func serverPort(t *testing.T, srv *httptest.Server) int {
t.Helper() t.Helper()
// URL is http://127.0.0.1:<port>
addr := srv.Listener.Addr().String() addr := srv.Listener.Addr().String()
var port int var port int
// addr is "127.0.0.1:PORT"
_, err := fmt.Sscanf(addr[strings.LastIndex(addr, ":")+1:], "%d", &port) _, err := fmt.Sscanf(addr[strings.LastIndex(addr, ":")+1:], "%d", &port)
if err != nil { if err != nil {
t.Fatalf("failed to parse port from %q: %v", addr, err) t.Fatalf("failed to parse port from %q: %v", addr, err)

View File

@ -25,6 +25,7 @@ const (
DeploymentStatusDeploying DeploymentStatus = "deploying" DeploymentStatusDeploying DeploymentStatus = "deploying"
DeploymentStatusActive DeploymentStatus = "active" DeploymentStatusActive DeploymentStatus = "active"
DeploymentStatusFailed DeploymentStatus = "failed" DeploymentStatusFailed DeploymentStatus = "failed"
DeploymentStatusDegraded DeploymentStatus = "degraded"
DeploymentStatusStopped DeploymentStatus = "stopped" DeploymentStatusStopped DeploymentStatus = "stopped"
DeploymentStatusUpdating DeploymentStatus = "updating" DeploymentStatusUpdating DeploymentStatus = "updating"
) )

View File

@ -500,7 +500,8 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
) )
// Start health checker // Start health checker
gw.healthChecker = health.NewHealthChecker(dbAdapter, logger.Logger) gw.healthChecker = health.NewHealthChecker(dbAdapter, logger.Logger, cfg.NodePeerID, gw.processManager)
gw.healthChecker.SetReconciler(cfg.RQLiteDSN, gw.replicaManager, gw.deploymentService)
go gw.healthChecker.Start(context.Background()) go gw.healthChecker.Start(context.Background())
logger.ComponentInfo(logging.ComponentGeneral, "Deployment system initialized") logger.ComponentInfo(logging.ComponentGeneral, "Deployment system initialized")

View File

@ -333,13 +333,13 @@ func (s *DeploymentService) createDeploymentReplicas(ctx context.Context, deploy
} }
} else { } else {
// Dynamic deployments: fan out to the secondary node to set up the process // Dynamic deployments: fan out to the secondary node to set up the process
go s.setupDynamicReplica(ctx, deployment, nodeID) go s.SetupDynamicReplica(ctx, deployment, nodeID)
} }
} }
} }
// setupDynamicReplica calls the secondary node's internal API to set up a deployment replica. // SetupDynamicReplica calls the secondary node's internal API to set up a deployment replica.
func (s *DeploymentService) setupDynamicReplica(ctx context.Context, deployment *deployments.Deployment, nodeID string) { func (s *DeploymentService) SetupDynamicReplica(ctx context.Context, deployment *deployments.Deployment, nodeID string) {
nodeIP, err := s.replicaManager.GetNodeIP(ctx, nodeID) nodeIP, err := s.replicaManager.GetNodeIP(ctx, nodeID)
if err != nil { if err != nil {
s.logger.Error("Failed to get node IP for replica setup", s.logger.Error("Failed to get node IP for replica setup",

View File

@ -1125,7 +1125,7 @@ func (g *Gateway) getDeploymentByDomain(ctx context.Context, domain string) (*de
SELECT id, namespace, name, type, port, content_cid, status, home_node_id, subdomain SELECT id, namespace, name, type, port, content_cid, status, home_node_id, subdomain
FROM deployments FROM deployments
WHERE subdomain = ? WHERE subdomain = ?
AND status = 'active' AND status IN ('active', 'degraded')
LIMIT 1 LIMIT 1
` `
result, err := db.Query(internalCtx, query, subdomainOrName) result, err := db.Query(internalCtx, query, subdomainOrName)
@ -1149,7 +1149,7 @@ func (g *Gateway) getDeploymentByDomain(ctx context.Context, domain string) (*de
SELECT id, namespace, name, type, port, content_cid, status, home_node_id, subdomain SELECT id, namespace, name, type, port, content_cid, status, home_node_id, subdomain
FROM deployments FROM deployments
WHERE name = ? WHERE name = ?
AND status = 'active' AND status IN ('active', 'degraded')
LIMIT 1 LIMIT 1
` `
result, err = db.Query(internalCtx, query, subdomainOrName) result, err = db.Query(internalCtx, query, subdomainOrName)
@ -1177,7 +1177,7 @@ func (g *Gateway) getDeploymentByDomain(ctx context.Context, domain string) (*de
FROM deployments d FROM deployments d
JOIN deployment_domains dd ON d.id = dd.deployment_id JOIN deployment_domains dd ON d.id = dd.deployment_id
WHERE dd.domain = ? AND dd.verified_at IS NOT NULL WHERE dd.domain = ? AND dd.verified_at IS NOT NULL
AND d.status = 'active' AND d.status IN ('active', 'degraded')
LIMIT 1 LIMIT 1
` `
result, err := db.Query(internalCtx, query, domain) result, err := db.Query(internalCtx, query, domain)