A ton of updates on the monitoring — mostly bug fixes

This commit is contained in:
anonpenguin23 2026-02-16 16:35:29 +02:00
parent f889c2e358
commit 0b5b6e68e3
18 changed files with 246 additions and 73 deletions

View File

@ -11,6 +11,12 @@ Run this as root or with sudo on the target VPS:
sudo systemctl stop orama-node orama-ipfs orama-ipfs-cluster orama-olric orama-anyone-relay orama-anyone-client coredns caddy 2>/dev/null
sudo systemctl disable orama-node orama-ipfs orama-ipfs-cluster orama-olric orama-anyone-relay orama-anyone-client coredns caddy 2>/dev/null
# 1b. Kill leftover processes (binaries may run outside systemd)
sudo pkill -f orama-node 2>/dev/null; sudo pkill -f ipfs-cluster-service 2>/dev/null
sudo pkill -f "ipfs daemon" 2>/dev/null; sudo pkill -f olric-server 2>/dev/null
sudo pkill -f rqlited 2>/dev/null; sudo pkill -f coredns 2>/dev/null
sleep 1
# 2. Remove systemd service files
sudo rm -f /etc/systemd/system/orama-*.service
sudo rm -f /etc/systemd/system/coredns.service

View File

@ -24,6 +24,55 @@ type Alert struct {
Message string `json:"message"`
}
// joiningGraceSec is the grace period (in seconds) after a node starts during
// which unreachability alerts from other nodes are downgraded to info.
const joiningGraceSec = 300
// nodeContext carries per-node metadata needed for context-aware alerting.
// Instances are built by buildNodeContexts and looked up by host name or WG IP.
type nodeContext struct {
host string // node's host identifier, as returned by nodeHost(report)
role string // "node", "nameserver-ns1", etc.
isNameserver bool // true when role starts with "nameserver"
isJoining bool // orama-node active_since_sec < joiningGraceSec
uptimeSec int // orama-node active_since_sec
}
// buildNodeContexts builds a map of WG IP -> nodeContext for all healthy nodes.
// buildNodeContexts builds a map of WG IP -> nodeContext for all healthy nodes.
// Each context is keyed both by host name and (when available) by WireGuard IP,
// so callers can resolve it from either identifier.
func buildNodeContexts(snap *ClusterSnapshot) map[string]*nodeContext {
	contexts := make(map[string]*nodeContext)
	for _, state := range snap.Nodes {
		rep := state.Report
		if rep == nil {
			continue
		}
		ctx := &nodeContext{
			host:         nodeHost(rep),
			role:         state.Node.Role,
			isNameserver: strings.HasPrefix(state.Node.Role, "nameserver"),
		}
		// Derive uptime/joining status from the orama-node systemd unit, if active.
		if rep.Services != nil {
			for _, svc := range rep.Services.Services {
				if svc.Name != "orama-node" || svc.ActiveState != "active" {
					continue
				}
				ctx.uptimeSec = int(svc.ActiveSinceSec)
				ctx.isJoining = svc.ActiveSinceSec < joiningGraceSec
				break
			}
		}
		contexts[ctx.host] = ctx
		// Index by WG IP as well, for cross-node RQLite unreachability lookups.
		if rep.WireGuard != nil && rep.WireGuard.WgIP != "" {
			contexts[rep.WireGuard.WgIP] = ctx
		}
	}
	return contexts
}
// DeriveAlerts scans a ClusterSnapshot and produces alerts.
func DeriveAlerts(snap *ClusterSnapshot) []Alert {
var alerts []Alert
@ -45,6 +94,9 @@ func DeriveAlerts(snap *ClusterSnapshot) []Alert {
return alerts
}
// Build context map for role/uptime-aware alerting
nodeCtxMap := buildNodeContexts(snap)
// Cross-node checks
alerts = append(alerts, checkRQLiteLeader(reports)...)
alerts = append(alerts, checkRQLiteQuorum(reports)...)
@ -60,11 +112,12 @@ func DeriveAlerts(snap *ClusterSnapshot) []Alert {
// Per-node checks
for _, r := range reports {
host := nodeHost(r)
alerts = append(alerts, checkNodeRQLite(r, host)...)
nc := nodeCtxMap[host]
alerts = append(alerts, checkNodeRQLite(r, host, nodeCtxMap)...)
alerts = append(alerts, checkNodeWireGuard(r, host)...)
alerts = append(alerts, checkNodeSystem(r, host)...)
alerts = append(alerts, checkNodeServices(r, host)...)
alerts = append(alerts, checkNodeDNS(r, host)...)
alerts = append(alerts, checkNodeServices(r, host, nc)...)
alerts = append(alerts, checkNodeDNS(r, host, nc)...)
alerts = append(alerts, checkNodeAnyone(r, host)...)
alerts = append(alerts, checkNodeProcesses(r, host)...)
alerts = append(alerts, checkNodeNamespaces(r, host)...)
@ -377,7 +430,7 @@ func checkIPFSClusterConsistency(reports []*report.NodeReport) []Alert {
// Per-node checks
// ---------------------------------------------------------------------------
func checkNodeRQLite(r *report.NodeReport, host string) []Alert {
func checkNodeRQLite(r *report.NodeReport, host string, nodeCtxMap map[string]*nodeContext) []Alert {
if r.RQLite == nil {
return nil
}
@ -428,13 +481,23 @@ func checkNodeRQLite(r *report.NodeReport, host string) []Alert {
fmt.Sprintf("RQLite heap memory high: %dMB", r.RQLite.HeapMB)})
}
// Cluster partition detection: check if this node reports other nodes as unreachable
// Cluster partition detection: check if this node reports other nodes as unreachable.
// If the unreachable node recently joined (< 5 min), downgrade to info — probes
// may not have succeeded yet and this is expected transient behavior.
for nodeAddr, info := range r.RQLite.Nodes {
if !info.Reachable {
// nodeAddr is like "10.0.0.4:7001" — extract the IP to look up context
targetIP := strings.Split(nodeAddr, ":")[0]
if targetCtx, ok := nodeCtxMap[targetIP]; ok && targetCtx.isJoining {
alerts = append(alerts, Alert{AlertInfo, "rqlite", host,
fmt.Sprintf("Node %s recently joined (%ds ago), probe pending for %s",
targetCtx.host, targetCtx.uptimeSec, nodeAddr)})
} else {
alerts = append(alerts, Alert{AlertCritical, "rqlite", host,
fmt.Sprintf("RQLite reports node %s unreachable (cluster partition)", nodeAddr)})
}
}
}
// Debug vars
if dv := r.RQLite.DebugVars; dv != nil {
@ -521,12 +584,17 @@ func checkNodeSystem(r *report.NodeReport, host string) []Alert {
return alerts
}
func checkNodeServices(r *report.NodeReport, host string) []Alert {
func checkNodeServices(r *report.NodeReport, host string, nc *nodeContext) []Alert {
if r.Services == nil {
return nil
}
var alerts []Alert
for _, svc := range r.Services.Services {
// Skip services that are expected to be inactive based on node role/mode
if shouldSkipServiceAlert(svc.Name, svc.ActiveState, r, nc) {
continue
}
if svc.ActiveState == "failed" {
alerts = append(alerts, Alert{AlertCritical, "service", host,
fmt.Sprintf("Service %s is FAILED", svc.Name)})
@ -546,17 +614,57 @@ func checkNodeServices(r *report.NodeReport, host string) []Alert {
return alerts
}
func checkNodeDNS(r *report.NodeReport, host string) []Alert {
// shouldSkipServiceAlert returns true if this service being inactive is expected
// given the node's role and anyone mode.
// shouldSkipServiceAlert returns true if this service being inactive is expected
// given the node's role and anyone mode.
//
// Rules:
//   - "active" and "failed" states are never skipped: active produces no alert
//     downstream, and failed must always alert.
//   - coredns is only expected on nameserver nodes.
//   - orama-anyone-client / orama-anyone-relay are only expected when the node's
//     anyone mode matches; if no anyone section is reported at all, both are skipped.
//
// Both r and nc may be nil (nc already was nil-tolerant; r is now guarded too,
// so a report with no Anyone section — or a missing report — cannot panic here).
func shouldSkipServiceAlert(svcName, state string, r *report.NodeReport, nc *nodeContext) bool {
	if state == "active" || state == "failed" {
		return false // always report active (no alert) and failed (always alert)
	}
	// Anyone mode is empty when the report or its Anyone section is missing.
	var anyoneMode string
	hasAnyone := r != nil && r.Anyone != nil
	if hasAnyone {
		anyoneMode = r.Anyone.Mode
	}
	switch svcName {
	case "coredns":
		// CoreDNS: only expected on nameserver nodes.
		return nc == nil || !nc.isNameserver
	case "orama-anyone-client":
		// Relay nodes don't run the client; no anyone config means skip too.
		return !hasAnyone || anyoneMode == "relay"
	case "orama-anyone-relay":
		// Client nodes don't run the relay; no anyone config means skip too.
		return !hasAnyone || anyoneMode == "client"
	}
	return false
}
func checkNodeDNS(r *report.NodeReport, host string, nc *nodeContext) []Alert {
if r.DNS == nil {
return nil
}
isNameserver := nc != nil && nc.isNameserver
var alerts []Alert
if !r.DNS.CoreDNSActive {
// CoreDNS: only check on nameserver nodes
if isNameserver && !r.DNS.CoreDNSActive {
alerts = append(alerts, Alert{AlertCritical, "dns", host, "CoreDNS is down"})
}
// Caddy: check on all nodes (any node can host namespaces)
if !r.DNS.CaddyActive {
alerts = append(alerts, Alert{AlertCritical, "dns", host, "Caddy is down"})
}
// TLS cert expiry: only meaningful on nameserver nodes that have public domains
if isNameserver {
if r.DNS.BaseTLSDaysLeft >= 0 && r.DNS.BaseTLSDaysLeft < 14 {
alerts = append(alerts, Alert{AlertWarning, "dns", host,
fmt.Sprintf("Base TLS cert expires in %d days", r.DNS.BaseTLSDaysLeft)})
@ -565,11 +673,13 @@ func checkNodeDNS(r *report.NodeReport, host string) []Alert {
alerts = append(alerts, Alert{AlertWarning, "dns", host,
fmt.Sprintf("Wildcard TLS cert expires in %d days", r.DNS.WildTLSDaysLeft)})
}
if r.DNS.CoreDNSActive && !r.DNS.SOAResolves {
}
// DNS resolution checks: only on nameserver nodes with CoreDNS running
if isNameserver && r.DNS.CoreDNSActive {
if !r.DNS.SOAResolves {
alerts = append(alerts, Alert{AlertWarning, "dns", host, "SOA record not resolving"})
}
// Additional DNS checks (only when CoreDNS is running)
if r.DNS.CoreDNSActive {
if !r.DNS.WildcardResolves {
alerts = append(alerts, Alert{AlertWarning, "dns", host, "Wildcard DNS not resolving"})
}
@ -583,6 +693,7 @@ func checkNodeDNS(r *report.NodeReport, host string) []Alert {
alerts = append(alerts, Alert{AlertCritical, "dns", host, "CoreDNS active but port 53 not bound"})
}
}
if r.DNS.CaddyActive && !r.DNS.Port443Bound {
alerts = append(alerts, Alert{AlertCritical, "dns", host, "Caddy active but port 443 not bound"})
}

View File

@ -62,7 +62,7 @@ func (r *RemoteOrchestrator) buildRemoteCommand() string {
if r.node.User != "root" {
args = append(args, "sudo")
}
args = append(args, "orama", "install")
args = append(args, "orama", "node", "install")
args = append(args, "--vps-ip", r.flags.VpsIP)

View File

@ -47,14 +47,13 @@ func HandleRestartWithFlags(force bool) {
fmt.Printf("\n Stopping namespace services...\n")
stopAllNamespaceServices()
// Ordered stop: gateway first, then node (RQLite), then supporting services
// Ordered stop: node first (includes embedded gateway + RQLite), then supporting services
fmt.Printf("\n Stopping services (ordered)...\n")
shutdownOrder := [][]string{
{"orama-gateway"},
{"orama-node"},
{"orama-olric"},
{"orama-ipfs-cluster", "orama-ipfs"},
{"orama-anyone-relay", "anyone-client"},
{"orama-anyone-relay", "orama-anyone-client"},
{"coredns", "caddy"},
}

View File

@ -50,15 +50,13 @@ func HandleStopWithFlags(force bool) {
fmt.Printf("\n Stopping main services (ordered)...\n")
// Ordered shutdown: gateway first, then node (RQLite), then supporting services
// This ensures we stop accepting requests before shutting down the database
// Ordered shutdown: node first (includes embedded gateway + RQLite), then supporting services
shutdownOrder := [][]string{
{"orama-gateway"}, // 1. Stop accepting new requests
{"orama-node"}, // 2. Stop node (includes RQLite with leadership transfer)
{"orama-olric"}, // 3. Stop cache
{"orama-ipfs-cluster", "orama-ipfs"}, // 4. Stop storage
{"orama-anyone-relay", "anyone-client"}, // 5. Stop privacy relay
{"coredns", "caddy"}, // 6. Stop DNS/TLS last
{"orama-node"}, // 1. Stop node (includes gateway + RQLite with leadership transfer)
{"orama-olric"}, // 2. Stop cache
{"orama-ipfs-cluster", "orama-ipfs"}, // 3. Stop storage
{"orama-anyone-relay", "orama-anyone-client"}, // 4. Stop privacy relay
{"coredns", "caddy"}, // 5. Stop DNS/TLS last
}
// First, disable all services to prevent auto-restart

View File

@ -46,9 +46,12 @@ func collectIPFS() *IPFSReport {
}
}
// 4. ClusterPeerCount: GET /peers
// 4. ClusterPeerCount: GET /peers (with fallback to /id)
// The /peers endpoint does a synchronous round-trip to ALL cluster peers,
// so it can be slow if some peers are unreachable (ghost WG entries, etc.).
// Use a generous timeout and fall back to /id if /peers times out.
{
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if body, err := httpGet(ctx, "http://localhost:9094/peers"); err == nil {
var peers []interface{}
@ -57,6 +60,21 @@ func collectIPFS() *IPFSReport {
}
}
}
// Fallback: if /peers returned 0 (timeout or error), try /id which returns
// cached cluster_peers instantly without contacting other nodes.
if r.ClusterPeerCount == 0 {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if body, err := httpGet(ctx, "http://localhost:9094/id"); err == nil {
var resp struct {
ClusterPeers []string `json:"cluster_peers"`
}
if err := json.Unmarshal(body, &resp); err == nil && len(resp.ClusterPeers) > 0 {
// cluster_peers includes self, so count is len(cluster_peers)
r.ClusterPeerCount = len(resp.ClusterPeers)
}
}
}
// 5. RepoSizeBytes/RepoMaxBytes: POST /api/v0/repo/stat
{

View File

@ -84,7 +84,7 @@ func collectProcesses() *ProcessReport {
// managedServiceUnits lists systemd units whose MainPID should be excluded from orphan detection.
var managedServiceUnits = []string{
"orama-node", "orama-gateway", "orama-olric",
"orama-node", "orama-olric",
"orama-ipfs", "orama-ipfs-cluster",
"orama-anyone-relay", "orama-anyone-client",
"coredns", "caddy", "rqlited",

View File

@ -10,7 +10,6 @@ import (
var coreServices = []string{
"orama-node",
"orama-gateway",
"orama-olric",
"orama-ipfs",
"orama-ipfs-cluster",

View File

@ -18,7 +18,7 @@ func Handle() {
// Note: RQLite is managed by node process, not as separate service
"orama-olric",
"orama-node",
"orama-gateway",
// Note: gateway is embedded in orama-node, no separate service
}
// Friendly descriptions
@ -26,8 +26,7 @@ func Handle() {
"orama-ipfs": "IPFS Daemon",
"orama-ipfs-cluster": "IPFS Cluster",
"orama-olric": "Olric Cache Server",
"orama-node": "Orama Node (includes RQLite)",
"orama-gateway": "Orama Gateway",
"orama-node": "Orama Node (includes RQLite + Gateway)",
}
fmt.Printf("Services:\n")

View File

@ -33,9 +33,9 @@ func Handle() {
fmt.Printf("Stopping namespace services...\n")
stopNamespaceServices()
// All global services (was missing: orama-anyone-relay, coredns, caddy)
// All global services (orama-gateway is legacy — now embedded in orama-node)
services := []string{
"orama-gateway",
"orama-gateway", // Legacy: kept for cleanup of old installs
"orama-node",
"orama-olric",
"orama-ipfs-cluster",

View File

@ -23,14 +23,12 @@ type PortSpec struct {
}
var ServicePorts = map[string][]PortSpec{
"orama-gateway": {
{Name: "Gateway API", Port: constants.GatewayAPIPort},
},
"orama-olric": {
{Name: "Olric HTTP", Port: constants.OlricHTTPPort},
{Name: "Olric Memberlist", Port: constants.OlricMemberlistPort},
},
"orama-node": {
{Name: "Gateway API", Port: constants.GatewayAPIPort}, // Gateway is embedded in orama-node
{Name: "RQLite HTTP", Port: constants.RQLiteHTTPPort},
{Name: "RQLite Raft", Port: constants.RQLiteRaftPort},
},
@ -67,7 +65,7 @@ func ResolveServiceName(alias string) ([]string, error) {
"ipfs": {"orama-ipfs"},
"cluster": {"orama-ipfs-cluster"},
"ipfs-cluster": {"orama-ipfs-cluster"},
"gateway": {"orama-gateway"},
"gateway": {"orama-node"}, // Gateway is embedded in orama-node
"olric": {"orama-olric"},
"rqlite": {"orama-node"}, // RQLite logs are in node logs
}
@ -158,7 +156,6 @@ func IsServiceMasked(service string) (bool, error) {
func GetProductionServices() []string {
// Global/default service names
globalServices := []string{
"orama-gateway",
"orama-node",
"orama-olric",
"orama-ipfs-cluster",

View File

@ -612,8 +612,8 @@ func (ps *ProductionSetup) Phase5CreateSystemdServices(enableHTTPS bool) error {
}
ps.logf(" ✓ Anyone Relay service created (operator mode, ORPort: %d)", ps.anyoneRelayConfig.ORPort)
} else if ps.IsAnyoneClient() {
anyoneUnit := ps.serviceGenerator.GenerateAnyoneRelayService()
if err := ps.serviceController.WriteServiceUnit("orama-anyone-relay.service", anyoneUnit); err != nil {
anyoneUnit := ps.serviceGenerator.GenerateAnyoneClientService()
if err := ps.serviceController.WriteServiceUnit("orama-anyone-client.service", anyoneUnit); err != nil {
return fmt.Errorf("failed to write Anyone client service: %w", err)
}
ps.logf(" ✓ Anyone client service created (SocksPort 9050)")
@ -656,8 +656,10 @@ func (ps *ProductionSetup) Phase5CreateSystemdServices(enableHTTPS bool) error {
services := []string{"orama-ipfs.service", "orama-ipfs-cluster.service", "orama-olric.service", "orama-node.service"}
// Add Anyone service if configured (relay or client)
if ps.IsAnyoneRelay() || ps.IsAnyoneClient() {
if ps.IsAnyoneRelay() {
services = append(services, "orama-anyone-relay.service")
} else if ps.IsAnyoneClient() {
services = append(services, "orama-anyone-client.service")
}
// Add CoreDNS only for nameserver nodes
@ -698,7 +700,7 @@ func (ps *ProductionSetup) Phase5CreateSystemdServices(enableHTTPS bool) error {
infraServices = append(infraServices, "orama-anyone-relay.service")
}
} else if ps.IsAnyoneClient() {
infraServices = append(infraServices, "orama-anyone-relay.service")
infraServices = append(infraServices, "orama-anyone-client.service")
}
for _, svc := range infraServices {
@ -915,6 +917,9 @@ func (ps *ProductionSetup) LogSetupComplete(peerID string) {
ps.logf(" Config: /etc/anon/anonrc")
ps.logf(" Register at: https://dashboard.anyone.io")
ps.logf(" IMPORTANT: You need 100 $ANYONE tokens in your wallet to receive rewards")
} else if ps.IsAnyoneClient() {
ps.logf("\nStart All Services:")
ps.logf(" systemctl start orama-ipfs orama-ipfs-cluster orama-olric orama-anyone-client orama-node")
} else {
ps.logf("\nStart All Services:")
ps.logf(" systemctl start orama-ipfs orama-ipfs-cluster orama-olric orama-node")

View File

@ -105,7 +105,17 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) {
return
}
// 2. Assign WG IP with retry on conflict
// 2. Clean up stale WG entries for this public IP (from previous installs).
// This prevents ghost peers: old rows with different node_id/wg_key that
// the sync loop would keep trying to reach.
if _, err := h.rqliteClient.Exec(ctx,
"DELETE FROM wireguard_peers WHERE public_ip = ?", req.PublicIP); err != nil {
h.logger.Warn("failed to clean up stale WG entries", zap.Error(err))
// Non-fatal: proceed with join
}
// 3. Assign WG IP with retry on conflict (runs after cleanup so ghost IPs
// from this public_ip are not counted)
wgIP, err := h.assignWGIP(ctx)
if err != nil {
h.logger.Error("failed to assign WG IP", zap.Error(err))
@ -113,7 +123,7 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) {
return
}
// 3. Register WG peer in database
// 4. Register WG peer in database
nodeID := fmt.Sprintf("node-%s", wgIP) // temporary ID based on WG IP
_, err = h.rqliteClient.Exec(ctx,
"INSERT OR REPLACE INTO wireguard_peers (node_id, wg_ip, public_key, public_ip, wg_port) VALUES (?, ?, ?, ?, ?)",
@ -124,13 +134,13 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) {
return
}
// 4. Add peer to local WireGuard interface immediately
// 5. Add peer to local WireGuard interface immediately
if err := h.addWGPeerLocally(req.WGPublicKey, req.PublicIP, wgIP); err != nil {
h.logger.Warn("failed to add WG peer to local interface", zap.Error(err))
// Non-fatal: the sync loop will pick it up
}
// 5. Read secrets from disk
// 6. Read secrets from disk
clusterSecret, err := os.ReadFile(h.oramaDir + "/secrets/cluster-secret")
if err != nil {
h.logger.Error("failed to read cluster secret", zap.Error(err))
@ -145,7 +155,7 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) {
return
}
// 6. Get all WG peers
// 7. Get all WG peers
wgPeers, err := h.getWGPeers(ctx, req.WGPublicKey)
if err != nil {
h.logger.Error("failed to list WG peers", zap.Error(err))
@ -153,7 +163,7 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) {
return
}
// 7. Get this node's WG IP
// 8. Get this node's WG IP
myWGIP, err := h.getMyWGIP()
if err != nil {
h.logger.Error("failed to get local WG IP", zap.Error(err))
@ -161,14 +171,14 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) {
return
}
// 8. Query IPFS and IPFS Cluster peer info
// 9. Query IPFS and IPFS Cluster peer info
ipfsPeer := h.queryIPFSPeerInfo(myWGIP)
ipfsClusterPeer := h.queryIPFSClusterPeerInfo(myWGIP)
// 9. Get this node's libp2p peer ID for bootstrap peers
// 10. Get this node's libp2p peer ID for bootstrap peers
bootstrapPeers := h.buildBootstrapPeers(myWGIP, ipfsPeer.ID)
// 10. Read base domain from config
// 11. Read base domain from config
baseDomain := h.readBaseDomain()
// Build Olric seed peers from all existing WG peer IPs (memberlist port 3322)

View File

@ -617,7 +617,7 @@ systemctl is-active orama-ipfs-cluster 2>/dev/null
echo "$SEP"
curl -sf -X POST 'http://localhost:4501/api/v0/swarm/peers' 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(len(d.get('Peers') or []))" 2>/dev/null || echo -1
echo "$SEP"
curl -sf 'http://localhost:9094/peers' 2>/dev/null | python3 -c "import sys,json; peers=json.load(sys.stdin); print(len(peers)); errs=sum(1 for p in peers if p.get('error','')); print(errs)" 2>/dev/null || echo -1
curl -sf --max-time 10 'http://localhost:9094/peers' 2>/dev/null | python3 -c "import sys,json; peers=json.load(sys.stdin); print(len(peers)); errs=sum(1 for p in peers if p.get('error','')); print(errs)" 2>/dev/null || (curl -sf 'http://localhost:9094/id' 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); peers=d.get('cluster_peers',[]); print(len(peers)); print(0)" 2>/dev/null || echo -1)
echo "$SEP"
curl -sf -X POST 'http://localhost:4501/api/v0/repo/stat' 2>/dev/null | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('RepoSize',0)); print(d.get('StorageMax',0))" 2>/dev/null || echo -1
echo "$SEP"

View File

@ -162,6 +162,16 @@ func (n *Node) ensureWireGuardSelfRegistered(ctx context.Context) {
ipfsPeerID := queryLocalIPFSPeerID()
db := n.rqliteAdapter.GetSQLDB()
// Clean up stale entries for this public IP with a different node_id.
// This prevents ghost peers from previous installs or from the temporary
// "node-10.0.0.X" ID that the join handler creates.
if _, err := db.ExecContext(ctx,
"DELETE FROM wireguard_peers WHERE public_ip = ? AND node_id != ?",
publicIP, nodeID); err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "Failed to clean stale WG entries", zap.Error(err))
}
_, err = db.ExecContext(ctx,
"INSERT OR REPLACE INTO wireguard_peers (node_id, wg_ip, public_key, public_ip, wg_port, ipfs_peer_id) VALUES (?, ?, ?, ?, ?, ?)",
nodeID, wgIP, localPubKey, publicIP, 51820, ipfsPeerID)

View File

@ -197,7 +197,15 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string)
_ = os.WriteFile(pidPath, []byte(fmt.Sprintf("%d", r.cmd.Process.Pid)), 0644)
r.logger.Info("RQLite process started", zap.Int("pid", r.cmd.Process.Pid), zap.String("pid_file", pidPath))
// Reap the child process in the background to prevent zombies.
// Stop() waits on this channel instead of calling cmd.Wait() directly.
r.waitDone = make(chan struct{})
go func() {
_ = r.cmd.Wait()
logFile.Close()
close(r.waitDone)
}()
return nil
}

View File

@ -24,6 +24,7 @@ type RQLiteManager struct {
cmd *exec.Cmd
connection *gorqlite.Connection
discoveryService *ClusterDiscoveryService
waitDone chan struct{} // closed when cmd.Wait() completes (reaps zombie)
}
// NewRQLiteManager creates a new RQLite manager
@ -118,16 +119,16 @@ func (r *RQLiteManager) Stop() error {
_ = r.cmd.Process.Signal(syscall.SIGTERM)
done := make(chan error, 1)
go func() { done <- r.cmd.Wait() }()
// Give RQLite 30s to flush pending writes and shut down gracefully
// (previously 5s which risked Raft log corruption)
// Wait for the background reaper goroutine (started in launchProcess) to
// collect the child process. This avoids a double cmd.Wait() panic.
if r.waitDone != nil {
select {
case <-done:
case <-r.waitDone:
case <-time.After(30 * time.Second):
r.logger.Warn("RQLite did not stop within 30s, sending SIGKILL")
_ = r.cmd.Process.Kill()
<-r.waitDone // wait for reaper after kill
}
}
// Clean up PID file

View File

@ -35,6 +35,18 @@ systemctl disable orama-node orama-gateway orama-ipfs orama-ipfs-cluster orama-o
systemctl stop debros-anyone-relay debros-anyone-client 2>/dev/null || true
systemctl disable debros-anyone-relay debros-anyone-client 2>/dev/null || true
echo " Killing leftover processes..."
# Kill any orama/ipfs/olric/rqlite/coredns/caddy processes that survived systemd stop
pkill -f orama-node 2>/dev/null || true
pkill -f orama-gateway 2>/dev/null || true
pkill -f ipfs-cluster-service 2>/dev/null || true
pkill -f "ipfs daemon" 2>/dev/null || true
pkill -f olric-server 2>/dev/null || true
pkill -f rqlited 2>/dev/null || true
pkill -f coredns 2>/dev/null || true
# Don't pkill caddy — it's a common system service
sleep 1
echo " Removing systemd service files..."
rm -f /etc/systemd/system/orama-*.service
rm -f /etc/systemd/system/debros-*.service