From 0b5b6e68e3ed38102a85ea254d6fff41896989e4 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Mon, 16 Feb 2026 16:35:29 +0200 Subject: [PATCH] A ton of updates on the monitoring mostly bug fixes --- docs/CLEAN_NODE.md | 6 + pkg/cli/monitor/alerts.go | 155 +++++++++++++++++--- pkg/cli/production/install/remote.go | 2 +- pkg/cli/production/lifecycle/restart.go | 5 +- pkg/cli/production/lifecycle/stop.go | 14 +- pkg/cli/production/report/ipfs.go | 22 ++- pkg/cli/production/report/processes.go | 2 +- pkg/cli/production/report/services.go | 1 - pkg/cli/production/status/command.go | 5 +- pkg/cli/production/uninstall/command.go | 4 +- pkg/cli/utils/systemd.go | 7 +- pkg/environments/production/orchestrator.go | 13 +- pkg/gateway/handlers/join/handler.go | 28 ++-- pkg/inspector/collector.go | 2 +- pkg/node/wireguard_sync.go | 10 ++ pkg/rqlite/process.go | 10 +- pkg/rqlite/rqlite.go | 21 +-- scripts/clean-testnet.sh | 12 ++ 18 files changed, 246 insertions(+), 73 deletions(-) diff --git a/docs/CLEAN_NODE.md b/docs/CLEAN_NODE.md index d61f518..8414394 100644 --- a/docs/CLEAN_NODE.md +++ b/docs/CLEAN_NODE.md @@ -11,6 +11,12 @@ Run this as root or with sudo on the target VPS: sudo systemctl stop orama-node orama-ipfs orama-ipfs-cluster orama-olric orama-anyone-relay orama-anyone-client coredns caddy 2>/dev/null sudo systemctl disable orama-node orama-ipfs orama-ipfs-cluster orama-olric orama-anyone-relay orama-anyone-client coredns caddy 2>/dev/null +# 1b. Kill leftover processes (binaries may run outside systemd) +sudo pkill -f orama-node 2>/dev/null; sudo pkill -f ipfs-cluster-service 2>/dev/null +sudo pkill -f "ipfs daemon" 2>/dev/null; sudo pkill -f olric-server 2>/dev/null +sudo pkill -f rqlited 2>/dev/null; sudo pkill -f coredns 2>/dev/null +sleep 1 + # 2. 
Remove systemd service files sudo rm -f /etc/systemd/system/orama-*.service sudo rm -f /etc/systemd/system/coredns.service diff --git a/pkg/cli/monitor/alerts.go b/pkg/cli/monitor/alerts.go index 3f81e71..49e1437 100644 --- a/pkg/cli/monitor/alerts.go +++ b/pkg/cli/monitor/alerts.go @@ -24,6 +24,55 @@ type Alert struct { Message string `json:"message"` } +// joiningGraceSec is the grace period (in seconds) after a node starts during +// which unreachability alerts from other nodes are downgraded to info. +const joiningGraceSec = 300 + +// nodeContext carries per-node metadata needed for context-aware alerting. +type nodeContext struct { + host string + role string // "node", "nameserver-ns1", etc. + isNameserver bool + isJoining bool // orama-node active_since_sec < joiningGraceSec + uptimeSec int // orama-node active_since_sec +} + +// buildNodeContexts builds a map of host (and WG IP, when known) -> nodeContext for every node that returned a report. +func buildNodeContexts(snap *ClusterSnapshot) map[string]*nodeContext { + ctxMap := make(map[string]*nodeContext) + for _, cs := range snap.Nodes { + if cs.Report == nil { + continue + } + r := cs.Report + host := nodeHost(r) + + nc := &nodeContext{ + host: host, + role: cs.Node.Role, + isNameserver: strings.HasPrefix(cs.Node.Role, "nameserver"), + } + + // Determine uptime from orama-node service + if r.Services != nil { + for _, svc := range r.Services.Services { + if svc.Name == "orama-node" && svc.ActiveState == "active" { + nc.uptimeSec = int(svc.ActiveSinceSec) + nc.isJoining = svc.ActiveSinceSec < joiningGraceSec + break + } + } + } + + ctxMap[host] = nc + // Also index by WG IP for cross-node RQLite unreachability lookups + if r.WireGuard != nil && r.WireGuard.WgIP != "" { + ctxMap[r.WireGuard.WgIP] = nc + } + } + return ctxMap +} + // DeriveAlerts scans a ClusterSnapshot and produces alerts.
func DeriveAlerts(snap *ClusterSnapshot) []Alert { var alerts []Alert @@ -45,6 +94,9 @@ func DeriveAlerts(snap *ClusterSnapshot) []Alert { return alerts } + // Build context map for role/uptime-aware alerting + nodeCtxMap := buildNodeContexts(snap) + // Cross-node checks alerts = append(alerts, checkRQLiteLeader(reports)...) alerts = append(alerts, checkRQLiteQuorum(reports)...) @@ -60,11 +112,12 @@ func DeriveAlerts(snap *ClusterSnapshot) []Alert { // Per-node checks for _, r := range reports { host := nodeHost(r) - alerts = append(alerts, checkNodeRQLite(r, host)...) + nc := nodeCtxMap[host] + alerts = append(alerts, checkNodeRQLite(r, host, nodeCtxMap)...) alerts = append(alerts, checkNodeWireGuard(r, host)...) alerts = append(alerts, checkNodeSystem(r, host)...) - alerts = append(alerts, checkNodeServices(r, host)...) - alerts = append(alerts, checkNodeDNS(r, host)...) + alerts = append(alerts, checkNodeServices(r, host, nc)...) + alerts = append(alerts, checkNodeDNS(r, host, nc)...) alerts = append(alerts, checkNodeAnyone(r, host)...) alerts = append(alerts, checkNodeProcesses(r, host)...) alerts = append(alerts, checkNodeNamespaces(r, host)...) @@ -377,7 +430,7 @@ func checkIPFSClusterConsistency(reports []*report.NodeReport) []Alert { // Per-node checks // --------------------------------------------------------------------------- -func checkNodeRQLite(r *report.NodeReport, host string) []Alert { +func checkNodeRQLite(r *report.NodeReport, host string, nodeCtxMap map[string]*nodeContext) []Alert { if r.RQLite == nil { return nil } @@ -428,11 +481,21 @@ func checkNodeRQLite(r *report.NodeReport, host string) []Alert { fmt.Sprintf("RQLite heap memory high: %dMB", r.RQLite.HeapMB)}) } - // Cluster partition detection: check if this node reports other nodes as unreachable + // Cluster partition detection: check if this node reports other nodes as unreachable. 
+ // If the unreachable node recently joined (< 5 min), downgrade to info — probes + // may not have succeeded yet and this is expected transient behavior. for nodeAddr, info := range r.RQLite.Nodes { if !info.Reachable { - alerts = append(alerts, Alert{AlertCritical, "rqlite", host, - fmt.Sprintf("RQLite reports node %s unreachable (cluster partition)", nodeAddr)}) + // nodeAddr is like "10.0.0.4:7001" — extract the IP to look up context + targetIP := strings.Split(nodeAddr, ":")[0] + if targetCtx, ok := nodeCtxMap[targetIP]; ok && targetCtx.isJoining { + alerts = append(alerts, Alert{AlertInfo, "rqlite", host, + fmt.Sprintf("Node %s recently joined (%ds ago), probe pending for %s", + targetCtx.host, targetCtx.uptimeSec, nodeAddr)}) + } else { + alerts = append(alerts, Alert{AlertCritical, "rqlite", host, + fmt.Sprintf("RQLite reports node %s unreachable (cluster partition)", nodeAddr)}) + } } } @@ -521,12 +584,17 @@ func checkNodeSystem(r *report.NodeReport, host string) []Alert { return alerts } -func checkNodeServices(r *report.NodeReport, host string) []Alert { +func checkNodeServices(r *report.NodeReport, host string, nc *nodeContext) []Alert { if r.Services == nil { return nil } var alerts []Alert for _, svc := range r.Services.Services { + // Skip services that are expected to be inactive based on node role/mode + if shouldSkipServiceAlert(svc.Name, svc.ActiveState, r, nc) { + continue + } + if svc.ActiveState == "failed" { alerts = append(alerts, Alert{AlertCritical, "service", host, fmt.Sprintf("Service %s is FAILED", svc.Name)}) @@ -546,30 +614,72 @@ func checkNodeServices(r *report.NodeReport, host string) []Alert { return alerts } -func checkNodeDNS(r *report.NodeReport, host string) []Alert { +// shouldSkipServiceAlert returns true if this service being inactive is expected +// given the node's role and anyone mode. 
+func shouldSkipServiceAlert(svcName, state string, r *report.NodeReport, nc *nodeContext) bool { + if state == "active" || state == "failed" { + return false // always report active (no alert) and failed (always alert) + } + + // CoreDNS: only expected on nameserver nodes + if svcName == "coredns" && (nc == nil || !nc.isNameserver) { + return true + } + + // Anyone services: only alert for the mode the node is configured for + if r.Anyone != nil { + mode := r.Anyone.Mode + if svcName == "orama-anyone-client" && mode == "relay" { + return true // relay node doesn't run client + } + if svcName == "orama-anyone-relay" && mode == "client" { + return true // client node doesn't run relay + } + } + // If anyone section is nil (no anyone configured), skip both anyone services + if r.Anyone == nil && (svcName == "orama-anyone-client" || svcName == "orama-anyone-relay") { + return true + } + + return false +} + +func checkNodeDNS(r *report.NodeReport, host string, nc *nodeContext) []Alert { if r.DNS == nil { return nil } + + isNameserver := nc != nil && nc.isNameserver + var alerts []Alert - if !r.DNS.CoreDNSActive { + + // CoreDNS: only check on nameserver nodes + if isNameserver && !r.DNS.CoreDNSActive { alerts = append(alerts, Alert{AlertCritical, "dns", host, "CoreDNS is down"}) } + + // Caddy: check on all nodes (any node can host namespaces) if !r.DNS.CaddyActive { alerts = append(alerts, Alert{AlertCritical, "dns", host, "Caddy is down"}) } - if r.DNS.BaseTLSDaysLeft >= 0 && r.DNS.BaseTLSDaysLeft < 14 { - alerts = append(alerts, Alert{AlertWarning, "dns", host, - fmt.Sprintf("Base TLS cert expires in %d days", r.DNS.BaseTLSDaysLeft)}) + + // TLS cert expiry: only meaningful on nameserver nodes that have public domains + if isNameserver { + if r.DNS.BaseTLSDaysLeft >= 0 && r.DNS.BaseTLSDaysLeft < 14 { + alerts = append(alerts, Alert{AlertWarning, "dns", host, + fmt.Sprintf("Base TLS cert expires in %d days", r.DNS.BaseTLSDaysLeft)}) + } + if r.DNS.WildTLSDaysLeft >= 
0 && r.DNS.WildTLSDaysLeft < 14 { + alerts = append(alerts, Alert{AlertWarning, "dns", host, + fmt.Sprintf("Wildcard TLS cert expires in %d days", r.DNS.WildTLSDaysLeft)}) + } } - if r.DNS.WildTLSDaysLeft >= 0 && r.DNS.WildTLSDaysLeft < 14 { - alerts = append(alerts, Alert{AlertWarning, "dns", host, - fmt.Sprintf("Wildcard TLS cert expires in %d days", r.DNS.WildTLSDaysLeft)}) - } - if r.DNS.CoreDNSActive && !r.DNS.SOAResolves { - alerts = append(alerts, Alert{AlertWarning, "dns", host, "SOA record not resolving"}) - } - // Additional DNS checks (only when CoreDNS is running) - if r.DNS.CoreDNSActive { + + // DNS resolution checks: only on nameserver nodes with CoreDNS running + if isNameserver && r.DNS.CoreDNSActive { + if !r.DNS.SOAResolves { + alerts = append(alerts, Alert{AlertWarning, "dns", host, "SOA record not resolving"}) + } if !r.DNS.WildcardResolves { alerts = append(alerts, Alert{AlertWarning, "dns", host, "Wildcard DNS not resolving"}) } @@ -583,6 +693,7 @@ func checkNodeDNS(r *report.NodeReport, host string) []Alert { alerts = append(alerts, Alert{AlertCritical, "dns", host, "CoreDNS active but port 53 not bound"}) } } + if r.DNS.CaddyActive && !r.DNS.Port443Bound { alerts = append(alerts, Alert{AlertCritical, "dns", host, "Caddy active but port 443 not bound"}) } diff --git a/pkg/cli/production/install/remote.go b/pkg/cli/production/install/remote.go index 44b3aaf..b4ca02b 100644 --- a/pkg/cli/production/install/remote.go +++ b/pkg/cli/production/install/remote.go @@ -62,7 +62,7 @@ func (r *RemoteOrchestrator) buildRemoteCommand() string { if r.node.User != "root" { args = append(args, "sudo") } - args = append(args, "orama", "install") + args = append(args, "orama", "node", "install") args = append(args, "--vps-ip", r.flags.VpsIP) diff --git a/pkg/cli/production/lifecycle/restart.go b/pkg/cli/production/lifecycle/restart.go index b92cef2..3560b1c 100644 --- a/pkg/cli/production/lifecycle/restart.go +++ b/pkg/cli/production/lifecycle/restart.go @@ 
-47,14 +47,13 @@ func HandleRestartWithFlags(force bool) { fmt.Printf("\n Stopping namespace services...\n") stopAllNamespaceServices() - // Ordered stop: gateway first, then node (RQLite), then supporting services + // Ordered stop: node first (includes embedded gateway + RQLite), then supporting services fmt.Printf("\n Stopping services (ordered)...\n") shutdownOrder := [][]string{ - {"orama-gateway"}, {"orama-node"}, {"orama-olric"}, {"orama-ipfs-cluster", "orama-ipfs"}, - {"orama-anyone-relay", "anyone-client"}, + {"orama-anyone-relay", "orama-anyone-client"}, {"coredns", "caddy"}, } diff --git a/pkg/cli/production/lifecycle/stop.go b/pkg/cli/production/lifecycle/stop.go index 2dbb599..9db0264 100644 --- a/pkg/cli/production/lifecycle/stop.go +++ b/pkg/cli/production/lifecycle/stop.go @@ -50,15 +50,13 @@ func HandleStopWithFlags(force bool) { fmt.Printf("\n Stopping main services (ordered)...\n") - // Ordered shutdown: gateway first, then node (RQLite), then supporting services - // This ensures we stop accepting requests before shutting down the database + // Ordered shutdown: node first (includes embedded gateway + RQLite), then supporting services shutdownOrder := [][]string{ - {"orama-gateway"}, // 1. Stop accepting new requests - {"orama-node"}, // 2. Stop node (includes RQLite with leadership transfer) - {"orama-olric"}, // 3. Stop cache - {"orama-ipfs-cluster", "orama-ipfs"}, // 4. Stop storage - {"orama-anyone-relay", "anyone-client"}, // 5. Stop privacy relay - {"coredns", "caddy"}, // 6. Stop DNS/TLS last + {"orama-node"}, // 1. Stop node (includes gateway + RQLite with leadership transfer) + {"orama-olric"}, // 2. Stop cache + {"orama-ipfs-cluster", "orama-ipfs"}, // 3. Stop storage + {"orama-anyone-relay", "orama-anyone-client"}, // 4. Stop privacy relay + {"coredns", "caddy"}, // 5. 
Stop DNS/TLS last } // First, disable all services to prevent auto-restart diff --git a/pkg/cli/production/report/ipfs.go b/pkg/cli/production/report/ipfs.go index 8ffed76..35070ea 100644 --- a/pkg/cli/production/report/ipfs.go +++ b/pkg/cli/production/report/ipfs.go @@ -46,9 +46,12 @@ func collectIPFS() *IPFSReport { } } - // 4. ClusterPeerCount: GET /peers + // 4. ClusterPeerCount: GET /peers (with fallback to /id) + // The /peers endpoint does a synchronous round-trip to ALL cluster peers, + // so it can be slow if some peers are unreachable (ghost WG entries, etc.). + // Use a generous timeout and fall back to /id if /peers times out. { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if body, err := httpGet(ctx, "http://localhost:9094/peers"); err == nil { var peers []interface{} @@ -57,6 +60,21 @@ func collectIPFS() *IPFSReport { } } } + // Fallback: if /peers returned 0 (timeout or error), try /id which returns + // cached cluster_peers instantly without contacting other nodes. + if r.ClusterPeerCount == 0 { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if body, err := httpGet(ctx, "http://localhost:9094/id"); err == nil { + var resp struct { + ClusterPeers []string `json:"cluster_peers"` + } + if err := json.Unmarshal(body, &resp); err == nil && len(resp.ClusterPeers) > 0 { + // cluster_peers includes self, so count is len(cluster_peers) + r.ClusterPeerCount = len(resp.ClusterPeers) + } + } + } // 5. 
RepoSizeBytes/RepoMaxBytes: POST /api/v0/repo/stat { diff --git a/pkg/cli/production/report/processes.go b/pkg/cli/production/report/processes.go index b1d21d9..1cd19d6 100644 --- a/pkg/cli/production/report/processes.go +++ b/pkg/cli/production/report/processes.go @@ -84,7 +84,7 @@ func collectProcesses() *ProcessReport { // managedServiceUnits lists systemd units whose MainPID should be excluded from orphan detection. var managedServiceUnits = []string{ - "orama-node", "orama-gateway", "orama-olric", + "orama-node", "orama-olric", "orama-ipfs", "orama-ipfs-cluster", "orama-anyone-relay", "orama-anyone-client", "coredns", "caddy", "rqlited", diff --git a/pkg/cli/production/report/services.go b/pkg/cli/production/report/services.go index e276d18..f8b48d7 100644 --- a/pkg/cli/production/report/services.go +++ b/pkg/cli/production/report/services.go @@ -10,7 +10,6 @@ import ( var coreServices = []string{ "orama-node", - "orama-gateway", "orama-olric", "orama-ipfs", "orama-ipfs-cluster", diff --git a/pkg/cli/production/status/command.go b/pkg/cli/production/status/command.go index c5e94f1..4120693 100644 --- a/pkg/cli/production/status/command.go +++ b/pkg/cli/production/status/command.go @@ -18,7 +18,7 @@ func Handle() { // Note: RQLite is managed by node process, not as separate service "orama-olric", "orama-node", - "orama-gateway", + // Note: gateway is embedded in orama-node, no separate service } // Friendly descriptions @@ -26,8 +26,7 @@ func Handle() { "orama-ipfs": "IPFS Daemon", "orama-ipfs-cluster": "IPFS Cluster", "orama-olric": "Olric Cache Server", - "orama-node": "Orama Node (includes RQLite)", - "orama-gateway": "Orama Gateway", + "orama-node": "Orama Node (includes RQLite + Gateway)", } fmt.Printf("Services:\n") diff --git a/pkg/cli/production/uninstall/command.go b/pkg/cli/production/uninstall/command.go index f654a78..7991ad2 100644 --- a/pkg/cli/production/uninstall/command.go +++ b/pkg/cli/production/uninstall/command.go @@ -33,9 +33,9 @@ func 
Handle() { fmt.Printf("Stopping namespace services...\n") stopNamespaceServices() - // All global services (was missing: orama-anyone-relay, coredns, caddy) + // All global services (orama-gateway is legacy — now embedded in orama-node) services := []string{ - "orama-gateway", + "orama-gateway", // Legacy: kept for cleanup of old installs "orama-node", "orama-olric", "orama-ipfs-cluster", diff --git a/pkg/cli/utils/systemd.go b/pkg/cli/utils/systemd.go index 1799a69..354b9d4 100644 --- a/pkg/cli/utils/systemd.go +++ b/pkg/cli/utils/systemd.go @@ -23,14 +23,12 @@ type PortSpec struct { } var ServicePorts = map[string][]PortSpec{ - "orama-gateway": { - {Name: "Gateway API", Port: constants.GatewayAPIPort}, - }, "orama-olric": { {Name: "Olric HTTP", Port: constants.OlricHTTPPort}, {Name: "Olric Memberlist", Port: constants.OlricMemberlistPort}, }, "orama-node": { + {Name: "Gateway API", Port: constants.GatewayAPIPort}, // Gateway is embedded in orama-node {Name: "RQLite HTTP", Port: constants.RQLiteHTTPPort}, {Name: "RQLite Raft", Port: constants.RQLiteRaftPort}, }, @@ -67,7 +65,7 @@ func ResolveServiceName(alias string) ([]string, error) { "ipfs": {"orama-ipfs"}, "cluster": {"orama-ipfs-cluster"}, "ipfs-cluster": {"orama-ipfs-cluster"}, - "gateway": {"orama-gateway"}, + "gateway": {"orama-node"}, // Gateway is embedded in orama-node "olric": {"orama-olric"}, "rqlite": {"orama-node"}, // RQLite logs are in node logs } @@ -158,7 +156,6 @@ func IsServiceMasked(service string) (bool, error) { func GetProductionServices() []string { // Global/default service names globalServices := []string{ - "orama-gateway", "orama-node", "orama-olric", "orama-ipfs-cluster", diff --git a/pkg/environments/production/orchestrator.go b/pkg/environments/production/orchestrator.go index 8c212ca..9223d56 100644 --- a/pkg/environments/production/orchestrator.go +++ b/pkg/environments/production/orchestrator.go @@ -612,8 +612,8 @@ func (ps *ProductionSetup) 
Phase5CreateSystemdServices(enableHTTPS bool) error { } ps.logf(" ✓ Anyone Relay service created (operator mode, ORPort: %d)", ps.anyoneRelayConfig.ORPort) } else if ps.IsAnyoneClient() { - anyoneUnit := ps.serviceGenerator.GenerateAnyoneRelayService() - if err := ps.serviceController.WriteServiceUnit("orama-anyone-relay.service", anyoneUnit); err != nil { + anyoneUnit := ps.serviceGenerator.GenerateAnyoneClientService() + if err := ps.serviceController.WriteServiceUnit("orama-anyone-client.service", anyoneUnit); err != nil { return fmt.Errorf("failed to write Anyone client service: %w", err) } ps.logf(" ✓ Anyone client service created (SocksPort 9050)") @@ -656,8 +656,10 @@ func (ps *ProductionSetup) Phase5CreateSystemdServices(enableHTTPS bool) error { services := []string{"orama-ipfs.service", "orama-ipfs-cluster.service", "orama-olric.service", "orama-node.service"} // Add Anyone service if configured (relay or client) - if ps.IsAnyoneRelay() || ps.IsAnyoneClient() { + if ps.IsAnyoneRelay() { services = append(services, "orama-anyone-relay.service") + } else if ps.IsAnyoneClient() { + services = append(services, "orama-anyone-client.service") } // Add CoreDNS only for nameserver nodes @@ -698,7 +700,7 @@ func (ps *ProductionSetup) Phase5CreateSystemdServices(enableHTTPS bool) error { infraServices = append(infraServices, "orama-anyone-relay.service") } } else if ps.IsAnyoneClient() { - infraServices = append(infraServices, "orama-anyone-relay.service") + infraServices = append(infraServices, "orama-anyone-client.service") } for _, svc := range infraServices { @@ -915,6 +917,9 @@ func (ps *ProductionSetup) LogSetupComplete(peerID string) { ps.logf(" Config: /etc/anon/anonrc") ps.logf(" Register at: https://dashboard.anyone.io") ps.logf(" IMPORTANT: You need 100 $ANYONE tokens in your wallet to receive rewards") + } else if ps.IsAnyoneClient() { + ps.logf("\nStart All Services:") + ps.logf(" systemctl start orama-ipfs orama-ipfs-cluster orama-olric 
orama-anyone-client orama-node") } else { ps.logf("\nStart All Services:") ps.logf(" systemctl start orama-ipfs orama-ipfs-cluster orama-olric orama-node") diff --git a/pkg/gateway/handlers/join/handler.go b/pkg/gateway/handlers/join/handler.go index 6d218a2..2be17f9 100644 --- a/pkg/gateway/handlers/join/handler.go +++ b/pkg/gateway/handlers/join/handler.go @@ -105,7 +105,17 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) { return } - // 2. Assign WG IP with retry on conflict + // 2. Clean up stale WG entries for this public IP (from previous installs). + // This prevents ghost peers: old rows with different node_id/wg_key that + // the sync loop would keep trying to reach. + if _, err := h.rqliteClient.Exec(ctx, + "DELETE FROM wireguard_peers WHERE public_ip = ?", req.PublicIP); err != nil { + h.logger.Warn("failed to clean up stale WG entries", zap.Error(err)) + // Non-fatal: proceed with join + } + + // 3. Assign WG IP with retry on conflict (runs after cleanup so ghost IPs + // from this public_ip are not counted) wgIP, err := h.assignWGIP(ctx) if err != nil { h.logger.Error("failed to assign WG IP", zap.Error(err)) @@ -113,7 +123,7 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) { return } - // 3. Register WG peer in database + // 4. Register WG peer in database nodeID := fmt.Sprintf("node-%s", wgIP) // temporary ID based on WG IP _, err = h.rqliteClient.Exec(ctx, "INSERT OR REPLACE INTO wireguard_peers (node_id, wg_ip, public_key, public_ip, wg_port) VALUES (?, ?, ?, ?, ?)", @@ -124,13 +134,13 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) { return } - // 4. Add peer to local WireGuard interface immediately + // 5. Add peer to local WireGuard interface immediately if err := h.addWGPeerLocally(req.WGPublicKey, req.PublicIP, wgIP); err != nil { h.logger.Warn("failed to add WG peer to local interface", zap.Error(err)) // Non-fatal: the sync loop will pick it up } - // 5. 
Read secrets from disk + // 6. Read secrets from disk clusterSecret, err := os.ReadFile(h.oramaDir + "/secrets/cluster-secret") if err != nil { h.logger.Error("failed to read cluster secret", zap.Error(err)) @@ -145,7 +155,7 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) { return } - // 6. Get all WG peers + // 7. Get all WG peers wgPeers, err := h.getWGPeers(ctx, req.WGPublicKey) if err != nil { h.logger.Error("failed to list WG peers", zap.Error(err)) @@ -153,7 +163,7 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) { return } - // 7. Get this node's WG IP + // 8. Get this node's WG IP myWGIP, err := h.getMyWGIP() if err != nil { h.logger.Error("failed to get local WG IP", zap.Error(err)) @@ -161,14 +171,14 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) { return } - // 8. Query IPFS and IPFS Cluster peer info + // 9. Query IPFS and IPFS Cluster peer info ipfsPeer := h.queryIPFSPeerInfo(myWGIP) ipfsClusterPeer := h.queryIPFSClusterPeerInfo(myWGIP) - // 9. Get this node's libp2p peer ID for bootstrap peers + // 10. Get this node's libp2p peer ID for bootstrap peers bootstrapPeers := h.buildBootstrapPeers(myWGIP, ipfsPeer.ID) - // 10. Read base domain from config + // 11. 
Read base domain from config baseDomain := h.readBaseDomain() // Build Olric seed peers from all existing WG peer IPs (memberlist port 3322) diff --git a/pkg/inspector/collector.go b/pkg/inspector/collector.go index e5ea49f..67470b2 100644 --- a/pkg/inspector/collector.go +++ b/pkg/inspector/collector.go @@ -617,7 +617,7 @@ systemctl is-active orama-ipfs-cluster 2>/dev/null echo "$SEP" curl -sf -X POST 'http://localhost:4501/api/v0/swarm/peers' 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(len(d.get('Peers') or []))" 2>/dev/null || echo -1 echo "$SEP" -curl -sf 'http://localhost:9094/peers' 2>/dev/null | python3 -c "import sys,json; peers=json.load(sys.stdin); print(len(peers)); errs=sum(1 for p in peers if p.get('error','')); print(errs)" 2>/dev/null || echo -1 +curl -sf --max-time 10 'http://localhost:9094/peers' 2>/dev/null | python3 -c "import sys,json; peers=json.load(sys.stdin); print(len(peers)); errs=sum(1 for p in peers if p.get('error','')); print(errs)" 2>/dev/null || (curl -sf 'http://localhost:9094/id' 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); peers=d.get('cluster_peers',[]); print(len(peers)); print(0)" 2>/dev/null || echo -1) echo "$SEP" curl -sf -X POST 'http://localhost:4501/api/v0/repo/stat' 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('RepoSize',0)); print(d.get('StorageMax',0))" 2>/dev/null || echo -1 echo "$SEP" diff --git a/pkg/node/wireguard_sync.go b/pkg/node/wireguard_sync.go index d3a669c..c1fdc2f 100644 --- a/pkg/node/wireguard_sync.go +++ b/pkg/node/wireguard_sync.go @@ -162,6 +162,16 @@ func (n *Node) ensureWireGuardSelfRegistered(ctx context.Context) { ipfsPeerID := queryLocalIPFSPeerID() db := n.rqliteAdapter.GetSQLDB() + + // Clean up stale entries for this public IP with a different node_id. + // This prevents ghost peers from previous installs or from the temporary + // "node-10.0.0.X" ID that the join handler creates. 
+ if _, err := db.ExecContext(ctx, + "DELETE FROM wireguard_peers WHERE public_ip = ? AND node_id != ?", + publicIP, nodeID); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to clean stale WG entries", zap.Error(err)) + } + _, err = db.ExecContext(ctx, "INSERT OR REPLACE INTO wireguard_peers (node_id, wg_ip, public_key, public_ip, wg_port, ipfs_peer_id) VALUES (?, ?, ?, ?, ?, ?)", nodeID, wgIP, localPubKey, publicIP, 51820, ipfsPeerID) diff --git a/pkg/rqlite/process.go b/pkg/rqlite/process.go index 1078c03..a70c692 100644 --- a/pkg/rqlite/process.go +++ b/pkg/rqlite/process.go @@ -197,7 +197,15 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) _ = os.WriteFile(pidPath, []byte(fmt.Sprintf("%d", r.cmd.Process.Pid)), 0644) r.logger.Info("RQLite process started", zap.Int("pid", r.cmd.Process.Pid), zap.String("pid_file", pidPath)) - logFile.Close() + // Reap the child process in the background to prevent zombies. + // Stop() waits on this channel instead of calling cmd.Wait() directly. 
+ r.waitDone = make(chan struct{}) + go func() { + _ = r.cmd.Wait() + logFile.Close() + close(r.waitDone) + }() + return nil } diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index eda0c44..a456ba8 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -24,6 +24,7 @@ type RQLiteManager struct { cmd *exec.Cmd connection *gorqlite.Connection discoveryService *ClusterDiscoveryService + waitDone chan struct{} // closed when cmd.Wait() completes (reaps zombie) } // NewRQLiteManager creates a new RQLite manager @@ -118,16 +119,16 @@ func (r *RQLiteManager) Stop() error { _ = r.cmd.Process.Signal(syscall.SIGTERM) - done := make(chan error, 1) - go func() { done <- r.cmd.Wait() }() - - // Give RQLite 30s to flush pending writes and shut down gracefully - // (previously 5s which risked Raft log corruption) - select { - case <-done: - case <-time.After(30 * time.Second): - r.logger.Warn("RQLite did not stop within 30s, sending SIGKILL") - _ = r.cmd.Process.Kill() + // Wait for the background reaper goroutine (started in launchProcess) to + // collect the child process; a second cmd.Wait() here would only return an error. + if r.waitDone != nil { + select { + case <-r.waitDone: + case <-time.After(30 * time.Second): + r.logger.Warn("RQLite did not stop within 30s, sending SIGKILL") + _ = r.cmd.Process.Kill() + <-r.waitDone // wait for reaper after kill + } } // Clean up PID file diff --git a/scripts/clean-testnet.sh b/scripts/clean-testnet.sh index b340bb1..5c4eb83 100755 --- a/scripts/clean-testnet.sh +++ b/scripts/clean-testnet.sh @@ -35,6 +35,18 @@ systemctl disable orama-node orama-gateway orama-ipfs orama-ipfs-cluster orama-o systemctl stop debros-anyone-relay debros-anyone-client 2>/dev/null || true systemctl disable debros-anyone-relay debros-anyone-client 2>/dev/null || true +echo " Killing leftover processes..."
+# Kill any orama/ipfs/olric/rqlite/coredns processes that survived systemd stop +pkill -f orama-node 2>/dev/null || true +pkill -f orama-gateway 2>/dev/null || true +pkill -f ipfs-cluster-service 2>/dev/null || true +pkill -f "ipfs daemon" 2>/dev/null || true +pkill -f olric-server 2>/dev/null || true +pkill -f rqlited 2>/dev/null || true +pkill -f coredns 2>/dev/null || true +# Don't pkill caddy — it's a common system service +sleep 1 + echo " Removing systemd service files..." rm -f /etc/systemd/system/orama-*.service rm -f /etc/systemd/system/debros-*.service