From 51371e199d7876ea7a40c0faff8dd1d8f45620c3 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Sat, 31 Jan 2026 10:07:15 +0200 Subject: [PATCH] Added self signed cert fallback, fixed dns bugs --- docs/DEV_DEPLOY.md | 62 ++++++ .../production/installers/caddy.go | 15 +- .../production/installers/gateway.go | 9 +- pkg/gateway/gateway.go | 8 + .../handlers/namespace/spawn_handler.go | 174 +++++++++++++++ pkg/gateway/middleware.go | 5 + pkg/gateway/routes.go | 5 + pkg/namespace/cluster_manager.go | 208 +++++++++++++++--- pkg/node/gateway.go | 11 + 9 files changed, 464 insertions(+), 33 deletions(-) create mode 100644 pkg/gateway/handlers/namespace/spawn_handler.go diff --git a/docs/DEV_DEPLOY.md b/docs/DEV_DEPLOY.md index 9606988..038f1f6 100644 --- a/docs/DEV_DEPLOY.md +++ b/docs/DEV_DEPLOY.md @@ -176,6 +176,68 @@ Always follow the local-first approach: Never fix issues directly on the server — those fixes are lost on next deployment. +## Trusting the Self-Signed TLS Certificate + +When Let's Encrypt is rate-limited, Caddy falls back to its internal CA (self-signed certificates). Browsers will show security warnings unless you install the root CA certificate. + +### Downloading the Root CA Certificate + +From VPS 1 (or any node), copy the certificate: + +```bash +# Copy the cert to an accessible location on the VPS +ssh ubuntu@<vps-ip> "sudo cp /var/lib/caddy/.local/share/caddy/pki/authorities/local/root.crt /tmp/caddy-root-ca.crt && sudo chmod 644 /tmp/caddy-root-ca.crt" + +# Download to your local machine +scp ubuntu@<vps-ip>:/tmp/caddy-root-ca.crt ~/Downloads/caddy-root-ca.crt +``` + +### macOS + +```bash +sudo security add-trusted-cert -d -r trustRoot -k /Library/Keychains/System.keychain ~/Downloads/caddy-root-ca.crt +``` + +This adds the cert system-wide. All browsers (Safari, Chrome, Arc, etc.) will trust it immediately. 
Firefox uses its own certificate store — go to **Settings > Privacy & Security > Certificates > View Certificates > Import** and import the `.crt` file there. + +To remove it later: +```bash +sudo security remove-trusted-cert -d ~/Downloads/caddy-root-ca.crt +``` + +### iOS (iPhone/iPad) + +1. Transfer `caddy-root-ca.crt` to your device (AirDrop, email attachment, or host it on a URL) +2. Open the file — iOS will show "Profile Downloaded" +3. Go to **Settings > General > VPN & Device Management** (or "Profiles" on older iOS) +4. Tap the "Caddy Local Authority" profile and tap **Install** +5. Go to **Settings > General > About > Certificate Trust Settings** +6. Enable **full trust** for "Caddy Local Authority - 2026 ECC Root" + +### Android + +1. Transfer `caddy-root-ca.crt` to your device +2. Go to **Settings > Security > Encryption & Credentials > Install a certificate > CA certificate** +3. Select the `caddy-root-ca.crt` file +4. Confirm the installation + +Note: On Android 7+, user-installed CA certificates are only trusted by apps that explicitly opt in. Chrome will trust it, but some apps may not. + +### Windows + +```powershell +certutil -addstore -f "ROOT" caddy-root-ca.crt +``` + +Or double-click the `.crt` file > **Install Certificate** > **Local Machine** > **Place in "Trusted Root Certification Authorities"**. + +### Linux + +```bash +sudo cp caddy-root-ca.crt /usr/local/share/ca-certificates/caddy-root-ca.crt +sudo update-ca-certificates +``` + ## Project Structure See [ARCHITECTURE.md](ARCHITECTURE.md) for the full architecture overview. 
diff --git a/pkg/environments/production/installers/caddy.go b/pkg/environments/production/installers/caddy.go index 1954758..504e071 100644 --- a/pkg/environments/production/installers/caddy.go +++ b/pkg/environments/production/installers/caddy.go @@ -17,9 +17,9 @@ const ( // CaddyInstaller handles Caddy installation with custom DNS module type CaddyInstaller struct { *BaseInstaller - version string - oramaHome string - dnsModule string // Path to the orama DNS module source + version string + oramaHome string + dnsModule string // Path to the orama DNS module source } // NewCaddyInstaller creates a new Caddy installer @@ -371,10 +371,15 @@ require ( // If baseDomain is provided and different from domain, Caddy also serves // the base domain and its wildcard (e.g., *.dbrs.space alongside *.node1.dbrs.space). func (ci *CaddyInstaller) generateCaddyfile(domain, email, acmeEndpoint, baseDomain string) string { + // Primary: Let's Encrypt via ACME DNS-01 challenge + // Fallback: Caddy's internal CA (self-signed, trust root on clients) tlsBlock := fmt.Sprintf(` tls { - dns orama { - endpoint %s + issuer acme { + dns orama { + endpoint %s + } } + issuer internal }`, acmeEndpoint) var sb strings.Builder diff --git a/pkg/environments/production/installers/gateway.go b/pkg/environments/production/installers/gateway.go index 6600de2..050d818 100644 --- a/pkg/environments/production/installers/gateway.go +++ b/pkg/environments/production/installers/gateway.go @@ -171,7 +171,11 @@ func (gi *GatewayInstaller) InstallDeBrosBinaries(branch string, oramaHome strin return fmt.Errorf("source bin directory is empty - build may have failed") } - // Copy each binary individually to avoid wildcard expansion issues + // Copy each binary individually to avoid wildcard expansion issues. + // We remove the destination first to avoid "text file busy" errors when + // overwriting a binary that is currently executing (e.g., the orama CLI + // running this upgrade). 
On Linux, removing a running binary is safe — + // the kernel keeps the inode alive until the process exits. for _, entry := range entries { if entry.IsDir() { continue @@ -185,6 +189,9 @@ func (gi *GatewayInstaller) InstallDeBrosBinaries(branch string, oramaHome strin return fmt.Errorf("failed to read binary %s: %w", entry.Name(), err) } + // Remove existing binary first to avoid "text file busy" on running executables + _ = os.Remove(dstPath) + // Write destination file if err := os.WriteFile(dstPath, data, 0755); err != nil { return fmt.Errorf("failed to write binary %s: %w", entry.Name(), err) diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 6acadb0..1074cbc 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -111,6 +111,9 @@ type Gateway struct { // Cluster provisioning for namespace clusters clusterProvisioner authhandlers.ClusterProvisioner + + // Namespace instance spawn handler (for distributed provisioning) + spawnHandler http.Handler } // localSubscriber represents a WebSocket subscriber for local message delivery @@ -436,6 +439,11 @@ func (g *Gateway) SetClusterProvisioner(cp authhandlers.ClusterProvisioner) { } } +// SetSpawnHandler sets the handler for internal namespace spawn/stop requests. 
+func (g *Gateway) SetSpawnHandler(h http.Handler) { + g.spawnHandler = h +} + // GetORMClient returns the RQLite ORM client for external use (e.g., by ClusterManager) func (g *Gateway) GetORMClient() rqlite.Client { return g.ormClient diff --git a/pkg/gateway/handlers/namespace/spawn_handler.go b/pkg/gateway/handlers/namespace/spawn_handler.go new file mode 100644 index 0000000..e66bf60 --- /dev/null +++ b/pkg/gateway/handlers/namespace/spawn_handler.go @@ -0,0 +1,174 @@ +package namespace + +import ( + "encoding/json" + "fmt" + "net/http" + "sync" + + "github.com/DeBrosOfficial/network/pkg/olric" + "github.com/DeBrosOfficial/network/pkg/rqlite" + "go.uber.org/zap" +) + +// SpawnRequest represents a request to spawn or stop a namespace instance +type SpawnRequest struct { + Action string `json:"action"` // "spawn-rqlite", "spawn-olric", "stop-rqlite", "stop-olric" + Namespace string `json:"namespace"` + NodeID string `json:"node_id"` + + // RQLite config (when action = "spawn-rqlite") + RQLiteHTTPPort int `json:"rqlite_http_port,omitempty"` + RQLiteRaftPort int `json:"rqlite_raft_port,omitempty"` + RQLiteHTTPAdvAddr string `json:"rqlite_http_adv_addr,omitempty"` + RQLiteRaftAdvAddr string `json:"rqlite_raft_adv_addr,omitempty"` + RQLiteJoinAddrs []string `json:"rqlite_join_addrs,omitempty"` + RQLiteIsLeader bool `json:"rqlite_is_leader,omitempty"` + + // Olric config (when action = "spawn-olric") + OlricHTTPPort int `json:"olric_http_port,omitempty"` + OlricMemberlistPort int `json:"olric_memberlist_port,omitempty"` + OlricBindAddr string `json:"olric_bind_addr,omitempty"` + OlricAdvertiseAddr string `json:"olric_advertise_addr,omitempty"` + OlricPeerAddresses []string `json:"olric_peer_addresses,omitempty"` +} + +// SpawnResponse represents the response from a spawn/stop request +type SpawnResponse struct { + Success bool `json:"success"` + Error string `json:"error,omitempty"` + PID int `json:"pid,omitempty"` +} + +// SpawnHandler handles remote namespace 
instance spawn/stop requests. +// It tracks spawned RQLite instances locally so they can be stopped later. +type SpawnHandler struct { + rqliteSpawner *rqlite.InstanceSpawner + olricSpawner *olric.InstanceSpawner + logger *zap.Logger + rqliteInstances map[string]*rqlite.Instance // key: "namespace:nodeID" + rqliteInstanceMu sync.RWMutex +} + +// NewSpawnHandler creates a new spawn handler +func NewSpawnHandler(rs *rqlite.InstanceSpawner, os *olric.InstanceSpawner, logger *zap.Logger) *SpawnHandler { + return &SpawnHandler{ + rqliteSpawner: rs, + olricSpawner: os, + logger: logger.With(zap.String("component", "namespace-spawn-handler")), + rqliteInstances: make(map[string]*rqlite.Instance), + } +} + +// ServeHTTP implements http.Handler +func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + // Authenticate via internal auth header + if r.Header.Get("X-Orama-Internal-Auth") != "namespace-coordination" { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + + var req SpawnRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{Error: "invalid request body"}) + return + } + + if req.Namespace == "" || req.NodeID == "" { + writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{Error: "namespace and node_id are required"}) + return + } + + h.logger.Info("Received spawn request", + zap.String("action", req.Action), + zap.String("namespace", req.Namespace), + zap.String("node_id", req.NodeID), + ) + + ctx := r.Context() + + switch req.Action { + case "spawn-rqlite": + cfg := rqlite.InstanceConfig{ + Namespace: req.Namespace, + NodeID: req.NodeID, + HTTPPort: req.RQLiteHTTPPort, + RaftPort: req.RQLiteRaftPort, + HTTPAdvAddress: req.RQLiteHTTPAdvAddr, + RaftAdvAddress: req.RQLiteRaftAdvAddr, + JoinAddresses: req.RQLiteJoinAddrs, + IsLeader: 
req.RQLiteIsLeader, + } + instance, err := h.rqliteSpawner.SpawnInstance(ctx, cfg) + if err != nil { + h.logger.Error("Failed to spawn RQLite instance", zap.Error(err)) + writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) + return + } + // Track the instance for later stop requests + key := fmt.Sprintf("%s:%s", req.Namespace, req.NodeID) + h.rqliteInstanceMu.Lock() + h.rqliteInstances[key] = instance + h.rqliteInstanceMu.Unlock() + writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true, PID: instance.PID}) + + case "spawn-olric": + cfg := olric.InstanceConfig{ + Namespace: req.Namespace, + NodeID: req.NodeID, + HTTPPort: req.OlricHTTPPort, + MemberlistPort: req.OlricMemberlistPort, + BindAddr: req.OlricBindAddr, + AdvertiseAddr: req.OlricAdvertiseAddr, + PeerAddresses: req.OlricPeerAddresses, + } + instance, err := h.olricSpawner.SpawnInstance(ctx, cfg) + if err != nil { + h.logger.Error("Failed to spawn Olric instance", zap.Error(err)) + writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) + return + } + writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true, PID: instance.PID}) + + case "stop-rqlite": + key := fmt.Sprintf("%s:%s", req.Namespace, req.NodeID) + h.rqliteInstanceMu.Lock() + instance, ok := h.rqliteInstances[key] + if ok { + delete(h.rqliteInstances, key) + } + h.rqliteInstanceMu.Unlock() + if !ok { + writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) // Already stopped + return + } + if err := h.rqliteSpawner.StopInstance(ctx, instance); err != nil { + h.logger.Error("Failed to stop RQLite instance", zap.Error(err)) + writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) + return + } + writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) + + case "stop-olric": + if err := h.olricSpawner.StopInstance(ctx, req.Namespace, req.NodeID); err != nil { + h.logger.Error("Failed to stop Olric instance", 
zap.Error(err)) + writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) + return + } + writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) + + default: + writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{Error: fmt.Sprintf("unknown action: %s", req.Action)}) + } +} + +func writeSpawnResponse(w http.ResponseWriter, status int, resp SpawnResponse) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(resp) +} diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index d2f0d98..a71846a 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -229,6 +229,11 @@ func isPublicPath(p string) bool { return true } + // Namespace spawn endpoint (auth handled by internal auth header) + if p == "/v1/internal/namespace/spawn" { + return true + } + switch p { case "/health", "/v1/health", "/status", "/v1/status", "/v1/auth/jwks", "/.well-known/jwks.json", "/v1/version", "/v1/auth/login", "/v1/auth/challenge", "/v1/auth/verify", "/v1/auth/register", "/v1/auth/refresh", "/v1/auth/logout", "/v1/auth/api-key", "/v1/auth/simple-key", "/v1/network/status", "/v1/network/peers", "/v1/internal/tls/check", "/v1/internal/acme/present", "/v1/internal/acme/cleanup": return true diff --git a/pkg/gateway/routes.go b/pkg/gateway/routes.go index 0864e2a..f2f6820 100644 --- a/pkg/gateway/routes.go +++ b/pkg/gateway/routes.go @@ -36,6 +36,11 @@ func (g *Gateway) Routes() http.Handler { mux.HandleFunc("/v1/internal/join", g.joinHandler.HandleJoin) } + // Namespace instance spawn/stop (internal, handler does its own auth) + if g.spawnHandler != nil { + mux.Handle("/v1/internal/namespace/spawn", g.spawnHandler) + } + // auth endpoints mux.HandleFunc("/v1/auth/jwks", g.authService.JWKSHandler) mux.HandleFunc("/.well-known/jwks.json", g.authService.JWKSHandler) diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index a2c30db..561630c 
100644 --- a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -1,9 +1,12 @@ package namespace import ( + "bytes" "context" "encoding/json" "fmt" + "io" + "net/http" "strings" "sync" "time" @@ -33,6 +36,9 @@ type ClusterManager struct { baseDomain string baseDataDir string + // Local node identity for distributed spawning + localNodeID string + // Track provisioning operations provisioningMu sync.RWMutex provisioning map[string]bool // namespace -> in progress @@ -90,6 +96,12 @@ func NewClusterManagerWithComponents( } } +// SetLocalNodeID sets this node's peer ID for local/remote dispatch during provisioning +func (cm *ClusterManager) SetLocalNodeID(id string) { + cm.localNodeID = id + cm.logger.Info("Local node ID set for distributed provisioning", zap.String("local_node_id", id)) +} + // ProvisionCluster provisions a new 3-node cluster for a namespace // This is an async operation - returns immediately with cluster ID for polling func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, namespaceName, provisionedBy string) (*NamespaceCluster, error) { @@ -168,14 +180,14 @@ func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, // Start RQLite instances (leader first, then followers) rqliteInstances, err := cm.startRQLiteCluster(ctx, cluster, nodes, portBlocks) if err != nil { - cm.rollbackProvisioning(ctx, cluster, portBlocks, nil, nil) + cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, nil, nil) return nil, fmt.Errorf("failed to start RQLite cluster: %w", err) } // Start Olric instances olricInstances, err := cm.startOlricCluster(ctx, cluster, nodes, portBlocks) if err != nil { - cm.rollbackProvisioning(ctx, cluster, portBlocks, rqliteInstances, nil) + cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, rqliteInstances, nil) return nil, fmt.Errorf("failed to start Olric cluster: %w", err) } @@ -190,7 +202,7 @@ func (cm *ClusterManager) ProvisionCluster(ctx context.Context, 
namespaceID int, ) cm.logEvent(ctx, cluster.ID, "gateway_skipped", "", "Gateway binary not available, cluster will use main gateway", nil) } else { - cm.rollbackProvisioning(ctx, cluster, portBlocks, rqliteInstances, olricInstances) + cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, rqliteInstances, olricInstances) return nil, fmt.Errorf("failed to start Gateway cluster: %w", err) } } @@ -216,7 +228,7 @@ func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, return cluster, nil } -// startRQLiteCluster starts RQLite instances on all nodes +// startRQLiteCluster starts RQLite instances on all nodes (locally or remotely) func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) ([]*rqlite.Instance, error) { instances := make([]*rqlite.Instance, len(nodes)) @@ -231,7 +243,15 @@ func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *Names IsLeader: true, } - leaderInstance, err := cm.rqliteSpawner.SpawnInstance(ctx, leaderCfg) + var leaderInstance *rqlite.Instance + var err error + if nodes[0].NodeID == cm.localNodeID { + cm.logger.Info("Spawning RQLite leader locally", zap.String("node", nodes[0].NodeID)) + leaderInstance, err = cm.rqliteSpawner.SpawnInstance(ctx, leaderCfg) + } else { + cm.logger.Info("Spawning RQLite leader remotely", zap.String("node", nodes[0].NodeID), zap.String("ip", nodes[0].InternalIP)) + leaderInstance, err = cm.spawnRQLiteRemote(ctx, nodes[0].InternalIP, leaderCfg) + } if err != nil { return nil, fmt.Errorf("failed to start RQLite leader: %w", err) } @@ -240,13 +260,11 @@ func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *Names cm.logEvent(ctx, cluster.ID, EventRQLiteStarted, nodes[0].NodeID, "RQLite leader started", nil) cm.logEvent(ctx, cluster.ID, EventRQLiteLeaderElected, nodes[0].NodeID, "RQLite leader elected", nil) - // Record leader node if err := cm.insertClusterNode(ctx, 
cluster.ID, nodes[0].NodeID, NodeRoleRQLiteLeader, portBlocks[0]); err != nil { cm.logger.Warn("Failed to record cluster node", zap.Error(err)) } // Start followers - // Note: RQLite's -join flag requires the Raft address, not the HTTP address leaderRaftAddr := leaderCfg.RaftAdvAddress for i := 1; i < len(nodes); i++ { followerCfg := rqlite.InstanceConfig{ @@ -260,11 +278,18 @@ func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *Names IsLeader: false, } - followerInstance, err := cm.rqliteSpawner.SpawnInstance(ctx, followerCfg) + var followerInstance *rqlite.Instance + if nodes[i].NodeID == cm.localNodeID { + cm.logger.Info("Spawning RQLite follower locally", zap.String("node", nodes[i].NodeID)) + followerInstance, err = cm.rqliteSpawner.SpawnInstance(ctx, followerCfg) + } else { + cm.logger.Info("Spawning RQLite follower remotely", zap.String("node", nodes[i].NodeID), zap.String("ip", nodes[i].InternalIP)) + followerInstance, err = cm.spawnRQLiteRemote(ctx, nodes[i].InternalIP, followerCfg) + } if err != nil { // Stop previously started instances for j := 0; j < i; j++ { - cm.rqliteSpawner.StopInstance(ctx, instances[j]) + cm.stopRQLiteOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName, instances[j]) } return nil, fmt.Errorf("failed to start RQLite follower on node %s: %w", nodes[i].NodeID, err) } @@ -281,7 +306,7 @@ func (cm *ClusterManager) startRQLiteCluster(ctx context.Context, cluster *Names return instances, nil } -// startOlricCluster starts Olric instances on all nodes +// startOlricCluster starts Olric instances on all nodes (locally or remotely) func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) ([]*olric.OlricInstance, error) { instances := make([]*olric.OlricInstance, len(nodes)) @@ -298,16 +323,24 @@ func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *Namesp NodeID: node.NodeID, HTTPPort: 
portBlocks[i].OlricHTTPPort, MemberlistPort: portBlocks[i].OlricMemberlistPort, - BindAddr: node.InternalIP, - AdvertiseAddr: node.InternalIP, + BindAddr: node.InternalIP, // Bind to node's WG IP (0.0.0.0 resolves to IPv6 on some hosts) + AdvertiseAddr: node.InternalIP, // Advertise WG IP to peers PeerAddresses: peerAddresses, } - instance, err := cm.olricSpawner.SpawnInstance(ctx, cfg) + var instance *olric.OlricInstance + var err error + if node.NodeID == cm.localNodeID { + cm.logger.Info("Spawning Olric locally", zap.String("node", node.NodeID)) + instance, err = cm.olricSpawner.SpawnInstance(ctx, cfg) + } else { + cm.logger.Info("Spawning Olric remotely", zap.String("node", node.NodeID), zap.String("ip", node.InternalIP)) + instance, err = cm.spawnOlricRemote(ctx, node.InternalIP, cfg) + } if err != nil { // Stop previously started instances for j := 0; j < i; j++ { - cm.olricSpawner.StopInstance(ctx, cluster.NamespaceName, nodes[j].NodeID) + cm.stopOlricOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName) } return nil, fmt.Errorf("failed to start Olric on node %s: %w", node.NodeID, err) } @@ -368,6 +401,125 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name return instances, nil } +// spawnRQLiteRemote sends a spawn-rqlite request to a remote node +func (cm *ClusterManager) spawnRQLiteRemote(ctx context.Context, nodeIP string, cfg rqlite.InstanceConfig) (*rqlite.Instance, error) { + resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ + "action": "spawn-rqlite", + "namespace": cfg.Namespace, + "node_id": cfg.NodeID, + "rqlite_http_port": cfg.HTTPPort, + "rqlite_raft_port": cfg.RaftPort, + "rqlite_http_adv_addr": cfg.HTTPAdvAddress, + "rqlite_raft_adv_addr": cfg.RaftAdvAddress, + "rqlite_join_addrs": cfg.JoinAddresses, + "rqlite_is_leader": cfg.IsLeader, + }) + if err != nil { + return nil, err + } + return &rqlite.Instance{PID: resp.PID}, nil +} + +// spawnOlricRemote sends a spawn-olric 
request to a remote node +func (cm *ClusterManager) spawnOlricRemote(ctx context.Context, nodeIP string, cfg olric.InstanceConfig) (*olric.OlricInstance, error) { + resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ + "action": "spawn-olric", + "namespace": cfg.Namespace, + "node_id": cfg.NodeID, + "olric_http_port": cfg.HTTPPort, + "olric_memberlist_port": cfg.MemberlistPort, + "olric_bind_addr": cfg.BindAddr, + "olric_advertise_addr": cfg.AdvertiseAddr, + "olric_peer_addresses": cfg.PeerAddresses, + }) + if err != nil { + return nil, err + } + return &olric.OlricInstance{PID: resp.PID}, nil +} + +// spawnResponse represents the JSON response from a spawn request +type spawnResponse struct { + Success bool `json:"success"` + Error string `json:"error,omitempty"` + PID int `json:"pid,omitempty"` +} + +// sendSpawnRequest sends a spawn/stop request to a remote node's spawn endpoint +func (cm *ClusterManager) sendSpawnRequest(ctx context.Context, nodeIP string, req map[string]interface{}) (*spawnResponse, error) { + url := fmt.Sprintf("http://%s:6001/v1/internal/namespace/spawn", nodeIP) + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal spawn request: %w", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("X-Orama-Internal-Auth", "namespace-coordination") + + client := &http.Client{Timeout: 60 * time.Second} + resp, err := client.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("failed to send spawn request to %s: %w", nodeIP, err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response from %s: %w", nodeIP, err) + } + + var spawnResp spawnResponse + if err := json.Unmarshal(respBody, &spawnResp); err != nil { 
+ return nil, fmt.Errorf("failed to decode response from %s: %w", nodeIP, err) + } + + if !spawnResp.Success { + return nil, fmt.Errorf("spawn request failed on %s: %s", nodeIP, spawnResp.Error) + } + + return &spawnResp, nil +} + +// stopRQLiteOnNode stops a RQLite instance on a node (local or remote) +func (cm *ClusterManager) stopRQLiteOnNode(ctx context.Context, nodeID, nodeIP, namespace string, inst *rqlite.Instance) { + if nodeID == cm.localNodeID { + if inst != nil { + cm.rqliteSpawner.StopInstance(ctx, inst) + } + } else { + cm.sendStopRequest(ctx, nodeIP, "stop-rqlite", namespace, nodeID) + } +} + +// stopOlricOnNode stops an Olric instance on a node (local or remote) +func (cm *ClusterManager) stopOlricOnNode(ctx context.Context, nodeID, nodeIP, namespace string) { + if nodeID == cm.localNodeID { + cm.olricSpawner.StopInstance(ctx, namespace, nodeID) + } else { + cm.sendStopRequest(ctx, nodeIP, "stop-olric", namespace, nodeID) + } +} + +// sendStopRequest sends a stop request to a remote node +func (cm *ClusterManager) sendStopRequest(ctx context.Context, nodeIP, action, namespace, nodeID string) { + _, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ + "action": action, + "namespace": namespace, + "node_id": nodeID, + }) + if err != nil { + cm.logger.Warn("Failed to send stop request to remote node", + zap.String("node_ip", nodeIP), + zap.String("action", action), + zap.Error(err), + ) + } +} + // createDNSRecords creates DNS records for the namespace gateway func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error { // Create A records for ns-{namespace}.{baseDomain} pointing to all 3 nodes @@ -399,22 +551,24 @@ func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *Namespa } // rollbackProvisioning cleans up a failed provisioning attempt -func (cm *ClusterManager) rollbackProvisioning(ctx context.Context, cluster *NamespaceCluster, 
portBlocks []*PortBlock, rqliteInstances []*rqlite.Instance, olricInstances []*olric.OlricInstance) { +func (cm *ClusterManager) rollbackProvisioning(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock, rqliteInstances []*rqlite.Instance, olricInstances []*olric.OlricInstance) { cm.logger.Info("Rolling back failed provisioning", zap.String("cluster_id", cluster.ID)) - // Stop Gateway instances + // Stop Gateway instances (local only for now) cm.gatewaySpawner.StopAllInstances(ctx, cluster.NamespaceName) - // Stop Olric instances - if olricInstances != nil { - cm.olricSpawner.StopAllInstances(ctx, cluster.NamespaceName) + // Stop Olric instances on each node + if olricInstances != nil && nodes != nil { + for _, node := range nodes { + cm.stopOlricOnNode(ctx, node.NodeID, node.InternalIP, cluster.NamespaceName) + } } - // Stop RQLite instances - if rqliteInstances != nil { - for _, inst := range rqliteInstances { - if inst != nil { - cm.rqliteSpawner.StopInstance(ctx, inst) + // Stop RQLite instances on each node + if rqliteInstances != nil && nodes != nil { + for i, inst := range rqliteInstances { + if inst != nil && i < len(nodes) { + cm.stopRQLiteOnNode(ctx, nodes[i].NodeID, nodes[i].InternalIP, cluster.NamespaceName, inst) } } } @@ -768,7 +922,7 @@ func (cm *ClusterManager) provisionClusterAsync(cluster *NamespaceCluster, names // Start RQLite instances (leader first, then followers) rqliteInstances, err := cm.startRQLiteCluster(ctx, cluster, nodes, portBlocks) if err != nil { - cm.rollbackProvisioning(ctx, cluster, portBlocks, nil, nil) + cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, nil, nil) cm.logger.Error("Failed to start RQLite cluster", zap.Error(err)) return } @@ -776,7 +930,7 @@ func (cm *ClusterManager) provisionClusterAsync(cluster *NamespaceCluster, names // Start Olric instances olricInstances, err := cm.startOlricCluster(ctx, cluster, nodes, portBlocks) if err != nil { - 
cm.rollbackProvisioning(ctx, cluster, portBlocks, rqliteInstances, nil) + cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, rqliteInstances, nil) cm.logger.Error("Failed to start Olric cluster", zap.Error(err)) return } @@ -792,7 +946,7 @@ func (cm *ClusterManager) provisionClusterAsync(cluster *NamespaceCluster, names ) cm.logEvent(ctx, cluster.ID, "gateway_skipped", "", "Gateway binary not available, cluster will use main gateway", nil) } else { - cm.rollbackProvisioning(ctx, cluster, portBlocks, rqliteInstances, olricInstances) + cm.rollbackProvisioning(ctx, cluster, nodes, portBlocks, rqliteInstances, olricInstances) cm.logger.Error("Failed to start Gateway cluster", zap.Error(err)) return } diff --git a/pkg/node/gateway.go b/pkg/node/gateway.go index 4052a4d..b3da3ad 100644 --- a/pkg/node/gateway.go +++ b/pkg/node/gateway.go @@ -8,9 +8,12 @@ import ( "path/filepath" "github.com/DeBrosOfficial/network/pkg/gateway" + namespacehandlers "github.com/DeBrosOfficial/network/pkg/gateway/handlers/namespace" "github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/namespace" + olricpkg "github.com/DeBrosOfficial/network/pkg/olric" + rqlitepkg "github.com/DeBrosOfficial/network/pkg/rqlite" "go.uber.org/zap" ) @@ -72,7 +75,15 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { BaseDataDir: baseDataDir, } clusterManager := namespace.NewClusterManager(ormClient, clusterCfg, n.logger.Logger) + clusterManager.SetLocalNodeID(gwCfg.NodePeerID) apiGateway.SetClusterProvisioner(clusterManager) + + // Wire spawn handler for distributed namespace instance spawning + rqliteSpawner := rqlitepkg.NewInstanceSpawner(baseDataDir, n.logger.Logger) + olricSpawner := olricpkg.NewInstanceSpawner(baseDataDir, n.logger.Logger) + spawnHandler := namespacehandlers.NewSpawnHandler(rqliteSpawner, olricSpawner, n.logger.Logger) + apiGateway.SetSpawnHandler(spawnHandler) + 
n.logger.ComponentInfo(logging.ComponentNode, "Namespace cluster provisioning enabled", zap.String("base_domain", clusterCfg.BaseDomain), zap.String("base_data_dir", baseDataDir))