From d256a83fb71c952f9fe60b5caaa1f82c418110bc Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Sat, 21 Feb 2026 06:41:19 +0200 Subject: [PATCH] feat: enhance namespace management with cluster state deletion and improved deprovisioning process --- .gitignore | 4 +- pkg/cli/production/install/orchestrator.go | 43 ++++- .../handlers/namespace/delete_handler.go | 149 +++++++++++++++++- .../handlers/namespace/spawn_handler.go | 10 +- pkg/namespace/cluster_manager.go | 57 ++++++- pkg/namespace/systemd_spawner.go | 12 ++ 6 files changed, 257 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 91d8a2d..2b0f167 100644 --- a/.gitignore +++ b/.gitignore @@ -109,4 +109,6 @@ terms-agreement results/ -phantom-auth/ \ No newline at end of file +phantom-auth/ + +*.db \ No newline at end of file diff --git a/pkg/cli/production/install/orchestrator.go b/pkg/cli/production/install/orchestrator.go index f3ecd67..e7630d7 100644 --- a/pkg/cli/production/install/orchestrator.go +++ b/pkg/cli/production/install/orchestrator.go @@ -278,7 +278,7 @@ func (o *Orchestrator) executeJoinFlow() error { // Step 4: Verify WG tunnel fmt.Printf("\nšŸ” Verifying WireGuard tunnel...\n") - if err := o.verifyWGTunnel(joinResp.WGPeers); err != nil { + if err := o.verifyWGTunnel(joinResp.WGPeers, o.flags.JoinAddress); err != nil { return fmt.Errorf("WireGuard tunnel verification failed: %w", err) } fmt.Printf(" āœ“ WireGuard tunnel established\n") @@ -422,14 +422,31 @@ func (o *Orchestrator) saveSecretsFromJoinResponse(resp *joinhandlers.JoinRespon return nil } -// verifyWGTunnel pings a WG peer to verify the tunnel is working -func (o *Orchestrator) verifyWGTunnel(peers []joinhandlers.WGPeerInfo) error { +// verifyWGTunnel pings a WG peer to verify the tunnel is working. +// It targets the node that handled the join request (joinAddress), since that +// node is the only one guaranteed to have the new peer's key immediately. 
+// Other peers learn the key via the WireGuard sync loop (up to 60s delay),
+// so pinging them would race against replication.
+func (o *Orchestrator) verifyWGTunnel(peers []joinhandlers.WGPeerInfo, joinAddress string) error {
 	if len(peers) == 0 {
 		return fmt.Errorf("no WG peers to verify")
 	}
 
-	// Extract the IP from the first peer's AllowedIP (e.g. "10.0.0.1/32" -> "10.0.0.1")
-	targetIP := strings.TrimSuffix(peers[0].AllowedIP, "/32")
+	// Find the join node's WG IP by matching its public IP against peer endpoints.
+	targetIP := ""
+	joinHost := extractHost(joinAddress)
+	for _, p := range peers {
+		endpointHost := extractHost(p.Endpoint)
+		if endpointHost == joinHost {
+			targetIP = strings.TrimSuffix(p.AllowedIP, "/32")
+			break
+		}
+	}
+
+	// Fallback to first peer if the join node wasn't found in the peer list.
+	if targetIP == "" {
+		targetIP = strings.TrimSuffix(peers[0].AllowedIP, "/32")
+	}
 
 	// Retry ping for up to 30 seconds
 	deadline := time.Now().Add(30 * time.Second)
@@ -444,6 +461,33 @@ func (o *Orchestrator) verifyWGTunnel(peers []joinhandlers.WGPeerInfo) error {
 	return fmt.Errorf("could not reach %s via WireGuard after 30s", targetIP)
 }
 
+// extractHost returns the bare host from a URL ("http://1.2.3.4:6001/join"),
+// a host:port pair ("1.2.3.4:51820"), or a plain host. An http(s) scheme, any
+// path, the port, and IPv6 brackets are removed so that two addresses naming
+// the same node compare equal regardless of how they were written.
+func extractHost(addr string) string {
+	// Strip scheme (http://, https://)
+	addr = strings.TrimPrefix(addr, "http://")
+	addr = strings.TrimPrefix(addr, "https://")
+	// Strip the path first so a ':' inside it is never mistaken for a port.
+	if idx := strings.Index(addr, "/"); idx != -1 {
+		addr = addr[:idx]
+	}
+	// Bracketed IPv6 literal ("[::1]" or "[::1]:51820"): the host is whatever
+	// sits between the brackets, with or without a trailing port.
+	if strings.HasPrefix(addr, "[") {
+		if end := strings.Index(addr, "]"); end != -1 {
+			return addr[1:end]
+		}
+	}
+	// Exactly one ':' separates host and port. More than one means a bare IPv6
+	// literal, which carries no port and must be returned untouched.
+	if strings.Count(addr, ":") == 1 {
+		addr = addr[:strings.Index(addr, ":")]
+	}
+	return addr
+}
+
 func (o *Orchestrator) printFirstNodeSecrets() {
 	fmt.Printf("šŸ“‹ To add more nodes to this cluster:\n\n")
 	fmt.Printf(" 1. 
Generate an invite token:\n") diff --git a/pkg/gateway/handlers/namespace/delete_handler.go b/pkg/gateway/handlers/namespace/delete_handler.go index f3418f3..2a0d9e1 100644 --- a/pkg/gateway/handlers/namespace/delete_handler.go +++ b/pkg/gateway/handlers/namespace/delete_handler.go @@ -1,10 +1,12 @@ package namespace import ( + "bytes" "context" "encoding/json" "fmt" "net/http" + "time" "github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys" "github.com/DeBrosOfficial/network/pkg/ipfs" @@ -79,25 +81,28 @@ func (h *DeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - h.logger.Info("Deprovisioning namespace cluster", + h.logger.Info("Deleting namespace", zap.String("namespace", ns), zap.Int64("namespace_id", namespaceID), ) - // 1. Deprovision the cluster (stops infra + deployment processes, deallocates ports, deletes DNS) + // 1. Deprovision the cluster (stops infra on ALL nodes, deletes cluster-state, deallocates ports, deletes DNS) if err := h.deprovisioner.DeprovisionCluster(r.Context(), namespaceID); err != nil { h.logger.Error("Failed to deprovision cluster", zap.Error(err)) writeDeleteResponse(w, http.StatusInternalServerError, map[string]interface{}{"error": err.Error()}) return } - // 2. Unpin IPFS content (must run before global table cleanup to read CID list) + // 2. Clean up deployments (teardown replicas on all nodes, unpin IPFS, delete DB records) + h.cleanupDeployments(r.Context(), ns) + + // 3. Unpin IPFS content from ipfs_content_ownership (separate from deployment CIDs) h.unpinNamespaceContent(r.Context(), ns) - // 3. Clean up global tables that use namespace TEXT (not FK cascade) + // 4. Clean up global tables that use namespace TEXT (not FK cascade) h.cleanupGlobalTables(r.Context(), ns) - // 4. Delete API keys, ownership records, and namespace record (FK cascade handles children) + // 5. 
Delete API keys, ownership records, and namespace record
 	h.ormClient.Exec(r.Context(), "DELETE FROM wallet_api_keys WHERE namespace_id = ?", namespaceID)
 	h.ormClient.Exec(r.Context(), "DELETE FROM api_keys WHERE namespace_id = ?", namespaceID)
 	h.ormClient.Exec(r.Context(), "DELETE FROM namespace_ownership WHERE namespace_id = ?", namespaceID)
@@ -111,6 +116,154 @@ func (h *DeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	})
 }
 
+// cleanupDeployments tears down all deployment replicas on all nodes, unpins IPFS content,
+// and deletes all deployment-related DB records for the namespace.
+// Best-effort: query, teardown, and unpin failures are logged but do not abort deletion;
+// the results of the DELETE statements are not inspected.
+func (h *DeleteHandler) cleanupDeployments(ctx context.Context, ns string) {
+	type deploymentInfo struct {
+		ID         string `db:"id"`
+		Name       string `db:"name"`
+		Type       string `db:"type"`
+		ContentCID string `db:"content_cid"`
+		BuildCID   string `db:"build_cid"`
+	}
+	var deps []deploymentInfo
+	if err := h.ormClient.Query(ctx, &deps,
+		"SELECT id, name, type, content_cid, build_cid FROM deployments WHERE namespace = ?", ns); err != nil {
+		h.logger.Warn("Failed to query deployments for cleanup",
+			zap.String("namespace", ns), zap.Error(err))
+		return
+	}
+
+	if len(deps) == 0 {
+		return
+	}
+
+	h.logger.Info("Cleaning up deployments for namespace",
+		zap.String("namespace", ns),
+		zap.Int("count", len(deps)))
+
+	// 1. Send teardown to all replica nodes for each deployment
+	for _, dep := range deps {
+		h.teardownDeploymentReplicas(ctx, ns, dep.ID, dep.Name, dep.Type)
+	}
+
+	// 2. Unpin deployment IPFS content
+	if h.ipfsClient != nil {
+		for _, dep := range deps {
+			if dep.ContentCID != "" {
+				if err := h.ipfsClient.Unpin(ctx, dep.ContentCID); err != nil {
+					h.logger.Warn("Failed to unpin deployment content CID",
+						zap.String("deployment_id", dep.ID),
+						zap.String("cid", dep.ContentCID), zap.Error(err))
+				}
+			}
+			if dep.BuildCID != "" {
+				if err := h.ipfsClient.Unpin(ctx, dep.BuildCID); err != nil {
+					h.logger.Warn("Failed to unpin deployment build CID",
+						zap.String("deployment_id", dep.ID),
+						zap.String("cid", dep.BuildCID), zap.Error(err))
+				}
+			}
+		}
+	}
+
+	// 3. Clean up deployment DB records (children first, since FK cascades disabled in rqlite)
+	for _, dep := range deps {
+		// Child tables with FK to deployments(id)
+		h.ormClient.Exec(ctx, "DELETE FROM deployment_replicas WHERE deployment_id = ?", dep.ID)
+		h.ormClient.Exec(ctx, "DELETE FROM port_allocations WHERE deployment_id = ?", dep.ID)
+		h.ormClient.Exec(ctx, "DELETE FROM deployment_domains WHERE deployment_id = ?", dep.ID)
+		h.ormClient.Exec(ctx, "DELETE FROM deployment_history WHERE deployment_id = ?", dep.ID)
+		h.ormClient.Exec(ctx, "DELETE FROM deployment_env_vars WHERE deployment_id = ?", dep.ID)
+		h.ormClient.Exec(ctx, "DELETE FROM deployment_events WHERE deployment_id = ?", dep.ID)
+		h.ormClient.Exec(ctx, "DELETE FROM deployment_health_checks WHERE deployment_id = ?", dep.ID)
+		// Tables with no FK constraint
+		h.ormClient.Exec(ctx, "DELETE FROM dns_records WHERE deployment_id = ?", dep.ID)
+		h.ormClient.Exec(ctx, "DELETE FROM global_deployment_subdomains WHERE deployment_id = ?", dep.ID)
+	}
+	h.ormClient.Exec(ctx, "DELETE FROM deployments WHERE namespace = ?", ns)
+
+	h.logger.Info("Deployment cleanup completed",
+		zap.String("namespace", ns),
+		zap.Int("deployments_cleaned", len(deps)))
+}
+
+// teardownDeploymentReplicas POSTs a teardown request to every node that has a replica
+// of the given deployment, as listed in deployment_replicas joined with dns_nodes.
+// The receiving node is expected to stop its process, remove files, and deallocate its port.
+// Best-effort: transport failures and non-2xx replies are logged and the remaining nodes are still tried.
+func (h *DeleteHandler) teardownDeploymentReplicas(ctx context.Context, ns, deploymentID, name, depType string) {
+	type replicaNode struct {
+		NodeID     string `db:"node_id"`
+		InternalIP string `db:"internal_ip"`
+	}
+	var nodes []replicaNode
+	query := `
+		SELECT dr.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip
+		FROM deployment_replicas dr
+		JOIN dns_nodes dn ON dr.node_id = dn.id
+		WHERE dr.deployment_id = ?
+	`
+	if err := h.ormClient.Query(ctx, &nodes, query, deploymentID); err != nil {
+		h.logger.Warn("Failed to query replica nodes for teardown",
+			zap.String("deployment_id", deploymentID), zap.Error(err))
+		return
+	}
+
+	if len(nodes) == 0 {
+		return
+	}
+
+	payload := map[string]interface{}{
+		"deployment_id": deploymentID,
+		"namespace":     ns,
+		"name":          name,
+		"type":          depType,
+	}
+	jsonData, err := json.Marshal(payload)
+	if err != nil {
+		h.logger.Error("Failed to marshal teardown payload", zap.Error(err))
+		return
+	}
+
+	// One client for the whole fan-out instead of a fresh one per node.
+	client := &http.Client{Timeout: 30 * time.Second}
+
+	for _, node := range nodes {
+		url := fmt.Sprintf("http://%s:6001/v1/internal/deployments/replica/teardown", node.InternalIP)
+		req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonData))
+		if err != nil {
+			h.logger.Warn("Failed to create teardown request",
+				zap.String("node_id", node.NodeID), zap.Error(err))
+			continue
+		}
+		req.Header.Set("Content-Type", "application/json")
+		req.Header.Set("X-Orama-Internal-Auth", "replica-coordination")
+
+		resp, err := client.Do(req)
+		if err != nil {
+			h.logger.Warn("Failed to send teardown to replica node",
+				zap.String("deployment_id", deploymentID),
+				zap.String("node_id", node.NodeID),
+				zap.String("node_ip", node.InternalIP),
+				zap.Error(err))
+			continue
+		}
+		resp.Body.Close()
+
+		// Getting a reply does not mean the replica was torn down; surface rejections.
+		if resp.StatusCode < 200 || resp.StatusCode > 299 {
+			h.logger.Warn("Replica node rejected teardown request",
+				zap.String("deployment_id", deploymentID),
+				zap.String("node_id", node.NodeID),
+				zap.String("node_ip", node.InternalIP),
+				zap.Int("status", resp.StatusCode))
+		}
+	}
+}
+
 // unpinNamespaceContent unpins all IPFS content owned by the namespace. 
// Best-effort: individual failures are logged but do not abort deletion. func (h *DeleteHandler) unpinNamespaceContent(ctx context.Context, ns string) { diff --git a/pkg/gateway/handlers/namespace/spawn_handler.go b/pkg/gateway/handlers/namespace/spawn_handler.go index f1875b4..e075862 100644 --- a/pkg/gateway/handlers/namespace/spawn_handler.go +++ b/pkg/gateway/handlers/namespace/spawn_handler.go @@ -17,7 +17,7 @@ import ( // SpawnRequest represents a request to spawn or stop a namespace instance type SpawnRequest struct { - Action string `json:"action"` // "spawn-rqlite", "spawn-olric", "spawn-gateway", "stop-rqlite", "stop-olric", "stop-gateway" + Action string `json:"action"` // "spawn-rqlite", "spawn-olric", "spawn-gateway", "stop-rqlite", "stop-olric", "stop-gateway", "save-cluster-state", "delete-cluster-state" Namespace string `json:"namespace"` NodeID string `json:"node_id"` @@ -234,6 +234,14 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) + case "delete-cluster-state": + if err := h.systemdSpawner.DeleteClusterState(req.Namespace); err != nil { + h.logger.Error("Failed to delete cluster state", zap.Error(err)) + writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) + return + } + writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) + default: writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{Error: fmt.Sprintf("unknown action: %s", req.Action)}) } diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index 0236dd6..1172bc4 100644 --- a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -818,7 +818,9 @@ func (cm *ClusterManager) rollbackProvisioning(ctx context.Context, cluster *Nam cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusFailed, "Provisioning failed and rolled back") } -// DeprovisionCluster tears down a namespace cluster +// 
DeprovisionCluster tears down a namespace cluster on all nodes. +// Stops namespace infrastructure (Gateway, Olric, RQLite) on every cluster node, +// deletes cluster-state.json, deallocates ports, removes DNS records, and cleans up DB. func (cm *ClusterManager) DeprovisionCluster(ctx context.Context, namespaceID int64) error { cluster, err := cm.GetClusterByNamespaceID(ctx, namespaceID) if err != nil { @@ -837,16 +839,59 @@ func (cm *ClusterManager) DeprovisionCluster(ctx context.Context, namespaceID in cm.logEvent(ctx, cluster.ID, EventDeprovisionStarted, "", "Cluster deprovisioning started", nil) cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusDeprovisioning, "") - // Stop all services using systemd - cm.systemdSpawner.StopAll(ctx, cluster.NamespaceName) + // 1. Get cluster nodes WITH IPs (must happen before any DB deletion) + type deprovisionNodeInfo struct { + NodeID string `db:"node_id"` + InternalIP string `db:"internal_ip"` + } + var clusterNodes []deprovisionNodeInfo + nodeQuery := ` + SELECT ncn.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip + FROM namespace_cluster_nodes ncn + JOIN dns_nodes dn ON ncn.node_id = dn.id + WHERE ncn.namespace_cluster_id = ? + ` + if err := cm.db.Query(ctx, &clusterNodes, nodeQuery, cluster.ID); err != nil { + cm.logger.Warn("Failed to query cluster nodes for deprovisioning, falling back to local-only stop", zap.Error(err)) + // Fall back to local-only stop (individual methods, NOT StopAll which uses dangerous glob) + cm.systemdSpawner.StopGateway(ctx, cluster.NamespaceName, cm.localNodeID) + cm.systemdSpawner.StopOlric(ctx, cluster.NamespaceName, cm.localNodeID) + cm.systemdSpawner.StopRQLite(ctx, cluster.NamespaceName, cm.localNodeID) + cm.systemdSpawner.DeleteClusterState(cluster.NamespaceName) + } else { + // 2. 
Stop namespace infra on ALL nodes (reverse dependency order: Gateway → Olric → RQLite)
+		for _, node := range clusterNodes {
+			cm.stopGatewayOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName)
+		}
+		for _, node := range clusterNodes {
+			cm.stopOlricOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName)
+		}
+		for _, node := range clusterNodes {
+			cm.stopRQLiteOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName, nil)
+		}
 
-	// Deallocate all ports
+		// 3. Delete cluster-state.json on all nodes
+		for _, node := range clusterNodes {
+			if node.NodeID == cm.localNodeID {
+				cm.systemdSpawner.DeleteClusterState(cluster.NamespaceName)
+			} else {
+				cm.sendStopRequest(ctx, node.InternalIP, "delete-cluster-state", cluster.NamespaceName, node.NodeID)
+			}
+		}
+	}
+
+	// 4. Deallocate all ports
 	cm.portAllocator.DeallocateAllPortBlocks(ctx, cluster.ID)
 
-	// Delete DNS records
+	// 5. Delete namespace DNS records
 	cm.dnsManager.DeleteNamespaceRecords(ctx, cluster.NamespaceName)
 
-	// Delete cluster record
+	// 6. Explicitly delete child tables (FK cascades disabled in rqlite)
+	cm.db.Exec(ctx, `DELETE FROM namespace_cluster_events WHERE namespace_cluster_id = ?`, cluster.ID)
+	cm.db.Exec(ctx, `DELETE FROM namespace_cluster_nodes WHERE namespace_cluster_id = ?`, cluster.ID)
+	cm.db.Exec(ctx, `DELETE FROM namespace_port_allocations WHERE namespace_cluster_id = ?`, cluster.ID)
+
+	// 7. Delete cluster record
 	cm.db.Exec(ctx, `DELETE FROM namespace_clusters WHERE id = ?`, cluster.ID)
 
 	cm.logEvent(ctx, cluster.ID, EventDeprovisioned, "", "Cluster deprovisioned", nil)
diff --git a/pkg/namespace/systemd_spawner.go b/pkg/namespace/systemd_spawner.go
index e873b7e..a3db02b 100644
--- a/pkg/namespace/systemd_spawner.go
+++ b/pkg/namespace/systemd_spawner.go
@@ -306,6 +306,27 @@ func (s *SystemdSpawner) SaveClusterState(namespace string, data []byte) error {
 	return nil
 }
 
+// DeleteClusterState removes the namespace's entire data directory (cluster state
+// and config files) from under namespaceBase.
+// The namespace must name a direct child of namespaceBase: an empty value, ".",
+// or anything containing a path separator or ".." is rejected with an error,
+// because the joined path would otherwise resolve to namespaceBase itself
+// (deleting every namespace's data) or to a location outside it.
+func (s *SystemdSpawner) DeleteClusterState(namespace string) error {
+	base := filepath.Clean(s.namespaceBase)
+	dir := filepath.Join(base, namespace)
+	if dir == base || filepath.Dir(dir) != base {
+		return fmt.Errorf("refusing to delete namespace data directory: invalid namespace %q", namespace)
+	}
+	if err := os.RemoveAll(dir); err != nil && !os.IsNotExist(err) {
+		return fmt.Errorf("failed to delete namespace data directory: %w", err)
+	}
+	s.logger.Info("Deleted namespace data directory",
+		zap.String("namespace", namespace),
+		zap.String("path", dir))
+	return nil
+}
+
 // StopAll stops all services for a namespace, including deployment processes
 func (s *SystemdSpawner) StopAll(ctx context.Context, namespace string) error {
 	s.logger.Info("Stopping all namespace services via systemd",