mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 03:33:01 +00:00
feat: enhance namespace management with cluster state deletion and improved deprovisioning process
This commit is contained in:
parent
c731486454
commit
d256a83fb7
4
.gitignore
vendored
4
.gitignore
vendored
@ -109,4 +109,6 @@ terms-agreement
|
||||
|
||||
results/
|
||||
|
||||
phantom-auth/
|
||||
phantom-auth/
|
||||
|
||||
*.db
|
||||
@ -278,7 +278,7 @@ func (o *Orchestrator) executeJoinFlow() error {
|
||||
|
||||
// Step 4: Verify WG tunnel
|
||||
fmt.Printf("\n🔍 Verifying WireGuard tunnel...\n")
|
||||
if err := o.verifyWGTunnel(joinResp.WGPeers); err != nil {
|
||||
if err := o.verifyWGTunnel(joinResp.WGPeers, o.flags.JoinAddress); err != nil {
|
||||
return fmt.Errorf("WireGuard tunnel verification failed: %w", err)
|
||||
}
|
||||
fmt.Printf(" ✓ WireGuard tunnel established\n")
|
||||
@ -422,14 +422,31 @@ func (o *Orchestrator) saveSecretsFromJoinResponse(resp *joinhandlers.JoinRespon
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyWGTunnel pings a WG peer to verify the tunnel is working
|
||||
func (o *Orchestrator) verifyWGTunnel(peers []joinhandlers.WGPeerInfo) error {
|
||||
// verifyWGTunnel pings a WG peer to verify the tunnel is working.
|
||||
// It targets the node that handled the join request (joinAddress), since that
|
||||
// node is the only one guaranteed to have the new peer's key immediately.
|
||||
// Other peers learn the key via the WireGuard sync loop (up to 60s delay),
|
||||
// so pinging them would race against replication.
|
||||
func (o *Orchestrator) verifyWGTunnel(peers []joinhandlers.WGPeerInfo, joinAddress string) error {
|
||||
if len(peers) == 0 {
|
||||
return fmt.Errorf("no WG peers to verify")
|
||||
}
|
||||
|
||||
// Extract the IP from the first peer's AllowedIP (e.g. "10.0.0.1/32" -> "10.0.0.1")
|
||||
targetIP := strings.TrimSuffix(peers[0].AllowedIP, "/32")
|
||||
// Find the join node's WG IP by matching its public IP against peer endpoints.
|
||||
targetIP := ""
|
||||
joinHost := extractHost(joinAddress)
|
||||
for _, p := range peers {
|
||||
endpointHost := extractHost(p.Endpoint)
|
||||
if endpointHost == joinHost {
|
||||
targetIP = strings.TrimSuffix(p.AllowedIP, "/32")
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback to first peer if the join node wasn't found in the peer list.
|
||||
if targetIP == "" {
|
||||
targetIP = strings.TrimSuffix(peers[0].AllowedIP, "/32")
|
||||
}
|
||||
|
||||
// Retry ping for up to 30 seconds
|
||||
deadline := time.Now().Add(30 * time.Second)
|
||||
@ -444,6 +461,22 @@ func (o *Orchestrator) verifyWGTunnel(peers []joinhandlers.WGPeerInfo) error {
|
||||
return fmt.Errorf("could not reach %s via WireGuard after 30s", targetIP)
|
||||
}
|
||||
|
||||
// extractHost returns the host part from a URL or host:port string.
// It strips an optional http:// or https:// scheme, any path suffix,
// and a trailing :port. Bracketed IPv6 literals ("[::1]:8080") are
// unwrapped to the bare address — the original LastIndex-based port
// strip returned a mangled "[::1" for those.
func extractHost(addr string) string {
	// Strip scheme (http://, https://).
	addr = strings.TrimPrefix(addr, "http://")
	addr = strings.TrimPrefix(addr, "https://")
	// Strip any path suffix first, so a colon inside a path segment
	// can never be mistaken for the port separator below.
	if idx := strings.Index(addr, "/"); idx != -1 {
		addr = addr[:idx]
	}
	// Bracketed IPv6 literal, e.g. "[::1]:8080" or "[fd00::1]".
	if strings.HasPrefix(addr, "[") {
		if end := strings.Index(addr, "]"); end != -1 {
			return addr[1:end]
		}
	}
	// Strip trailing :port. LastIndex keeps the full host for the
	// common host:port and IPv4:port cases.
	if idx := strings.LastIndex(addr, ":"); idx != -1 {
		addr = addr[:idx]
	}
	return addr
}
|
||||
|
||||
func (o *Orchestrator) printFirstNodeSecrets() {
|
||||
fmt.Printf("📋 To add more nodes to this cluster:\n\n")
|
||||
fmt.Printf(" 1. Generate an invite token:\n")
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
package namespace
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys"
|
||||
"github.com/DeBrosOfficial/network/pkg/ipfs"
|
||||
@ -79,25 +81,28 @@ func (h *DeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
h.logger.Info("Deprovisioning namespace cluster",
|
||||
h.logger.Info("Deleting namespace",
|
||||
zap.String("namespace", ns),
|
||||
zap.Int64("namespace_id", namespaceID),
|
||||
)
|
||||
|
||||
// 1. Deprovision the cluster (stops infra + deployment processes, deallocates ports, deletes DNS)
|
||||
// 1. Deprovision the cluster (stops infra on ALL nodes, deletes cluster-state, deallocates ports, deletes DNS)
|
||||
if err := h.deprovisioner.DeprovisionCluster(r.Context(), namespaceID); err != nil {
|
||||
h.logger.Error("Failed to deprovision cluster", zap.Error(err))
|
||||
writeDeleteResponse(w, http.StatusInternalServerError, map[string]interface{}{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
// 2. Unpin IPFS content (must run before global table cleanup to read CID list)
|
||||
// 2. Clean up deployments (teardown replicas on all nodes, unpin IPFS, delete DB records)
|
||||
h.cleanupDeployments(r.Context(), ns)
|
||||
|
||||
// 3. Unpin IPFS content from ipfs_content_ownership (separate from deployment CIDs)
|
||||
h.unpinNamespaceContent(r.Context(), ns)
|
||||
|
||||
// 3. Clean up global tables that use namespace TEXT (not FK cascade)
|
||||
// 4. Clean up global tables that use namespace TEXT (not FK cascade)
|
||||
h.cleanupGlobalTables(r.Context(), ns)
|
||||
|
||||
// 4. Delete API keys, ownership records, and namespace record (FK cascade handles children)
|
||||
// 5. Delete API keys, ownership records, and namespace record
|
||||
h.ormClient.Exec(r.Context(), "DELETE FROM wallet_api_keys WHERE namespace_id = ?", namespaceID)
|
||||
h.ormClient.Exec(r.Context(), "DELETE FROM api_keys WHERE namespace_id = ?", namespaceID)
|
||||
h.ormClient.Exec(r.Context(), "DELETE FROM namespace_ownership WHERE namespace_id = ?", namespaceID)
|
||||
@ -111,6 +116,140 @@ func (h *DeleteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
})
|
||||
}
|
||||
|
||||
// cleanupDeployments tears down all deployment replicas on all nodes, unpins IPFS content,
// and deletes all deployment-related DB records for the namespace.
// Best-effort: individual failures are logged but do not abort deletion.
//
// Ordering matters: replicas are torn down first (nodes need the rows/CIDs
// still present), then IPFS pins are released, then DB rows are deleted
// children-first because FK cascades are disabled in rqlite.
func (h *DeleteHandler) cleanupDeployments(ctx context.Context, ns string) {
	// Minimal projection of a deployments row: only the fields needed for
	// replica teardown routing and IPFS unpinning.
	type deploymentInfo struct {
		ID         string `db:"id"`
		Name       string `db:"name"`
		Type       string `db:"type"`
		ContentCID string `db:"content_cid"`
		BuildCID   string `db:"build_cid"`
	}
	var deps []deploymentInfo
	if err := h.ormClient.Query(ctx, &deps,
		"SELECT id, name, type, content_cid, build_cid FROM deployments WHERE namespace = ?", ns); err != nil {
		// Without the deployment list nothing below can run; give up on
		// deployment cleanup (namespace deletion continues in the caller).
		h.logger.Warn("Failed to query deployments for cleanup",
			zap.String("namespace", ns), zap.Error(err))
		return
	}

	if len(deps) == 0 {
		return
	}

	h.logger.Info("Cleaning up deployments for namespace",
		zap.String("namespace", ns),
		zap.Int("count", len(deps)))

	// 1. Send teardown to all replica nodes for each deployment
	for _, dep := range deps {
		h.teardownDeploymentReplicas(ctx, ns, dep.ID, dep.Name, dep.Type)
	}

	// 2. Unpin deployment IPFS content. Content and build artifacts are
	// pinned under separate CIDs, so each is unpinned individually;
	// unpin failures are logged and skipped.
	if h.ipfsClient != nil {
		for _, dep := range deps {
			if dep.ContentCID != "" {
				if err := h.ipfsClient.Unpin(ctx, dep.ContentCID); err != nil {
					h.logger.Warn("Failed to unpin deployment content CID",
						zap.String("deployment_id", dep.ID),
						zap.String("cid", dep.ContentCID), zap.Error(err))
				}
			}
			if dep.BuildCID != "" {
				if err := h.ipfsClient.Unpin(ctx, dep.BuildCID); err != nil {
					h.logger.Warn("Failed to unpin deployment build CID",
						zap.String("deployment_id", dep.ID),
						zap.String("cid", dep.BuildCID), zap.Error(err))
				}
			}
		}
	}

	// 3. Clean up deployment DB records (children first, since FK cascades disabled in rqlite)
	// NOTE(review): Exec errors are deliberately ignored — best-effort cleanup;
	// a failed delete leaves an orphan row rather than aborting the whole flow.
	for _, dep := range deps {
		// Child tables with FK to deployments(id)
		h.ormClient.Exec(ctx, "DELETE FROM deployment_replicas WHERE deployment_id = ?", dep.ID)
		h.ormClient.Exec(ctx, "DELETE FROM port_allocations WHERE deployment_id = ?", dep.ID)
		h.ormClient.Exec(ctx, "DELETE FROM deployment_domains WHERE deployment_id = ?", dep.ID)
		h.ormClient.Exec(ctx, "DELETE FROM deployment_history WHERE deployment_id = ?", dep.ID)
		h.ormClient.Exec(ctx, "DELETE FROM deployment_env_vars WHERE deployment_id = ?", dep.ID)
		h.ormClient.Exec(ctx, "DELETE FROM deployment_events WHERE deployment_id = ?", dep.ID)
		h.ormClient.Exec(ctx, "DELETE FROM deployment_health_checks WHERE deployment_id = ?", dep.ID)
		// Tables with no FK constraint
		h.ormClient.Exec(ctx, "DELETE FROM dns_records WHERE deployment_id = ?", dep.ID)
		h.ormClient.Exec(ctx, "DELETE FROM global_deployment_subdomains WHERE deployment_id = ?", dep.ID)
	}
	// Parent rows last, after all child/related rows are gone.
	h.ormClient.Exec(ctx, "DELETE FROM deployments WHERE namespace = ?", ns)

	h.logger.Info("Deployment cleanup completed",
		zap.String("namespace", ns),
		zap.Int("deployments_cleaned", len(deps)))
}
|
||||
|
||||
// teardownDeploymentReplicas sends a teardown request to every node that has a replica
|
||||
// of the given deployment. Each node stops its process, removes files, and deallocates its port.
|
||||
func (h *DeleteHandler) teardownDeploymentReplicas(ctx context.Context, ns, deploymentID, name, depType string) {
|
||||
type replicaNode struct {
|
||||
NodeID string `db:"node_id"`
|
||||
InternalIP string `db:"internal_ip"`
|
||||
}
|
||||
var nodes []replicaNode
|
||||
query := `
|
||||
SELECT dr.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip
|
||||
FROM deployment_replicas dr
|
||||
JOIN dns_nodes dn ON dr.node_id = dn.id
|
||||
WHERE dr.deployment_id = ?
|
||||
`
|
||||
if err := h.ormClient.Query(ctx, &nodes, query, deploymentID); err != nil {
|
||||
h.logger.Warn("Failed to query replica nodes for teardown",
|
||||
zap.String("deployment_id", deploymentID), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if len(nodes) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"deployment_id": deploymentID,
|
||||
"namespace": ns,
|
||||
"name": name,
|
||||
"type": depType,
|
||||
}
|
||||
jsonData, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to marshal teardown payload", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
url := fmt.Sprintf("http://%s:6001/v1/internal/deployments/replica/teardown", node.InternalIP)
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonData))
|
||||
if err != nil {
|
||||
h.logger.Warn("Failed to create teardown request",
|
||||
zap.String("node_id", node.NodeID), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("X-Orama-Internal-Auth", "replica-coordination")
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
h.logger.Warn("Failed to send teardown to replica node",
|
||||
zap.String("deployment_id", deploymentID),
|
||||
zap.String("node_id", node.NodeID),
|
||||
zap.String("node_ip", node.InternalIP),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
resp.Body.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// unpinNamespaceContent unpins all IPFS content owned by the namespace.
|
||||
// Best-effort: individual failures are logged but do not abort deletion.
|
||||
func (h *DeleteHandler) unpinNamespaceContent(ctx context.Context, ns string) {
|
||||
|
||||
@ -17,7 +17,7 @@ import (
|
||||
|
||||
// SpawnRequest represents a request to spawn or stop a namespace instance
|
||||
type SpawnRequest struct {
|
||||
Action string `json:"action"` // "spawn-rqlite", "spawn-olric", "spawn-gateway", "stop-rqlite", "stop-olric", "stop-gateway"
|
||||
Action string `json:"action"` // "spawn-rqlite", "spawn-olric", "spawn-gateway", "stop-rqlite", "stop-olric", "stop-gateway", "save-cluster-state", "delete-cluster-state"
|
||||
Namespace string `json:"namespace"`
|
||||
NodeID string `json:"node_id"`
|
||||
|
||||
@ -234,6 +234,14 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
|
||||
|
||||
case "delete-cluster-state":
|
||||
if err := h.systemdSpawner.DeleteClusterState(req.Namespace); err != nil {
|
||||
h.logger.Error("Failed to delete cluster state", zap.Error(err))
|
||||
writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()})
|
||||
return
|
||||
}
|
||||
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
|
||||
|
||||
default:
|
||||
writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{Error: fmt.Sprintf("unknown action: %s", req.Action)})
|
||||
}
|
||||
|
||||
@ -818,7 +818,9 @@ func (cm *ClusterManager) rollbackProvisioning(ctx context.Context, cluster *Nam
|
||||
cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusFailed, "Provisioning failed and rolled back")
|
||||
}
|
||||
|
||||
// DeprovisionCluster tears down a namespace cluster
|
||||
// DeprovisionCluster tears down a namespace cluster on all nodes.
|
||||
// Stops namespace infrastructure (Gateway, Olric, RQLite) on every cluster node,
|
||||
// deletes cluster-state.json, deallocates ports, removes DNS records, and cleans up DB.
|
||||
func (cm *ClusterManager) DeprovisionCluster(ctx context.Context, namespaceID int64) error {
|
||||
cluster, err := cm.GetClusterByNamespaceID(ctx, namespaceID)
|
||||
if err != nil {
|
||||
@ -837,16 +839,59 @@ func (cm *ClusterManager) DeprovisionCluster(ctx context.Context, namespaceID in
|
||||
cm.logEvent(ctx, cluster.ID, EventDeprovisionStarted, "", "Cluster deprovisioning started", nil)
|
||||
cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusDeprovisioning, "")
|
||||
|
||||
// Stop all services using systemd
|
||||
cm.systemdSpawner.StopAll(ctx, cluster.NamespaceName)
|
||||
// 1. Get cluster nodes WITH IPs (must happen before any DB deletion)
|
||||
type deprovisionNodeInfo struct {
|
||||
NodeID string `db:"node_id"`
|
||||
InternalIP string `db:"internal_ip"`
|
||||
}
|
||||
var clusterNodes []deprovisionNodeInfo
|
||||
nodeQuery := `
|
||||
SELECT ncn.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip
|
||||
FROM namespace_cluster_nodes ncn
|
||||
JOIN dns_nodes dn ON ncn.node_id = dn.id
|
||||
WHERE ncn.namespace_cluster_id = ?
|
||||
`
|
||||
if err := cm.db.Query(ctx, &clusterNodes, nodeQuery, cluster.ID); err != nil {
|
||||
cm.logger.Warn("Failed to query cluster nodes for deprovisioning, falling back to local-only stop", zap.Error(err))
|
||||
// Fall back to local-only stop (individual methods, NOT StopAll which uses dangerous glob)
|
||||
cm.systemdSpawner.StopGateway(ctx, cluster.NamespaceName, cm.localNodeID)
|
||||
cm.systemdSpawner.StopOlric(ctx, cluster.NamespaceName, cm.localNodeID)
|
||||
cm.systemdSpawner.StopRQLite(ctx, cluster.NamespaceName, cm.localNodeID)
|
||||
cm.systemdSpawner.DeleteClusterState(cluster.NamespaceName)
|
||||
} else {
|
||||
// 2. Stop namespace infra on ALL nodes (reverse dependency order: Gateway → Olric → RQLite)
|
||||
for _, node := range clusterNodes {
|
||||
cm.stopGatewayOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName)
|
||||
}
|
||||
for _, node := range clusterNodes {
|
||||
cm.stopOlricOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName)
|
||||
}
|
||||
for _, node := range clusterNodes {
|
||||
cm.stopRQLiteOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName, nil)
|
||||
}
|
||||
|
||||
// Deallocate all ports
|
||||
// 3. Delete cluster-state.json on all nodes
|
||||
for _, node := range clusterNodes {
|
||||
if node.NodeID == cm.localNodeID {
|
||||
cm.systemdSpawner.DeleteClusterState(cluster.NamespaceName)
|
||||
} else {
|
||||
cm.sendStopRequest(ctx, node.InternalIP, "delete-cluster-state", cluster.NamespaceName, node.NodeID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Deallocate all ports
|
||||
cm.portAllocator.DeallocateAllPortBlocks(ctx, cluster.ID)
|
||||
|
||||
// Delete DNS records
|
||||
// 5. Delete namespace DNS records
|
||||
cm.dnsManager.DeleteNamespaceRecords(ctx, cluster.NamespaceName)
|
||||
|
||||
// Delete cluster record
|
||||
// 6. Explicitly delete child tables (FK cascades disabled in rqlite)
|
||||
cm.db.Exec(ctx, `DELETE FROM namespace_cluster_events WHERE namespace_cluster_id = ?`, cluster.ID)
|
||||
cm.db.Exec(ctx, `DELETE FROM namespace_cluster_nodes WHERE namespace_cluster_id = ?`, cluster.ID)
|
||||
cm.db.Exec(ctx, `DELETE FROM namespace_port_allocations WHERE namespace_cluster_id = ?`, cluster.ID)
|
||||
|
||||
// 7. Delete cluster record
|
||||
cm.db.Exec(ctx, `DELETE FROM namespace_clusters WHERE id = ?`, cluster.ID)
|
||||
|
||||
cm.logEvent(ctx, cluster.ID, EventDeprovisioned, "", "Cluster deprovisioned", nil)
|
||||
|
||||
@ -306,6 +306,18 @@ func (s *SystemdSpawner) SaveClusterState(namespace string, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteClusterState removes cluster state and config files for a namespace.
|
||||
func (s *SystemdSpawner) DeleteClusterState(namespace string) error {
|
||||
dir := filepath.Join(s.namespaceBase, namespace)
|
||||
if err := os.RemoveAll(dir); err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("failed to delete namespace data directory: %w", err)
|
||||
}
|
||||
s.logger.Info("Deleted namespace data directory",
|
||||
zap.String("namespace", namespace),
|
||||
zap.String("path", dir))
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopAll stops all services for a namespace, including deployment processes
|
||||
func (s *SystemdSpawner) StopAll(ctx context.Context, namespace string) error {
|
||||
s.logger.Info("Stopping all namespace services via systemd",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user