mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 09:36:56 +00:00
Fixed bug on spawn handler and cluster manager
This commit is contained in:
parent
f7db698273
commit
91ac56c50a
2
Makefile
2
Makefile
@ -86,7 +86,7 @@ test-e2e-quick:
|
|||||||
|
|
||||||
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
|
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
|
||||||
|
|
||||||
VERSION := 0.101.5
|
VERSION := 0.101.6
|
||||||
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
|
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
|
||||||
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
|
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
|
||||||
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
|
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
|
||||||
|
|||||||
@ -39,11 +39,16 @@ type SpawnRequest struct {
|
|||||||
GatewayHTTPPort int `json:"gateway_http_port,omitempty"`
|
GatewayHTTPPort int `json:"gateway_http_port,omitempty"`
|
||||||
GatewayBaseDomain string `json:"gateway_base_domain,omitempty"`
|
GatewayBaseDomain string `json:"gateway_base_domain,omitempty"`
|
||||||
GatewayRQLiteDSN string `json:"gateway_rqlite_dsn,omitempty"`
|
GatewayRQLiteDSN string `json:"gateway_rqlite_dsn,omitempty"`
|
||||||
|
GatewayGlobalRQLiteDSN string `json:"gateway_global_rqlite_dsn,omitempty"`
|
||||||
GatewayOlricServers []string `json:"gateway_olric_servers,omitempty"`
|
GatewayOlricServers []string `json:"gateway_olric_servers,omitempty"`
|
||||||
|
GatewayOlricTimeout string `json:"gateway_olric_timeout,omitempty"`
|
||||||
IPFSClusterAPIURL string `json:"ipfs_cluster_api_url,omitempty"`
|
IPFSClusterAPIURL string `json:"ipfs_cluster_api_url,omitempty"`
|
||||||
IPFSAPIURL string `json:"ipfs_api_url,omitempty"`
|
IPFSAPIURL string `json:"ipfs_api_url,omitempty"`
|
||||||
IPFSTimeout string `json:"ipfs_timeout,omitempty"`
|
IPFSTimeout string `json:"ipfs_timeout,omitempty"`
|
||||||
IPFSReplicationFactor int `json:"ipfs_replication_factor,omitempty"`
|
IPFSReplicationFactor int `json:"ipfs_replication_factor,omitempty"`
|
||||||
|
|
||||||
|
// Cluster state (when action = "save-cluster-state")
|
||||||
|
ClusterState json.RawMessage `json:"cluster_state,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// SpawnResponse represents the response from a spawn/stop request
|
// SpawnResponse represents the response from a spawn/stop request
|
||||||
@ -122,6 +127,13 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
|
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
|
||||||
|
|
||||||
case "spawn-olric":
|
case "spawn-olric":
|
||||||
|
// Reject empty or 0.0.0.0 BindAddr early — these cause IPv6 resolution on dual-stack hosts
|
||||||
|
if req.OlricBindAddr == "" || req.OlricBindAddr == "0.0.0.0" {
|
||||||
|
writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{
|
||||||
|
Error: fmt.Sprintf("olric_bind_addr must be a valid IP, got %q", req.OlricBindAddr),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
cfg := olric.InstanceConfig{
|
cfg := olric.InstanceConfig{
|
||||||
Namespace: req.Namespace,
|
Namespace: req.Namespace,
|
||||||
NodeID: req.NodeID,
|
NodeID: req.NodeID,
|
||||||
@ -166,13 +178,28 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Parse Olric timeout if provided
|
||||||
|
var olricTimeout time.Duration
|
||||||
|
if req.GatewayOlricTimeout != "" {
|
||||||
|
var err error
|
||||||
|
olricTimeout, err = time.ParseDuration(req.GatewayOlricTimeout)
|
||||||
|
if err != nil {
|
||||||
|
h.logger.Warn("Invalid Olric timeout, using default", zap.String("timeout", req.GatewayOlricTimeout), zap.Error(err))
|
||||||
|
olricTimeout = 30 * time.Second
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
olricTimeout = 30 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
cfg := gateway.InstanceConfig{
|
cfg := gateway.InstanceConfig{
|
||||||
Namespace: req.Namespace,
|
Namespace: req.Namespace,
|
||||||
NodeID: req.NodeID,
|
NodeID: req.NodeID,
|
||||||
HTTPPort: req.GatewayHTTPPort,
|
HTTPPort: req.GatewayHTTPPort,
|
||||||
BaseDomain: req.GatewayBaseDomain,
|
BaseDomain: req.GatewayBaseDomain,
|
||||||
RQLiteDSN: req.GatewayRQLiteDSN,
|
RQLiteDSN: req.GatewayRQLiteDSN,
|
||||||
|
GlobalRQLiteDSN: req.GatewayGlobalRQLiteDSN,
|
||||||
OlricServers: req.GatewayOlricServers,
|
OlricServers: req.GatewayOlricServers,
|
||||||
|
OlricTimeout: olricTimeout,
|
||||||
IPFSClusterAPIURL: req.IPFSClusterAPIURL,
|
IPFSClusterAPIURL: req.IPFSClusterAPIURL,
|
||||||
IPFSAPIURL: req.IPFSAPIURL,
|
IPFSAPIURL: req.IPFSAPIURL,
|
||||||
IPFSTimeout: ipfsTimeout,
|
IPFSTimeout: ipfsTimeout,
|
||||||
@ -193,6 +220,18 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
|
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
|
||||||
|
|
||||||
|
case "save-cluster-state":
|
||||||
|
if len(req.ClusterState) == 0 {
|
||||||
|
writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{Error: "cluster_state is required"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := h.systemdSpawner.SaveClusterState(req.Namespace, req.ClusterState); err != nil {
|
||||||
|
h.logger.Error("Failed to save cluster state", zap.Error(err))
|
||||||
|
writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
|
||||||
|
|
||||||
default:
|
default:
|
||||||
writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{Error: fmt.Sprintf("unknown action: %s", req.Action)})
|
writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{Error: fmt.Sprintf("unknown action: %s", req.Action)})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -309,6 +309,9 @@ func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int,
|
|||||||
cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusReady, "")
|
cm.updateClusterStatus(ctx, cluster.ID, ClusterStatusReady, "")
|
||||||
cm.logEvent(ctx, cluster.ID, EventClusterReady, "", "Cluster is ready", nil)
|
cm.logEvent(ctx, cluster.ID, EventClusterReady, "", "Cluster is ready", nil)
|
||||||
|
|
||||||
|
// Save cluster-state.json on all nodes (local + remote) for disk-based restore on restart
|
||||||
|
cm.saveClusterStateToAllNodes(ctx, cluster, nodes, portBlocks)
|
||||||
|
|
||||||
cm.logger.Info("Cluster provisioning completed",
|
cm.logger.Info("Cluster provisioning completed",
|
||||||
zap.String("cluster_id", cluster.ID),
|
zap.String("cluster_id", cluster.ID),
|
||||||
zap.String("namespace", namespaceName),
|
zap.String("namespace", namespaceName),
|
||||||
@ -630,6 +633,11 @@ func (cm *ClusterManager) spawnGatewayRemote(ctx context.Context, nodeIP string,
|
|||||||
ipfsTimeout = cfg.IPFSTimeout.String()
|
ipfsTimeout = cfg.IPFSTimeout.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
olricTimeout := ""
|
||||||
|
if cfg.OlricTimeout > 0 {
|
||||||
|
olricTimeout = cfg.OlricTimeout.String()
|
||||||
|
}
|
||||||
|
|
||||||
resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{
|
resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{
|
||||||
"action": "spawn-gateway",
|
"action": "spawn-gateway",
|
||||||
"namespace": cfg.Namespace,
|
"namespace": cfg.Namespace,
|
||||||
@ -637,7 +645,9 @@ func (cm *ClusterManager) spawnGatewayRemote(ctx context.Context, nodeIP string,
|
|||||||
"gateway_http_port": cfg.HTTPPort,
|
"gateway_http_port": cfg.HTTPPort,
|
||||||
"gateway_base_domain": cfg.BaseDomain,
|
"gateway_base_domain": cfg.BaseDomain,
|
||||||
"gateway_rqlite_dsn": cfg.RQLiteDSN,
|
"gateway_rqlite_dsn": cfg.RQLiteDSN,
|
||||||
|
"gateway_global_rqlite_dsn": cfg.GlobalRQLiteDSN,
|
||||||
"gateway_olric_servers": cfg.OlricServers,
|
"gateway_olric_servers": cfg.OlricServers,
|
||||||
|
"gateway_olric_timeout": olricTimeout,
|
||||||
"ipfs_cluster_api_url": cfg.IPFSClusterAPIURL,
|
"ipfs_cluster_api_url": cfg.IPFSClusterAPIURL,
|
||||||
"ipfs_api_url": cfg.IPFSAPIURL,
|
"ipfs_api_url": cfg.IPFSAPIURL,
|
||||||
"ipfs_timeout": ipfsTimeout,
|
"ipfs_timeout": ipfsTimeout,
|
||||||
@ -1566,6 +1576,69 @@ type ClusterLocalStateNode struct {
|
|||||||
OlricMemberlistPort int `json:"olric_memberlist_port"`
|
OlricMemberlistPort int `json:"olric_memberlist_port"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// saveClusterStateToAllNodes builds and saves cluster-state.json on every node in the cluster.
|
||||||
|
// Each node gets its own state file with node-specific LocalNodeID, LocalIP, and LocalPorts.
|
||||||
|
func (cm *ClusterManager) saveClusterStateToAllNodes(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) {
|
||||||
|
// Build the shared AllNodes list
|
||||||
|
var allNodes []ClusterLocalStateNode
|
||||||
|
for i, node := range nodes {
|
||||||
|
allNodes = append(allNodes, ClusterLocalStateNode{
|
||||||
|
NodeID: node.NodeID,
|
||||||
|
InternalIP: node.InternalIP,
|
||||||
|
RQLiteHTTPPort: portBlocks[i].RQLiteHTTPPort,
|
||||||
|
RQLiteRaftPort: portBlocks[i].RQLiteRaftPort,
|
||||||
|
OlricHTTPPort: portBlocks[i].OlricHTTPPort,
|
||||||
|
OlricMemberlistPort: portBlocks[i].OlricMemberlistPort,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, node := range nodes {
|
||||||
|
state := &ClusterLocalState{
|
||||||
|
ClusterID: cluster.ID,
|
||||||
|
NamespaceName: cluster.NamespaceName,
|
||||||
|
LocalNodeID: node.NodeID,
|
||||||
|
LocalIP: node.InternalIP,
|
||||||
|
LocalPorts: ClusterLocalStatePorts{
|
||||||
|
RQLiteHTTPPort: portBlocks[i].RQLiteHTTPPort,
|
||||||
|
RQLiteRaftPort: portBlocks[i].RQLiteRaftPort,
|
||||||
|
OlricHTTPPort: portBlocks[i].OlricHTTPPort,
|
||||||
|
OlricMemberlistPort: portBlocks[i].OlricMemberlistPort,
|
||||||
|
GatewayHTTPPort: portBlocks[i].GatewayHTTPPort,
|
||||||
|
},
|
||||||
|
AllNodes: allNodes,
|
||||||
|
HasGateway: true,
|
||||||
|
BaseDomain: cm.baseDomain,
|
||||||
|
SavedAt: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if node.NodeID == cm.localNodeID {
|
||||||
|
// Save locally
|
||||||
|
if err := cm.saveLocalState(state); err != nil {
|
||||||
|
cm.logger.Warn("Failed to save local cluster state", zap.String("namespace", cluster.NamespaceName), zap.Error(err))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Send to remote node
|
||||||
|
data, err := json.MarshalIndent(state, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
cm.logger.Warn("Failed to marshal cluster state for remote node", zap.String("node", node.NodeID), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
_, err = cm.sendSpawnRequest(ctx, node.InternalIP, map[string]interface{}{
|
||||||
|
"action": "save-cluster-state",
|
||||||
|
"namespace": cluster.NamespaceName,
|
||||||
|
"node_id": node.NodeID,
|
||||||
|
"cluster_state": json.RawMessage(data),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
cm.logger.Warn("Failed to send cluster state to remote node",
|
||||||
|
zap.String("node", node.NodeID),
|
||||||
|
zap.String("ip", node.InternalIP),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// saveLocalState writes cluster state to disk for fast recovery without DB queries.
|
// saveLocalState writes cluster state to disk for fast recovery without DB queries.
|
||||||
func (cm *ClusterManager) saveLocalState(state *ClusterLocalState) error {
|
func (cm *ClusterManager) saveLocalState(state *ClusterLocalState) error {
|
||||||
dir := filepath.Join(cm.baseDataDir, state.NamespaceName)
|
dir := filepath.Join(cm.baseDataDir, state.NamespaceName)
|
||||||
|
|||||||
@ -83,6 +83,23 @@ func (s *SystemdSpawner) SpawnOlric(ctx context.Context, namespace, nodeID strin
|
|||||||
zap.String("namespace", namespace),
|
zap.String("namespace", namespace),
|
||||||
zap.String("node_id", nodeID))
|
zap.String("node_id", nodeID))
|
||||||
|
|
||||||
|
// Validate BindAddr: 0.0.0.0 or empty causes IPv6 resolution on dual-stack hosts,
|
||||||
|
// breaking memberlist UDP gossip over WireGuard. Resolve from wg0 as fallback.
|
||||||
|
if cfg.BindAddr == "" || cfg.BindAddr == "0.0.0.0" {
|
||||||
|
wgIP, err := getWireGuardIP()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Olric BindAddr is %q and failed to detect WireGuard IP: %w", cfg.BindAddr, err)
|
||||||
|
}
|
||||||
|
s.logger.Warn("Olric BindAddr was invalid, resolved from wg0",
|
||||||
|
zap.String("original", cfg.BindAddr),
|
||||||
|
zap.String("resolved", wgIP),
|
||||||
|
zap.String("namespace", namespace))
|
||||||
|
cfg.BindAddr = wgIP
|
||||||
|
if cfg.AdvertiseAddr == "" || cfg.AdvertiseAddr == "0.0.0.0" {
|
||||||
|
cfg.AdvertiseAddr = wgIP
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create config directory
|
// Create config directory
|
||||||
configDir := filepath.Join(s.namespaceBase, namespace, "configs")
|
configDir := filepath.Join(s.namespaceBase, namespace, "configs")
|
||||||
if err := os.MkdirAll(configDir, 0755); err != nil {
|
if err := os.MkdirAll(configDir, 0755); err != nil {
|
||||||
@ -272,6 +289,23 @@ func (s *SystemdSpawner) StopGateway(ctx context.Context, namespace, nodeID stri
|
|||||||
return s.systemdMgr.StopService(namespace, systemd.ServiceTypeGateway)
|
return s.systemdMgr.StopService(namespace, systemd.ServiceTypeGateway)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SaveClusterState writes cluster state JSON to the namespace data directory.
|
||||||
|
// Used by the spawn handler to persist state received from the coordinator node.
|
||||||
|
func (s *SystemdSpawner) SaveClusterState(namespace string, data []byte) error {
|
||||||
|
dir := filepath.Join(s.namespaceBase, namespace)
|
||||||
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create namespace dir: %w", err)
|
||||||
|
}
|
||||||
|
path := filepath.Join(dir, "cluster-state.json")
|
||||||
|
if err := os.WriteFile(path, data, 0644); err != nil {
|
||||||
|
return fmt.Errorf("failed to write cluster state: %w", err)
|
||||||
|
}
|
||||||
|
s.logger.Info("Saved cluster state from coordinator",
|
||||||
|
zap.String("namespace", namespace),
|
||||||
|
zap.String("path", path))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// StopAll stops all services for a namespace
|
// StopAll stops all services for a namespace
|
||||||
func (s *SystemdSpawner) StopAll(ctx context.Context, namespace string) error {
|
func (s *SystemdSpawner) StopAll(ctx context.Context, namespace string) error {
|
||||||
s.logger.Info("Stopping all namespace services via systemd",
|
s.logger.Info("Stopping all namespace services via systemd",
|
||||||
|
|||||||
25
pkg/namespace/wireguard.go
Normal file
25
pkg/namespace/wireguard.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package namespace
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
)
|
||||||
|
|
||||||
|
// getWireGuardIP returns the IPv4 address of the wg0 interface.
|
||||||
|
// Used as a fallback when Olric BindAddr is empty or 0.0.0.0.
|
||||||
|
func getWireGuardIP() (string, error) {
|
||||||
|
iface, err := net.InterfaceByName("wg0")
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("wg0 interface not found: %w", err)
|
||||||
|
}
|
||||||
|
addrs, err := iface.Addrs()
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to get wg0 addresses: %w", err)
|
||||||
|
}
|
||||||
|
for _, addr := range addrs {
|
||||||
|
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
|
||||||
|
return ipnet.IP.String(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", fmt.Errorf("no IPv4 address on wg0")
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user