diff --git a/Makefile b/Makefile index d57ff9b..827fc78 100644 --- a/Makefile +++ b/Makefile @@ -86,7 +86,7 @@ test-e2e-quick: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill -VERSION := 0.101.5 +VERSION := 0.101.6 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/gateway/handlers/namespace/spawn_handler.go b/pkg/gateway/handlers/namespace/spawn_handler.go index 4f16128..39378f8 100644 --- a/pkg/gateway/handlers/namespace/spawn_handler.go +++ b/pkg/gateway/handlers/namespace/spawn_handler.go @@ -36,14 +36,19 @@ type SpawnRequest struct { OlricPeerAddresses []string `json:"olric_peer_addresses,omitempty"` // Gateway config (when action = "spawn-gateway") - GatewayHTTPPort int `json:"gateway_http_port,omitempty"` - GatewayBaseDomain string `json:"gateway_base_domain,omitempty"` - GatewayRQLiteDSN string `json:"gateway_rqlite_dsn,omitempty"` - GatewayOlricServers []string `json:"gateway_olric_servers,omitempty"` - IPFSClusterAPIURL string `json:"ipfs_cluster_api_url,omitempty"` - IPFSAPIURL string `json:"ipfs_api_url,omitempty"` - IPFSTimeout string `json:"ipfs_timeout,omitempty"` - IPFSReplicationFactor int `json:"ipfs_replication_factor,omitempty"` + GatewayHTTPPort int `json:"gateway_http_port,omitempty"` + GatewayBaseDomain string `json:"gateway_base_domain,omitempty"` + GatewayRQLiteDSN string `json:"gateway_rqlite_dsn,omitempty"` + GatewayGlobalRQLiteDSN string `json:"gateway_global_rqlite_dsn,omitempty"` + GatewayOlricServers []string `json:"gateway_olric_servers,omitempty"` + GatewayOlricTimeout string `json:"gateway_olric_timeout,omitempty"` + IPFSClusterAPIURL string `json:"ipfs_cluster_api_url,omitempty"` + IPFSAPIURL string `json:"ipfs_api_url,omitempty"` + IPFSTimeout string `json:"ipfs_timeout,omitempty"` + 
IPFSReplicationFactor int `json:"ipfs_replication_factor,omitempty"` + + // Cluster state (when action = "save-cluster-state") + ClusterState json.RawMessage `json:"cluster_state,omitempty"` } // SpawnResponse represents the response from a spawn/stop request @@ -122,6 +127,13 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) case "spawn-olric": + // Reject empty or 0.0.0.0 BindAddr early — these cause IPv6 resolution on dual-stack hosts + if req.OlricBindAddr == "" || req.OlricBindAddr == "0.0.0.0" { + writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{ + Error: fmt.Sprintf("olric_bind_addr must be a valid IP, got %q", req.OlricBindAddr), + }) + return + } cfg := olric.InstanceConfig{ Namespace: req.Namespace, NodeID: req.NodeID, @@ -166,13 +178,28 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } + // Parse Olric timeout if provided + var olricTimeout time.Duration + if req.GatewayOlricTimeout != "" { + var err error + olricTimeout, err = time.ParseDuration(req.GatewayOlricTimeout) + if err != nil { + h.logger.Warn("Invalid Olric timeout, using default", zap.String("timeout", req.GatewayOlricTimeout), zap.Error(err)) + olricTimeout = 30 * time.Second + } + } else { + olricTimeout = 30 * time.Second + } + cfg := gateway.InstanceConfig{ Namespace: req.Namespace, NodeID: req.NodeID, HTTPPort: req.GatewayHTTPPort, BaseDomain: req.GatewayBaseDomain, RQLiteDSN: req.GatewayRQLiteDSN, + GlobalRQLiteDSN: req.GatewayGlobalRQLiteDSN, OlricServers: req.GatewayOlricServers, + OlricTimeout: olricTimeout, IPFSClusterAPIURL: req.IPFSClusterAPIURL, IPFSAPIURL: req.IPFSAPIURL, IPFSTimeout: ipfsTimeout, @@ -193,6 +220,18 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) + case "save-cluster-state": + if len(req.ClusterState) == 0 { + 
writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{Error: "cluster_state is required"}) + return + } + if err := h.systemdSpawner.SaveClusterState(req.Namespace, req.ClusterState); err != nil { + h.logger.Error("Failed to save cluster state", zap.Error(err)) + writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()}) + return + } + writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true}) + default: writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{Error: fmt.Sprintf("unknown action: %s", req.Action)}) } diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index d431ff8..82914f5 100644 --- a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -309,6 +309,9 @@ func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusReady, "") cm.logEvent(ctx, cluster.ID, EventClusterReady, "", "Cluster is ready", nil) + // Save cluster-state.json on all nodes (local + remote) for disk-based restore on restart + cm.saveClusterStateToAllNodes(ctx, cluster, nodes, portBlocks) + cm.logger.Info("Cluster provisioning completed", zap.String("cluster_id", cluster.ID), zap.String("namespace", namespaceName), @@ -630,18 +633,25 @@ func (cm *ClusterManager) spawnGatewayRemote(ctx context.Context, nodeIP string, ipfsTimeout = cfg.IPFSTimeout.String() } + olricTimeout := "" + if cfg.OlricTimeout > 0 { + olricTimeout = cfg.OlricTimeout.String() + } + resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ - "action": "spawn-gateway", - "namespace": cfg.Namespace, - "node_id": cfg.NodeID, - "gateway_http_port": cfg.HTTPPort, - "gateway_base_domain": cfg.BaseDomain, - "gateway_rqlite_dsn": cfg.RQLiteDSN, - "gateway_olric_servers": cfg.OlricServers, - "ipfs_cluster_api_url": cfg.IPFSClusterAPIURL, - "ipfs_api_url": cfg.IPFSAPIURL, - "ipfs_timeout": ipfsTimeout, - "ipfs_replication_factor": 
cfg.IPFSReplicationFactor, + "action": "spawn-gateway", + "namespace": cfg.Namespace, + "node_id": cfg.NodeID, + "gateway_http_port": cfg.HTTPPort, + "gateway_base_domain": cfg.BaseDomain, + "gateway_rqlite_dsn": cfg.RQLiteDSN, + "gateway_global_rqlite_dsn": cfg.GlobalRQLiteDSN, + "gateway_olric_servers": cfg.OlricServers, + "gateway_olric_timeout": olricTimeout, + "ipfs_cluster_api_url": cfg.IPFSClusterAPIURL, + "ipfs_api_url": cfg.IPFSAPIURL, + "ipfs_timeout": ipfsTimeout, + "ipfs_replication_factor": cfg.IPFSReplicationFactor, }) if err != nil { return nil, err @@ -1566,6 +1576,69 @@ type ClusterLocalStateNode struct { OlricMemberlistPort int `json:"olric_memberlist_port"` } +// saveClusterStateToAllNodes builds and saves cluster-state.json on every node in the cluster. +// Each node gets its own state file with node-specific LocalNodeID, LocalIP, and LocalPorts. +func (cm *ClusterManager) saveClusterStateToAllNodes(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) { + // Build the shared AllNodes list + var allNodes []ClusterLocalStateNode + for i, node := range nodes { + allNodes = append(allNodes, ClusterLocalStateNode{ + NodeID: node.NodeID, + InternalIP: node.InternalIP, + RQLiteHTTPPort: portBlocks[i].RQLiteHTTPPort, + RQLiteRaftPort: portBlocks[i].RQLiteRaftPort, + OlricHTTPPort: portBlocks[i].OlricHTTPPort, + OlricMemberlistPort: portBlocks[i].OlricMemberlistPort, + }) + } + + for i, node := range nodes { + state := &ClusterLocalState{ + ClusterID: cluster.ID, + NamespaceName: cluster.NamespaceName, + LocalNodeID: node.NodeID, + LocalIP: node.InternalIP, + LocalPorts: ClusterLocalStatePorts{ + RQLiteHTTPPort: portBlocks[i].RQLiteHTTPPort, + RQLiteRaftPort: portBlocks[i].RQLiteRaftPort, + OlricHTTPPort: portBlocks[i].OlricHTTPPort, + OlricMemberlistPort: portBlocks[i].OlricMemberlistPort, + GatewayHTTPPort: portBlocks[i].GatewayHTTPPort, + }, + AllNodes: allNodes, + HasGateway: true, + BaseDomain: 
cm.baseDomain, + SavedAt: time.Now(), + } + + if node.NodeID == cm.localNodeID { + // Save locally + if err := cm.saveLocalState(state); err != nil { + cm.logger.Warn("Failed to save local cluster state", zap.String("namespace", cluster.NamespaceName), zap.Error(err)) + } + } else { + // Send to remote node + data, err := json.MarshalIndent(state, "", " ") + if err != nil { + cm.logger.Warn("Failed to marshal cluster state for remote node", zap.String("node", node.NodeID), zap.Error(err)) + continue + } + _, err = cm.sendSpawnRequest(ctx, node.InternalIP, map[string]interface{}{ + "action": "save-cluster-state", + "namespace": cluster.NamespaceName, + "node_id": node.NodeID, + "cluster_state": json.RawMessage(data), + }) + if err != nil { + cm.logger.Warn("Failed to send cluster state to remote node", + zap.String("node", node.NodeID), + zap.String("ip", node.InternalIP), + zap.Error(err)) + } + } + } +} + // saveLocalState writes cluster state to disk for fast recovery without DB queries. func (cm *ClusterManager) saveLocalState(state *ClusterLocalState) error { dir := filepath.Join(cm.baseDataDir, state.NamespaceName) diff --git a/pkg/namespace/systemd_spawner.go b/pkg/namespace/systemd_spawner.go index 570d355..6392cb4 100644 --- a/pkg/namespace/systemd_spawner.go +++ b/pkg/namespace/systemd_spawner.go @@ -83,6 +83,23 @@ func (s *SystemdSpawner) SpawnOlric(ctx context.Context, namespace, nodeID strin zap.String("namespace", namespace), zap.String("node_id", nodeID)) + // Validate BindAddr: 0.0.0.0 or empty causes IPv6 resolution on dual-stack hosts, + // breaking memberlist UDP gossip over WireGuard. Resolve from wg0 as fallback. 
+ if cfg.BindAddr == "" || cfg.BindAddr == "0.0.0.0" { + wgIP, err := getWireGuardIP() + if err != nil { + return fmt.Errorf("Olric BindAddr is %q and failed to detect WireGuard IP: %w", cfg.BindAddr, err) + } + s.logger.Warn("Olric BindAddr was invalid, resolved from wg0", + zap.String("original", cfg.BindAddr), + zap.String("resolved", wgIP), + zap.String("namespace", namespace)) + cfg.BindAddr = wgIP + if cfg.AdvertiseAddr == "" || cfg.AdvertiseAddr == "0.0.0.0" { + cfg.AdvertiseAddr = wgIP + } + } + // Create config directory configDir := filepath.Join(s.namespaceBase, namespace, "configs") if err := os.MkdirAll(configDir, 0755); err != nil { @@ -272,6 +289,23 @@ func (s *SystemdSpawner) StopGateway(ctx context.Context, namespace, nodeID stri return s.systemdMgr.StopService(namespace, systemd.ServiceTypeGateway) } +// SaveClusterState writes cluster state JSON to the namespace data directory. +// Used by the spawn handler to persist state received from the coordinator node. +func (s *SystemdSpawner) SaveClusterState(namespace string, data []byte) error { + dir := filepath.Join(s.namespaceBase, namespace) + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("failed to create namespace dir: %w", err) + } + path := filepath.Join(dir, "cluster-state.json") + if err := os.WriteFile(path, data, 0644); err != nil { + return fmt.Errorf("failed to write cluster state: %w", err) + } + s.logger.Info("Saved cluster state from coordinator", + zap.String("namespace", namespace), + zap.String("path", path)) + return nil +} + // StopAll stops all services for a namespace func (s *SystemdSpawner) StopAll(ctx context.Context, namespace string) error { s.logger.Info("Stopping all namespace services via systemd", diff --git a/pkg/namespace/wireguard.go b/pkg/namespace/wireguard.go new file mode 100644 index 0000000..38add16 --- /dev/null +++ b/pkg/namespace/wireguard.go @@ -0,0 +1,25 @@ +package namespace + +import ( + "fmt" + "net" +) + +// getWireGuardIP returns 
// getWireGuardIP looks up the wg0 network interface and returns its first IPv4
// address as a string. It serves as the fallback when Olric BindAddr is empty
// or 0.0.0.0.
//
// An error is returned when wg0 does not exist, when its addresses cannot be
// listed, or when none of them is an IPv4 address.
func getWireGuardIP() (string, error) {
	wg, err := net.InterfaceByName("wg0")
	if err != nil {
		return "", fmt.Errorf("wg0 interface not found: %w", err)
	}

	assigned, err := wg.Addrs()
	if err != nil {
		return "", fmt.Errorf("failed to get wg0 addresses: %w", err)
	}

	for i := range assigned {
		// Only *net.IPNet entries carry an IP we can inspect; skip anything else.
		network, isIPNet := assigned[i].(*net.IPNet)
		if !isIPNet {
			continue
		}
		// To4 yields nil for addresses that have no IPv4 form.
		if network.IP.To4() == nil {
			continue
		}
		return network.IP.String(), nil
	}

	return "", fmt.Errorf("no IPv4 address on wg0")
}