diff --git a/docs/DEV_DEPLOY.md b/docs/DEV_DEPLOY.md new file mode 100644 index 0000000..dfd4f82 --- /dev/null +++ b/docs/DEV_DEPLOY.md @@ -0,0 +1,136 @@ +# Development Guide + +## Prerequisites + +- Go 1.21+ +- Node.js 18+ (for anyone-client in dev mode) +- macOS or Linux + +## Building + +```bash +# Build all binaries +make build + +# Outputs: +# bin/orama-node — the node binary +# bin/orama — the CLI +# bin/gateway — standalone gateway (optional) +# bin/identity — identity tool +# bin/rqlite-mcp — RQLite MCP server +``` + +## Running Tests + +```bash +make test +``` + +## Running Locally (macOS) + +The node runs in "direct mode" on macOS — processes are managed directly instead of via systemd. + +```bash +# Start a single node +make run-node + +# Start multiple nodes for cluster testing +make run-node2 +make run-node3 +``` + +## Deploying to VPS + +There are two deployment workflows: **development** (fast iteration, no git required) and **production** (via git). + +### Development Deployment (Fast Iteration) + +Use this when iterating quickly — no need to commit or push to git. + +```bash +# 1. Build the CLI for Linux +GOOS=linux GOARCH=amd64 go build -o orama-cli-linux ./cmd/cli + +# 2. Generate a source archive (excludes .git, node_modules, bin/, etc.) +./scripts/generate-source-archive.sh +# Creates: /tmp/network-source.tar.gz + +# 3. Copy CLI and source to the VPS +sshpass -p '' scp -o StrictHostKeyChecking=no orama-cli-linux ubuntu@:/tmp/orama +sshpass -p '' scp -o StrictHostKeyChecking=no /tmp/network-source.tar.gz ubuntu@:/tmp/ + +# 4. On the VPS: extract source and install the CLI +ssh ubuntu@ +sudo rm -rf /home/debros/src && sudo mkdir -p /home/debros/src +sudo tar xzf /tmp/network-source.tar.gz -C /home/debros/src +sudo chown -R debros:debros /home/debros/src +sudo mv /tmp/orama /usr/local/bin/orama && sudo chmod +x /usr/local/bin/orama + +# 5. Upgrade using local source (skips git pull) +sudo orama upgrade --no-pull --restart +``` + +### Production Deployment (Via Git) + +For production releases — pulls source from GitHub on the VPS. + +```bash +# 1. Commit and push your changes +git push origin + +# 2. Build the CLI for Linux +GOOS=linux GOARCH=amd64 go build -o orama-cli-linux ./cmd/cli + +# 3. Deploy the CLI to the VPS +sshpass -p '' scp orama-cli-linux ubuntu@:/tmp/orama +ssh ubuntu@ "sudo mv /tmp/orama /usr/local/bin/orama && sudo chmod +x /usr/local/bin/orama" + +# 4. Run upgrade (downloads source from GitHub) +ssh ubuntu@ "sudo orama upgrade --branch --restart" +``` + +### Deploying to All 3 Nodes + +To deploy to all nodes, repeat steps 3-5 (dev) or 3-4 (production) for each VPS IP. + +### CLI Flags Reference + +| Flag | Description | +|------|-------------| +| `--branch ` | Git branch to pull from (production deployment) | +| `--no-pull` | Skip git pull, use existing `/home/debros/src` (dev deployment) | +| `--restart` | Restart all services after upgrade | +| `--nameserver` | Configure this node as a nameserver (install only) | +| `--domain ` | Domain for HTTPS certificates (install only) | +| `--vps-ip ` | VPS public IP address (install only) | + +## Debugging Production Issues + +Always follow the local-first approach: + +1. **Reproduce locally** — set up the same conditions on your machine +2. **Find the root cause** — understand why it's happening +3. **Fix in the codebase** — make changes to the source code +4. **Test locally** — run `make test` and verify +5. **Deploy** — only then deploy the fix to production + +Never fix issues directly on the server — those fixes are lost on next deployment. + +## Project Structure + +See [ARCHITECTURE.md](ARCHITECTURE.md) for the full architecture overview. + +Key directories: + +``` +cmd/ + cli/ — CLI entry point (orama command) + node/ — Node entry point (orama-node) + gateway/ — Standalone gateway entry point +pkg/ + cli/ — CLI command implementations + gateway/ — HTTP gateway, routes, middleware + deployments/ — Deployment types, service, storage + environments/ — Production (systemd) and development (direct) modes + rqlite/ — Distributed SQLite via RQLite +``` diff --git a/migrations/012_deployment_replicas.sql b/migrations/012_deployment_replicas.sql new file mode 100644 index 0000000..03d203d --- /dev/null +++ b/migrations/012_deployment_replicas.sql @@ -0,0 +1,15 @@ +-- Deployment replicas: tracks which nodes host replicas of each deployment +CREATE TABLE IF NOT EXISTS deployment_replicas ( + deployment_id TEXT NOT NULL, + node_id TEXT NOT NULL, + port INTEGER DEFAULT 0, + status TEXT NOT NULL DEFAULT 'pending', + is_primary BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (deployment_id, node_id), + FOREIGN KEY (deployment_id) REFERENCES deployments(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_deployment_replicas_node ON deployment_replicas(node_id); +CREATE INDEX IF NOT EXISTS idx_deployment_replicas_status ON deployment_replicas(deployment_id, status); diff --git a/pkg/deployments/replica_manager.go b/pkg/deployments/replica_manager.go new file mode 100644 index 0000000..dc89cb7 --- /dev/null +++ b/pkg/deployments/replica_manager.go @@ -0,0 +1,273 @@ +package deployments + +import ( + "context" + "fmt" + "time" + + "github.com/DeBrosOfficial/network/pkg/client" + "github.com/DeBrosOfficial/network/pkg/rqlite" + "go.uber.org/zap" +) + +// ReplicaManager manages deployment replicas across nodes +type ReplicaManager struct { + db rqlite.Client + homeNodeMgr *HomeNodeManager + portAllocator *PortAllocator + logger *zap.Logger +} + +// NewReplicaManager creates a new replica manager +func NewReplicaManager(db rqlite.Client, homeNodeMgr *HomeNodeManager, portAllocator *PortAllocator, logger *zap.Logger) *ReplicaManager { + return &ReplicaManager{ + db: db, + homeNodeMgr: homeNodeMgr, + portAllocator: portAllocator, + logger: logger, + } +} + +// SelectReplicaNodes picks additional nodes for replicas, excluding the primary node. +// Returns up to count node IDs. +func (rm *ReplicaManager) SelectReplicaNodes(ctx context.Context, primaryNodeID string, count int) ([]string, error) { + internalCtx := client.WithInternalAuth(ctx) + + activeNodes, err := rm.homeNodeMgr.getActiveNodes(internalCtx) + if err != nil { + return nil, fmt.Errorf("failed to get active nodes: %w", err) + } + + // Filter out the primary node + var candidates []string + for _, nodeID := range activeNodes { + if nodeID != primaryNodeID { + candidates = append(candidates, nodeID) + } + } + + if len(candidates) == 0 { + return nil, nil // No additional nodes available + } + + // Calculate capacity scores and pick the best ones + capacities, err := rm.homeNodeMgr.calculateNodeCapacities(internalCtx, candidates) + if err != nil { + return nil, fmt.Errorf("failed to calculate capacities: %w", err) + } + + // Sort by score descending (simple selection) + selected := make([]string, 0, count) + for i := 0; i < count && i < len(capacities); i++ { + best := rm.homeNodeMgr.selectBestNode(capacities) + if best == nil { + break + } + selected = append(selected, best.NodeID) + // Remove selected from capacities + remaining := make([]*NodeCapacity, 0, len(capacities)-1) + for _, c := range capacities { + if c.NodeID != best.NodeID { + remaining = append(remaining, c) + } + } + capacities = remaining + } + + rm.logger.Info("Selected replica nodes", + zap.String("primary", primaryNodeID), + zap.Strings("replicas", selected), + zap.Int("requested", count), + ) + + return selected, nil +} + +// CreateReplica inserts a replica record for a deployment on a specific node. +func (rm *ReplicaManager) CreateReplica(ctx context.Context, deploymentID, nodeID string, port int, isPrimary bool) error { + internalCtx := client.WithInternalAuth(ctx) + + query := ` + INSERT INTO deployment_replicas (deployment_id, node_id, port, status, is_primary, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(deployment_id, node_id) DO UPDATE SET + port = excluded.port, + status = excluded.status, + is_primary = excluded.is_primary, + updated_at = excluded.updated_at + ` + + now := time.Now() + _, err := rm.db.Exec(internalCtx, query, deploymentID, nodeID, port, ReplicaStatusActive, isPrimary, now, now) + if err != nil { + return &DeploymentError{ + Message: fmt.Sprintf("failed to create replica for deployment %s on node %s", deploymentID, nodeID), + Cause: err, + } + } + + rm.logger.Info("Created deployment replica", + zap.String("deployment_id", deploymentID), + zap.String("node_id", nodeID), + zap.Int("port", port), + zap.Bool("is_primary", isPrimary), + ) + + return nil +} + +// GetReplicas returns all replicas for a deployment. +func (rm *ReplicaManager) GetReplicas(ctx context.Context, deploymentID string) ([]Replica, error) { + internalCtx := client.WithInternalAuth(ctx) + + type replicaRow struct { + DeploymentID string `db:"deployment_id"` + NodeID string `db:"node_id"` + Port int `db:"port"` + Status string `db:"status"` + IsPrimary bool `db:"is_primary"` + } + + var rows []replicaRow + query := `SELECT deployment_id, node_id, port, status, is_primary FROM deployment_replicas WHERE deployment_id = ?` + err := rm.db.Query(internalCtx, &rows, query, deploymentID) + if err != nil { + return nil, &DeploymentError{ + Message: "failed to query replicas", + Cause: err, + } + } + + replicas := make([]Replica, len(rows)) + for i, row := range rows { + replicas[i] = Replica{ + DeploymentID: row.DeploymentID, + NodeID: row.NodeID, + Port: row.Port, + Status: ReplicaStatus(row.Status), + IsPrimary: row.IsPrimary, + } + } + + return replicas, nil +} + +// GetActiveReplicaNodes returns node IDs of all active replicas for a deployment. +func (rm *ReplicaManager) GetActiveReplicaNodes(ctx context.Context, deploymentID string) ([]string, error) { + internalCtx := client.WithInternalAuth(ctx) + + type nodeRow struct { + NodeID string `db:"node_id"` + } + + var rows []nodeRow + query := `SELECT node_id FROM deployment_replicas WHERE deployment_id = ? AND status = ?` + err := rm.db.Query(internalCtx, &rows, query, deploymentID, ReplicaStatusActive) + if err != nil { + return nil, &DeploymentError{ + Message: "failed to query active replicas", + Cause: err, + } + } + + nodes := make([]string, len(rows)) + for i, row := range rows { + nodes[i] = row.NodeID + } + + return nodes, nil +} + +// IsReplicaNode checks if the given node is an active replica for the deployment. +func (rm *ReplicaManager) IsReplicaNode(ctx context.Context, deploymentID, nodeID string) (bool, error) { + internalCtx := client.WithInternalAuth(ctx) + + type countRow struct { + Count int `db:"c"` + } + + var rows []countRow + query := `SELECT COUNT(*) as c FROM deployment_replicas WHERE deployment_id = ? AND node_id = ? AND status = ?` + err := rm.db.Query(internalCtx, &rows, query, deploymentID, nodeID, ReplicaStatusActive) + if err != nil { + return false, err + } + + return len(rows) > 0 && rows[0].Count > 0, nil +} + +// GetReplicaPort returns the port allocated for a deployment on a specific node. +func (rm *ReplicaManager) GetReplicaPort(ctx context.Context, deploymentID, nodeID string) (int, error) { + internalCtx := client.WithInternalAuth(ctx) + + type portRow struct { + Port int `db:"port"` + } + + var rows []portRow + query := `SELECT port FROM deployment_replicas WHERE deployment_id = ? AND node_id = ? AND status = ? LIMIT 1` + err := rm.db.Query(internalCtx, &rows, query, deploymentID, nodeID, ReplicaStatusActive) + if err != nil { + return 0, err + } + + if len(rows) == 0 { + return 0, fmt.Errorf("no active replica found for deployment %s on node %s", deploymentID, nodeID) + } + + return rows[0].Port, nil +} + +// UpdateReplicaStatus updates the status of a specific replica. +func (rm *ReplicaManager) UpdateReplicaStatus(ctx context.Context, deploymentID, nodeID string, status ReplicaStatus) error { + internalCtx := client.WithInternalAuth(ctx) + + query := `UPDATE deployment_replicas SET status = ?, updated_at = ? WHERE deployment_id = ? AND node_id = ?` + _, err := rm.db.Exec(internalCtx, query, status, time.Now(), deploymentID, nodeID) + if err != nil { + return &DeploymentError{ + Message: fmt.Sprintf("failed to update replica status for %s on %s", deploymentID, nodeID), + Cause: err, + } + } + + return nil +} + +// RemoveReplicas deletes all replica records for a deployment. +func (rm *ReplicaManager) RemoveReplicas(ctx context.Context, deploymentID string) error { + internalCtx := client.WithInternalAuth(ctx) + + query := `DELETE FROM deployment_replicas WHERE deployment_id = ?` + _, err := rm.db.Exec(internalCtx, query, deploymentID) + if err != nil { + return &DeploymentError{ + Message: "failed to remove replicas", + Cause: err, + } + } + + return nil +} + +// GetNodeIP retrieves the IP address for a node from dns_nodes. +func (rm *ReplicaManager) GetNodeIP(ctx context.Context, nodeID string) (string, error) { + internalCtx := client.WithInternalAuth(ctx) + + type nodeRow struct { + IPAddress string `db:"ip_address"` + } + + var rows []nodeRow + query := `SELECT ip_address FROM dns_nodes WHERE id = ? LIMIT 1` + err := rm.db.Query(internalCtx, &rows, query, nodeID) + if err != nil { + return "", err + } + + if len(rows) == 0 { + return "", fmt.Errorf("node not found: %s", nodeID) + } + + return rows[0].IPAddress, nil +} diff --git a/pkg/deployments/types.go b/pkg/deployments/types.go index 84c82b1..8cbcbda 100644 --- a/pkg/deployments/types.go +++ b/pkg/deployments/types.go @@ -82,6 +82,30 @@ type Deployment struct { DeployedBy string `json:"deployed_by"` } +// ReplicaStatus represents the status of a deployment replica on a node +type ReplicaStatus string + +const ( + ReplicaStatusPending ReplicaStatus = "pending" + ReplicaStatusActive ReplicaStatus = "active" + ReplicaStatusFailed ReplicaStatus = "failed" + ReplicaStatusRemoving ReplicaStatus = "removing" +) + +// DefaultReplicaCount is the default number of replicas per deployment +const DefaultReplicaCount = 2 + +// Replica represents a deployment replica on a specific node +type Replica struct { + DeploymentID string `json:"deployment_id"` + NodeID string `json:"node_id"` + Port int `json:"port"` + Status ReplicaStatus `json:"status"` + IsPrimary bool `json:"is_primary"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + // PortAllocation represents an allocated port on a specific node type PortAllocation struct { NodeID string `json:"node_id"` diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 3f7da9b..6f23547 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -91,8 +91,10 @@ type Gateway struct { domainHandler *deploymentshandlers.DomainHandler sqliteHandler *sqlitehandlers.SQLiteHandler sqliteBackupHandler *sqlitehandlers.BackupHandler + replicaHandler *deploymentshandlers.ReplicaHandler portAllocator *deployments.PortAllocator homeNodeManager *deployments.HomeNodeManager + replicaManager *deployments.ReplicaManager processManager *process.Manager healthChecker *health.HealthChecker @@ -252,6 +254,7 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { // Create deployment service components gw.portAllocator = deployments.NewPortAllocator(deps.ORMClient, logger.Logger) gw.homeNodeManager = deployments.NewHomeNodeManager(deps.ORMClient, gw.portAllocator, logger.Logger) + gw.replicaManager = deployments.NewReplicaManager(deps.ORMClient, gw.homeNodeManager, gw.portAllocator, logger.Logger) gw.processManager = process.NewManager(logger.Logger) // Create deployment service @@ -259,6 +262,7 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { deps.ORMClient, gw.homeNodeManager, gw.portAllocator, + gw.replicaManager, logger.Logger, ) // Set base domain from config @@ -329,6 +333,14 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { logger.Logger, ) + gw.replicaHandler = deploymentshandlers.NewReplicaHandler( + gw.deploymentService, + gw.processManager, + deps.IPFSClient, + logger.Logger, + baseDeployPath, + ) + gw.logsHandler = deploymentshandlers.NewLogsHandler( gw.deploymentService, gw.processManager, diff --git a/pkg/gateway/handlers/deployments/list_handler.go b/pkg/gateway/handlers/deployments/list_handler.go index b5bf24e..6d331a2 100644 --- a/pkg/gateway/handlers/deployments/list_handler.go +++ b/pkg/gateway/handlers/deployments/list_handler.go @@ -219,6 +219,9 @@ func (h *ListHandler) HandleDelete(w http.ResponseWriter, r *http.Request) { return } + // 0. Fan out teardown to replica nodes (before local cleanup so replicas can stop processes) + h.service.FanOutToReplicas(ctx, deployment, "/v1/internal/deployments/replica/teardown", nil) + // 1. Stop systemd service if err := h.processManager.Stop(ctx, deployment); err != nil { h.logger.Warn("Failed to stop deployment service (may not exist)", zap.Error(err), zap.String("name", deployment.Name)) diff --git a/pkg/gateway/handlers/deployments/replica_handler.go b/pkg/gateway/handlers/deployments/replica_handler.go new file mode 100644 index 0000000..327f30a --- /dev/null +++ b/pkg/gateway/handlers/deployments/replica_handler.go @@ -0,0 +1,424 @@ +package deployments + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "path/filepath" + "time" + + "os/exec" + + "github.com/DeBrosOfficial/network/pkg/deployments" + "github.com/DeBrosOfficial/network/pkg/deployments/process" + "github.com/DeBrosOfficial/network/pkg/ipfs" + "go.uber.org/zap" +) + +// ReplicaHandler handles internal node-to-node replica coordination endpoints. +type ReplicaHandler struct { + service *DeploymentService + processManager *process.Manager + ipfsClient ipfs.IPFSClient + logger *zap.Logger + baseDeployPath string +} + +// NewReplicaHandler creates a new replica handler. +func NewReplicaHandler( + service *DeploymentService, + processManager *process.Manager, + ipfsClient ipfs.IPFSClient, + logger *zap.Logger, + baseDeployPath string, +) *ReplicaHandler { + if baseDeployPath == "" { + baseDeployPath = filepath.Join(os.Getenv("HOME"), ".orama", "deployments") + } + return &ReplicaHandler{ + service: service, + processManager: processManager, + ipfsClient: ipfsClient, + logger: logger, + baseDeployPath: baseDeployPath, + } +} + +// replicaSetupRequest is the payload for setting up a new replica. +type replicaSetupRequest struct { + DeploymentID string `json:"deployment_id"` + Namespace string `json:"namespace"` + Name string `json:"name"` + Type string `json:"type"` + ContentCID string `json:"content_cid"` + BuildCID string `json:"build_cid"` + Environment string `json:"environment"` // JSON-encoded env vars + HealthCheckPath string `json:"health_check_path"` + MemoryLimitMB int `json:"memory_limit_mb"` + CPULimitPercent int `json:"cpu_limit_percent"` + RestartPolicy string `json:"restart_policy"` + MaxRestartCount int `json:"max_restart_count"` +} + +// HandleSetup sets up a new deployment replica on this node. +// POST /v1/internal/deployments/replica/setup +func (h *ReplicaHandler) HandleSetup(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + if !h.isInternalRequest(r) { + http.Error(w, "Forbidden", http.StatusForbidden) + return + } + + var req replicaSetupRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + + h.logger.Info("Setting up deployment replica", + zap.String("deployment_id", req.DeploymentID), + zap.String("name", req.Name), + zap.String("type", req.Type), + ) + + ctx := r.Context() + + // Allocate a port on this node + port, err := h.service.portAllocator.AllocatePort(ctx, h.service.nodePeerID, req.DeploymentID) + if err != nil { + h.logger.Error("Failed to allocate port for replica", zap.Error(err)) + http.Error(w, "Failed to allocate port", http.StatusInternalServerError) + return + } + + // Create the deployment directory + deployPath := filepath.Join(h.baseDeployPath, req.Namespace, req.Name) + if err := os.MkdirAll(deployPath, 0755); err != nil { + http.Error(w, "Failed to create deployment directory", http.StatusInternalServerError) + return + } + + // Extract content from IPFS + cid := req.BuildCID + if cid == "" { + cid = req.ContentCID + } + + if err := h.extractFromIPFS(ctx, cid, deployPath); err != nil { + h.logger.Error("Failed to extract IPFS content for replica", zap.Error(err)) + http.Error(w, "Failed to extract content", http.StatusInternalServerError) + return + } + + // Parse environment + var env map[string]string + if req.Environment != "" { + json.Unmarshal([]byte(req.Environment), &env) + } + if env == nil { + env = make(map[string]string) + } + + // Build a Deployment struct for the process manager + deployment := &deployments.Deployment{ + ID: req.DeploymentID, + Namespace: req.Namespace, + Name: req.Name, + Type: deployments.DeploymentType(req.Type), + Port: port, + HomeNodeID: h.service.nodePeerID, + ContentCID: req.ContentCID, + BuildCID: req.BuildCID, + Environment: env, + HealthCheckPath: req.HealthCheckPath, + MemoryLimitMB: req.MemoryLimitMB, + CPULimitPercent: req.CPULimitPercent, + RestartPolicy: deployments.RestartPolicy(req.RestartPolicy), + MaxRestartCount: req.MaxRestartCount, + } + + // Start the process + if err := h.processManager.Start(ctx, deployment, deployPath); err != nil { + h.logger.Error("Failed to start replica process", zap.Error(err)) + http.Error(w, fmt.Sprintf("Failed to start process: %v", err), http.StatusInternalServerError) + return + } + + // Wait for health check + if err := h.processManager.WaitForHealthy(ctx, deployment, 90*time.Second); err != nil { + h.logger.Warn("Replica did not become healthy", zap.Error(err)) + } + + // Update replica record to active with the port + if h.service.replicaManager != nil { + h.service.replicaManager.CreateReplica(ctx, req.DeploymentID, h.service.nodePeerID, port, false) + } + + resp := map[string]interface{}{ + "status": "active", + "port": port, + "node_id": h.service.nodePeerID, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) +} + +// replicaUpdateRequest is the payload for updating a replica. +type replicaUpdateRequest struct { + DeploymentID string `json:"deployment_id"` + Namespace string `json:"namespace"` + Name string `json:"name"` + Type string `json:"type"` + ContentCID string `json:"content_cid"` + BuildCID string `json:"build_cid"` + NewVersion int `json:"new_version"` +} + +// HandleUpdate updates a deployment replica on this node. +// POST /v1/internal/deployments/replica/update +func (h *ReplicaHandler) HandleUpdate(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + if !h.isInternalRequest(r) { + http.Error(w, "Forbidden", http.StatusForbidden) + return + } + + var req replicaUpdateRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + + h.logger.Info("Updating deployment replica", + zap.String("deployment_id", req.DeploymentID), + zap.String("name", req.Name), + ) + + ctx := r.Context() + deployType := deployments.DeploymentType(req.Type) + + isStatic := deployType == deployments.DeploymentTypeStatic || + deployType == deployments.DeploymentTypeNextJSStatic || + deployType == deployments.DeploymentTypeGoWASM + + if isStatic { + // Static deployments: nothing to do locally, IPFS handles content + resp := map[string]interface{}{"status": "updated"} + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + return + } + + // Dynamic deployment: extract new content and restart + cid := req.BuildCID + if cid == "" { + cid = req.ContentCID + } + + deployPath := filepath.Join(h.baseDeployPath, req.Namespace, req.Name) + stagingPath := deployPath + ".new" + oldPath := deployPath + ".old" + + // Extract to staging + if err := os.MkdirAll(stagingPath, 0755); err != nil { + http.Error(w, "Failed to create staging directory", http.StatusInternalServerError) + return + } + + if err := h.extractFromIPFS(ctx, cid, stagingPath); err != nil { + os.RemoveAll(stagingPath) + http.Error(w, "Failed to extract content", http.StatusInternalServerError) + return + } + + // Atomic swap + if err := os.Rename(deployPath, oldPath); err != nil { + os.RemoveAll(stagingPath) + http.Error(w, "Failed to backup current deployment", http.StatusInternalServerError) + return + } + + if err := os.Rename(stagingPath, deployPath); err != nil { + os.Rename(oldPath, deployPath) + http.Error(w, "Failed to activate new deployment", http.StatusInternalServerError) + return + } + + // Get the port for this replica + var port int + if h.service.replicaManager != nil { + p, err := h.service.replicaManager.GetReplicaPort(ctx, req.DeploymentID, h.service.nodePeerID) + if err == nil { + port = p + } + } + + // Restart the process + deployment := &deployments.Deployment{ + ID: req.DeploymentID, + Namespace: req.Namespace, + Name: req.Name, + Type: deployType, + Port: port, + HomeNodeID: h.service.nodePeerID, + } + + if err := h.processManager.Restart(ctx, deployment); err != nil { + // Rollback + os.Rename(deployPath, stagingPath) + os.Rename(oldPath, deployPath) + h.processManager.Restart(ctx, deployment) + http.Error(w, fmt.Sprintf("Failed to restart: %v", err), http.StatusInternalServerError) + return + } + + // Health check + if err := h.processManager.WaitForHealthy(ctx, deployment, 60*time.Second); err != nil { + h.logger.Warn("Replica unhealthy after update, rolling back", zap.Error(err)) + os.Rename(deployPath, stagingPath) + os.Rename(oldPath, deployPath) + h.processManager.Restart(ctx, deployment) + http.Error(w, "Health check failed after update", http.StatusInternalServerError) + return + } + + os.RemoveAll(oldPath) + + resp := map[string]interface{}{"status": "updated"} + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) +} + +// HandleRollback rolls back a deployment replica on this node. +// POST /v1/internal/deployments/replica/rollback +func (h *ReplicaHandler) HandleRollback(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + if !h.isInternalRequest(r) { + http.Error(w, "Forbidden", http.StatusForbidden) + return + } + + // Rollback uses the same logic as update — the caller sends the target CID + h.HandleUpdate(w, r) +} + +// replicaTeardownRequest is the payload for tearing down a replica. +type replicaTeardownRequest struct { + DeploymentID string `json:"deployment_id"` + Namespace string `json:"namespace"` + Name string `json:"name"` + Type string `json:"type"` +} + +// HandleTeardown removes a deployment replica from this node. +// POST /v1/internal/deployments/replica/teardown +func (h *ReplicaHandler) HandleTeardown(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + if !h.isInternalRequest(r) { + http.Error(w, "Forbidden", http.StatusForbidden) + return + } + + var req replicaTeardownRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + + h.logger.Info("Tearing down deployment replica", + zap.String("deployment_id", req.DeploymentID), + zap.String("name", req.Name), + ) + + ctx := r.Context() + + // Get port for this replica before teardown + var port int + if h.service.replicaManager != nil { + p, err := h.service.replicaManager.GetReplicaPort(ctx, req.DeploymentID, h.service.nodePeerID) + if err == nil { + port = p + } + } + + // Stop the process + deployment := &deployments.Deployment{ + ID: req.DeploymentID, + Namespace: req.Namespace, + Name: req.Name, + Type: deployments.DeploymentType(req.Type), + Port: port, + HomeNodeID: h.service.nodePeerID, + } + + if err := h.processManager.Stop(ctx, deployment); err != nil { + h.logger.Warn("Failed to stop replica process", zap.Error(err)) + } + + // Remove deployment files + deployPath := filepath.Join(h.baseDeployPath, req.Namespace, req.Name) + if err := os.RemoveAll(deployPath); err != nil { + h.logger.Warn("Failed to remove replica files", zap.Error(err)) + } + + // Update replica status + if h.service.replicaManager != nil { + h.service.replicaManager.UpdateReplicaStatus(ctx, req.DeploymentID, h.service.nodePeerID, deployments.ReplicaStatusRemoving) + } + + resp := map[string]interface{}{"status": "removed"} + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) +} + +// extractFromIPFS downloads and extracts a tarball from IPFS. +func (h *ReplicaHandler) extractFromIPFS(ctx context.Context, cid, destPath string) error { + reader, err := h.ipfsClient.Get(ctx, "/ipfs/"+cid, "") + if err != nil { + return err + } + defer reader.Close() + + tmpFile, err := os.CreateTemp("", "replica-deploy-*.tar.gz") + if err != nil { + return err + } + defer os.Remove(tmpFile.Name()) + defer tmpFile.Close() + + if _, err := tmpFile.ReadFrom(reader); err != nil { + return err + } + tmpFile.Close() + + cmd := exec.Command("tar", "-xzf", tmpFile.Name(), "-C", destPath) + if output, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("failed to extract tarball: %s: %w", string(output), err) + } + + return nil +} + +// isInternalRequest checks if the request is an internal node-to-node call. +func (h *ReplicaHandler) isInternalRequest(r *http.Request) bool { + return r.Header.Get("X-Orama-Internal-Auth") == "replica-coordination" +} diff --git a/pkg/gateway/handlers/deployments/rollback_handler.go b/pkg/gateway/handlers/deployments/rollback_handler.go index 89b1df0..bae7313 100644 --- a/pkg/gateway/handlers/deployments/rollback_handler.go +++ b/pkg/gateway/handlers/deployments/rollback_handler.go @@ -112,6 +112,11 @@ func (h *RollbackHandler) HandleRollback(w http.ResponseWriter, r *http.Request) return } + // Fan out rollback to replica nodes + h.service.FanOutToReplicas(ctx, rolled, "/v1/internal/deployments/replica/rollback", map[string]interface{}{ + "new_version": rolled.Version, + }) + // Return response resp := map[string]interface{}{ "deployment_id": rolled.ID, diff --git a/pkg/gateway/handlers/deployments/service.go b/pkg/gateway/handlers/deployments/service.go index df7b1aa..6c24147 100644 --- a/pkg/gateway/handlers/deployments/service.go +++ b/pkg/gateway/handlers/deployments/service.go @@ -1,10 +1,12 @@ package deployments import ( + "bytes" "context" "crypto/rand" "encoding/json" "fmt" + "net/http" "strings" "time" @@ -26,6 +28,7 @@ type DeploymentService struct { db rqlite.Client homeNodeManager *deployments.HomeNodeManager portAllocator *deployments.PortAllocator + replicaManager *deployments.ReplicaManager logger *zap.Logger baseDomain string // Base domain for deployments (e.g., "dbrs.space") nodePeerID string // Current node's peer ID (deployments run on this node) @@ -36,12 +39,14 @@ func NewDeploymentService( db rqlite.Client, homeNodeManager *deployments.HomeNodeManager, portAllocator *deployments.PortAllocator, + replicaManager *deployments.ReplicaManager, logger *zap.Logger, ) *DeploymentService { return &DeploymentService{ db: db, homeNodeManager: homeNodeManager, portAllocator: portAllocator, + replicaManager: replicaManager, logger: logger, baseDomain: "dbrs.space", // default } @@ -231,6 +236,11 @@ func (s *DeploymentService) CreateDeployment(ctx context.Context, deployment *de // Record in history s.recordHistory(ctx, deployment, "deployed") + // Create replica records + if s.replicaManager != nil { + s.createDeploymentReplicas(ctx, deployment) + } + s.logger.Info("Deployment created", zap.String("id", deployment.ID), zap.String("namespace", deployment.Namespace), @@ -243,6 +253,157 @@ func (s *DeploymentService) CreateDeployment(ctx context.Context, deployment *de return nil } +// createDeploymentReplicas creates replica records for a deployment. +// The primary replica is always the current node. A secondary replica is +// selected from available nodes using capacity scoring. +func (s *DeploymentService) createDeploymentReplicas(ctx context.Context, deployment *deployments.Deployment) { + primaryNodeID := deployment.HomeNodeID + + // Register the primary replica + if err := s.replicaManager.CreateReplica(ctx, deployment.ID, primaryNodeID, deployment.Port, true); err != nil { + s.logger.Error("Failed to create primary replica record", + zap.String("deployment_id", deployment.ID), + zap.Error(err), + ) + return + } + + // Select a secondary node + secondaryNodes, err := s.replicaManager.SelectReplicaNodes(ctx, primaryNodeID, deployments.DefaultReplicaCount-1) + if err != nil { + s.logger.Warn("Failed to select secondary replica nodes", + zap.String("deployment_id", deployment.ID), + zap.Error(err), + ) + return + } + + if len(secondaryNodes) == 0 { + s.logger.Warn("No secondary nodes available for replica, running with single replica", + zap.String("deployment_id", deployment.ID), + ) + return + } + + for _, nodeID := range secondaryNodes { + isStatic := deployment.Type == deployments.DeploymentTypeStatic || + deployment.Type == deployments.DeploymentTypeNextJSStatic || + deployment.Type == deployments.DeploymentTypeGoWASM + + if isStatic { + // Static deployments: content is in IPFS, no process to start + if err := s.replicaManager.CreateReplica(ctx, deployment.ID, nodeID, 0, false); err != nil { + s.logger.Error("Failed to create static replica", + zap.String("deployment_id", deployment.ID), + zap.String("node_id", nodeID), + zap.Error(err), + ) + } + } else { + // Dynamic deployments: fan out to the secondary node to set up the process + go s.setupDynamicReplica(ctx, deployment, nodeID) + } + } +} + +// setupDynamicReplica calls the secondary node's internal API to set up a deployment replica. +func (s *DeploymentService) setupDynamicReplica(ctx context.Context, deployment *deployments.Deployment, nodeID string) { + nodeIP, err := s.replicaManager.GetNodeIP(ctx, nodeID) + if err != nil { + s.logger.Error("Failed to get node IP for replica setup", + zap.String("node_id", nodeID), + zap.Error(err), + ) + return + } + + // Create the replica record in pending status + if err := s.replicaManager.CreateReplica(ctx, deployment.ID, nodeID, 0, false); err != nil { + s.logger.Error("Failed to create pending replica record", + zap.String("deployment_id", deployment.ID), + zap.String("node_id", nodeID), + zap.Error(err), + ) + return + } + + // Call the internal API on the target node + envJSON, _ := json.Marshal(deployment.Environment) + + payload := map[string]interface{}{ + "deployment_id": deployment.ID, + "namespace": deployment.Namespace, + "name": deployment.Name, + "type": deployment.Type, + "content_cid": deployment.ContentCID, + "build_cid": deployment.BuildCID, + "environment": string(envJSON), + "health_check_path": deployment.HealthCheckPath, + "memory_limit_mb": deployment.MemoryLimitMB, + "cpu_limit_percent": deployment.CPULimitPercent, + "restart_policy": deployment.RestartPolicy, + "max_restart_count": deployment.MaxRestartCount, + } + + resp, err := s.callInternalAPI(nodeIP, "/v1/internal/deployments/replica/setup", payload) + if err != nil { + s.logger.Error("Failed to set up dynamic replica on remote node", + zap.String("deployment_id", deployment.ID), + zap.String("node_id", nodeID), + zap.String("node_ip", nodeIP), + zap.Error(err), + ) + s.replicaManager.UpdateReplicaStatus(ctx, deployment.ID, nodeID, deployments.ReplicaStatusFailed) + return + } + + // Update replica with allocated port + if port, ok := resp["port"].(float64); ok && port > 0 { + s.replicaManager.CreateReplica(ctx, deployment.ID, nodeID, int(port), false) + } + + s.logger.Info("Dynamic replica set up on remote node", + zap.String("deployment_id", deployment.ID), + zap.String("node_id", nodeID), + ) +} + +// callInternalAPI makes an HTTP POST to a node's internal API. +func (s *DeploymentService) callInternalAPI(nodeIP, path string, payload map[string]interface{}) (map[string]interface{}, error) { + jsonData, err := json.Marshal(payload) + if err != nil { + return nil, fmt.Errorf("failed to marshal payload: %w", err) + } + + url := fmt.Sprintf("http://%s:6001%s", nodeIP, path) + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Orama-Internal-Auth", "replica-coordination") + + client := &http.Client{Timeout: 120 * time.Second} + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + var result map[string]interface{} + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + return result, fmt.Errorf("remote node returned status %d", resp.StatusCode) + } + + return result, nil +} + // GetDeployment retrieves a deployment by namespace and name func (s *DeploymentService) GetDeployment(ctx context.Context, namespace, name string) (*deployments.Deployment, error) { type deploymentRow struct { @@ -517,3 +678,65 @@ func (s *DeploymentService) recordHistory(ctx context.Context, deployment *deplo s.logger.Error("Failed to record history", zap.Error(err)) } } + +// FanOutToReplicas sends an internal API call to all non-local replica nodes +// for a given deployment. The path should be the internal API endpoint +// (e.g., "/v1/internal/deployments/replica/update"). Errors are logged but +// do not fail the operation — replicas are updated on a best-effort basis. +func (s *DeploymentService) FanOutToReplicas(ctx context.Context, deployment *deployments.Deployment, path string, extraPayload map[string]interface{}) { + if s.replicaManager == nil { + return + } + + replicaNodes, err := s.replicaManager.GetActiveReplicaNodes(ctx, deployment.ID) + if err != nil { + s.logger.Warn("Failed to get replica nodes for fan-out", + zap.String("deployment_id", deployment.ID), + zap.Error(err), + ) + return + } + + payload := map[string]interface{}{ + "deployment_id": deployment.ID, + "namespace": deployment.Namespace, + "name": deployment.Name, + "type": deployment.Type, + "content_cid": deployment.ContentCID, + "build_cid": deployment.BuildCID, + } + for k, v := range extraPayload { + payload[k] = v + } + + for _, nodeID := range replicaNodes { + if nodeID == s.nodePeerID { + continue // Skip self + } + + nodeIP, err := s.replicaManager.GetNodeIP(ctx, nodeID) + if err != nil { + s.logger.Warn("Failed to get IP for replica node", + zap.String("node_id", nodeID), + zap.Error(err), + ) + continue + } + + go func(ip, nid string) { + _, err := s.callInternalAPI(ip, path, payload) + if err != nil { + s.logger.Error("Replica fan-out failed", + zap.String("node_id", nid), + zap.String("path", path), + zap.Error(err), + ) + } else { + s.logger.Info("Replica fan-out succeeded", + zap.String("node_id", nid), + zap.String("path", path), + ) + } + }(nodeIP, nodeID) + } +} diff --git a/pkg/gateway/handlers/deployments/update_handler.go b/pkg/gateway/handlers/deployments/update_handler.go index 3145365..f11652e 100644 --- a/pkg/gateway/handlers/deployments/update_handler.go +++ b/pkg/gateway/handlers/deployments/update_handler.go @@ -101,6 +101,11 @@ func (h *UpdateHandler) HandleUpdate(w http.ResponseWriter, r *http.Request) { return } + // Fan out update to replica nodes + h.service.FanOutToReplicas(ctx, updated, "/v1/internal/deployments/replica/update", map[string]interface{}{ + "new_version": updated.Version, + }) + // Return response resp := map[string]interface{}{ "deployment_id": updated.ID, diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index a8b1098..10d6e3b 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -197,6 +197,11 @@ func isPublicPath(p string) bool { return true } + // Internal replica coordination endpoints (auth handled by replica handler) + if strings.HasPrefix(p, "/v1/internal/deployments/replica/") { + return true + } + switch p { case "/health", "/v1/health", "/status", "/v1/status", "/v1/auth/jwks", "/.well-known/jwks.json", "/v1/version", "/v1/auth/login", "/v1/auth/challenge", "/v1/auth/verify", "/v1/auth/register", "/v1/auth/refresh", "/v1/auth/logout", "/v1/auth/api-key", "/v1/auth/simple-key", "/v1/network/status", "/v1/network/peers", "/v1/internal/tls/check", "/v1/internal/acme/present", "/v1/internal/acme/cleanup": return true @@ -794,7 +799,9 @@ func (g *Gateway) getDeploymentByDomain(ctx context.Context, domain string) (*de } // proxyToDynamicDeployment proxies requests to a dynamic deployment's local port -// If the deployment is on a different node, it forwards the request to that node +// If the deployment is on a different node, it forwards the request to that node. +// With replica support, it first checks if the current node is a replica and can +// serve the request locally using the replica's port. func (g *Gateway) proxyToDynamicDeployment(w http.ResponseWriter, r *http.Request, deployment *deployments.Deployment) { if deployment.Port == 0 { http.Error(w, "Deployment has no assigned port", http.StatusServiceUnavailable) @@ -804,12 +811,28 @@ func (g *Gateway) proxyToDynamicDeployment(w http.ResponseWriter, r *http.Reques // Check if request was already forwarded by another node (loop prevention) proxyNode := r.Header.Get("X-Orama-Proxy-Node") - // Check if this deployment is on the current node + // Check if this deployment is on the current node (primary) if g.nodePeerID != "" && deployment.HomeNodeID != "" && deployment.HomeNodeID != g.nodePeerID && proxyNode == "" { - // Need to proxy to home node - if g.proxyCrossNode(w, r, deployment) { - return // Request was proxied successfully + + // Check if this node is a replica and can serve locally + if g.replicaManager != nil { + replicaPort, err := g.replicaManager.GetReplicaPort(r.Context(), deployment.ID, g.nodePeerID) + if err == nil && replicaPort > 0 { + // This node is a replica — serve locally using the replica's port + g.logger.Debug("Serving from local replica", + zap.String("deployment", deployment.Name), + zap.Int("replica_port", replicaPort), + ) + deployment.Port = replicaPort + // Fall through to local proxy below + goto serveLocal + } + } + + // Not a replica on this node — proxy to a healthy replica node + if g.proxyCrossNodeWithReplicas(w, r, deployment) { + return } // Fall through if cross-node proxy failed - try local anyway g.logger.Warn("Cross-node proxy failed, attempting local fallback", @@ -818,6 +841,8 @@ func (g *Gateway) proxyToDynamicDeployment(w http.ResponseWriter, r *http.Reques ) } +serveLocal: + // Create a simple reverse proxy to localhost target := "http://localhost:" + strconv.Itoa(deployment.Port) @@ -953,6 +978,99 @@ func (g *Gateway) proxyCrossNode(w http.ResponseWriter, r *http.Request, deploym return true } +// proxyCrossNodeWithReplicas tries to proxy a request to any healthy replica node. +// It first tries the primary (home node), then falls back to other replicas. +// Returns true if the request was successfully proxied. +func (g *Gateway) proxyCrossNodeWithReplicas(w http.ResponseWriter, r *http.Request, deployment *deployments.Deployment) bool { + if g.replicaManager == nil { + // No replica manager — fall back to original single-node proxy + return g.proxyCrossNode(w, r, deployment) + } + + // Get all active replica nodes + replicaNodes, err := g.replicaManager.GetActiveReplicaNodes(r.Context(), deployment.ID) + if err != nil || len(replicaNodes) == 0 { + // Fall back to original home node proxy + return g.proxyCrossNode(w, r, deployment) + } + + // Try each replica node (primary first if present) + for _, nodeID := range replicaNodes { + if nodeID == g.nodePeerID { + continue // Skip self + } + + nodeIP, err := g.replicaManager.GetNodeIP(r.Context(), nodeID) + if err != nil { + g.logger.Warn("Failed to get replica node IP", + zap.String("node_id", nodeID), + zap.Error(err), + ) + continue + } + + // Proxy using the same logic as proxyCrossNode + proxyDeployment := *deployment + proxyDeployment.HomeNodeID = nodeID + if g.proxyCrossNodeToIP(w, r, &proxyDeployment, nodeIP) { + return true + } + } + + return false +} + +// proxyCrossNodeToIP forwards a request to a specific node IP. +// This is a variant of proxyCrossNode that takes a resolved IP directly. +func (g *Gateway) proxyCrossNodeToIP(w http.ResponseWriter, r *http.Request, deployment *deployments.Deployment, nodeIP string) bool { + g.logger.Info("Proxying request to replica node", + zap.String("deployment", deployment.Name), + zap.String("node_id", deployment.HomeNodeID), + zap.String("node_ip", nodeIP), + ) + + targetURL := "http://" + nodeIP + ":6001" + r.URL.Path + if r.URL.RawQuery != "" { + targetURL += "?" + r.URL.RawQuery + } + + proxyReq, err := http.NewRequest(r.Method, targetURL, r.Body) + if err != nil { + g.logger.Error("Failed to create cross-node proxy request", zap.Error(err)) + return false + } + + for key, values := range r.Header { + for _, value := range values { + proxyReq.Header.Add(key, value) + } + } + proxyReq.Host = r.Host + proxyReq.Header.Set("X-Forwarded-For", getClientIP(r)) + proxyReq.Header.Set("X-Orama-Proxy-Node", g.nodePeerID) + + httpClient := &http.Client{Timeout: 120 * time.Second} + resp, err := httpClient.Do(proxyReq) + if err != nil { + g.logger.Warn("Replica proxy request failed", + zap.String("target_ip", nodeIP), + zap.Error(err), + ) + return false + } + defer resp.Body.Close() + + for key, values := range resp.Header { + for _, value := range values { + w.Header().Add(key, value) + } + } + w.WriteHeader(resp.StatusCode) + io.Copy(w, resp.Body) + + return true +} + // Helper functions for type conversion func getString(v interface{}) string { if s, ok := v.(string); ok { diff --git a/pkg/gateway/routes.go b/pkg/gateway/routes.go index 34f010e..10aca3a 100644 --- a/pkg/gateway/routes.go +++ b/pkg/gateway/routes.go @@ -123,6 +123,14 @@ func (g *Gateway) Routes() http.Handler { mux.HandleFunc("/v1/deployments/stats", g.withHomeNodeProxy(g.statsHandler.HandleStats)) mux.HandleFunc("/v1/deployments/events", g.logsHandler.HandleGetEvents) + // Internal replica coordination endpoints + if g.replicaHandler != nil { + mux.HandleFunc("/v1/internal/deployments/replica/setup", g.replicaHandler.HandleSetup) + mux.HandleFunc("/v1/internal/deployments/replica/update", g.replicaHandler.HandleUpdate) + mux.HandleFunc("/v1/internal/deployments/replica/rollback", g.replicaHandler.HandleRollback) + mux.HandleFunc("/v1/internal/deployments/replica/teardown", g.replicaHandler.HandleTeardown) + } + // Custom domains mux.HandleFunc("/v1/deployments/domains/add", g.domainHandler.HandleAddDomain) mux.HandleFunc("/v1/deployments/domains/verify", g.domainHandler.HandleVerifyDomain)