diff --git a/Makefile b/Makefile index eabb8ee..99068ad 100644 --- a/Makefile +++ b/Makefile @@ -86,7 +86,7 @@ test-e2e-quick: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill redeploy-devnet redeploy-testnet release health -VERSION := 0.102.4 +VERSION := 0.102.5 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/gateway/status_handlers.go b/pkg/gateway/status_handlers.go index 8b7f683..c869b38 100644 --- a/pkg/gateway/status_handlers.go +++ b/pkg/gateway/status_handlers.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/DeBrosOfficial/network/pkg/anyoneproxy" "github.com/DeBrosOfficial/network/pkg/client" ) @@ -51,7 +52,7 @@ func (g *Gateway) healthHandler(w http.ResponseWriter, r *http.Request) { name string result checkResult } - ch := make(chan namedResult, 4) + ch := make(chan namedResult, 5) // RQLite go func() { @@ -118,9 +119,25 @@ func (g *Gateway) healthHandler(w http.ResponseWriter, r *http.Request) { ch <- nr }() + // Anyone proxy (SOCKS5) + go func() { + nr := namedResult{name: "anyone"} + if !anyoneproxy.Enabled() { + nr.result = checkResult{Status: "unavailable"} + } else { + start := time.Now() + if anyoneproxy.Running() { + nr.result = checkResult{Status: "ok", Latency: time.Since(start).String()} + } else { + nr.result = checkResult{Status: "error", Latency: time.Since(start).String(), Error: "SOCKS5 proxy not reachable at " + anyoneproxy.Address()} + } + } + ch <- nr + }() + // Collect - checks := make(map[string]checkResult, 4) - for i := 0; i < 4; i++ { + checks := make(map[string]checkResult, 5) + for i := 0; i < 5; i++ { nr := <-ch checks[nr.name] = nr.result } diff --git a/pkg/inspector/checks/network.go b/pkg/inspector/checks/network.go index 6fa8cbe..52083c3 100644 --- a/pkg/inspector/checks/network.go +++ b/pkg/inspector/checks/network.go @@ -79,11 +79,13 @@ func checkNetworkPerNode(nd *inspector.NodeData) []inspector.CheckResult { } // 7.8 TCP retransmission rate + // Thresholds are relaxed for WireGuard-encapsulated traffic across VPS providers: + // <2% normal, 2-10% elevated (warn), >=10% problematic (fail). 
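To make the new bands concrete before the implementing code below, here is a minimal, self-contained sketch of the classification the check performs. `classifyRetrans` is an illustrative name, not part of the codebase; the sample rates mirror the updated test cases.

```go
package main

import "fmt"

// classifyRetrans applies the relaxed bands described above:
// <2% is normal, 2-10% is elevated (warn), >=10% is problematic (fail).
// A negative rate means the counter was unavailable.
func classifyRetrans(rate float64) string {
	switch {
	case rate < 0:
		return "unknown"
	case rate < 2:
		return "pass"
	case rate < 10:
		return "warn"
	default:
		return "fail"
	}
}

func main() {
	for _, rate := range []float64{0.5, 6.0, 12.0} {
		fmt.Printf("retrans=%.1f%% -> %s\n", rate, classifyRetrans(rate))
	}
	// retrans=0.5% -> pass
	// retrans=6.0% -> warn
	// retrans=12.0% -> fail
}
```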
if net.TCPRetransRate >= 0 { - if net.TCPRetransRate < 1 { + if net.TCPRetransRate < 2 { r = append(r, inspector.Pass("network.tcp_retrans", "TCP retransmission rate low", networkSub, node, fmt.Sprintf("retrans=%.2f%%", net.TCPRetransRate), inspector.Medium)) - } else if net.TCPRetransRate < 5 { + } else if net.TCPRetransRate < 10 { r = append(r, inspector.Warn("network.tcp_retrans", "TCP retransmission rate low", networkSub, node, fmt.Sprintf("retrans=%.2f%% (elevated)", net.TCPRetransRate), inspector.Medium)) } else { diff --git a/pkg/inspector/checks/network_test.go b/pkg/inspector/checks/network_test.go index cb6a902..d086646 100644 --- a/pkg/inspector/checks/network_test.go +++ b/pkg/inspector/checks/network_test.go @@ -93,9 +93,9 @@ func TestCheckNetwork_TCPRetransmission(t *testing.T) { rate float64 status inspector.Status }{ - {"low", 0.1, inspector.StatusPass}, - {"elevated", 3.0, inspector.StatusWarn}, - {"high", 8.0, inspector.StatusFail}, + {"low", 0.5, inspector.StatusPass}, + {"elevated", 6.0, inspector.StatusWarn}, + {"high", 12.0, inspector.StatusFail}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/inspector/checks/rqlite.go b/pkg/inspector/checks/rqlite.go index 39576cd..cf2cf52 100644 --- a/pkg/inspector/checks/rqlite.go +++ b/pkg/inspector/checks/rqlite.go @@ -18,21 +18,57 @@ const rqliteSub = "rqlite" func CheckRQLite(data *inspector.ClusterData) []inspector.CheckResult { var results []inspector.CheckResult + // Find the leader's authoritative /nodes data + leaderNodes := findLeaderNodes(data) + // Per-node checks for _, nd := range data.Nodes { if nd.RQLite == nil { continue } - results = append(results, checkRQLitePerNode(nd, data)...) + results = append(results, checkRQLitePerNode(nd, data, leaderNodes)...) } // Cross-node checks - results = append(results, checkRQLiteCrossNode(data)...) + results = append(results, checkRQLiteCrossNode(data, leaderNodes)...) return results } -func checkRQLitePerNode(nd *inspector.NodeData, data *inspector.ClusterData) []inspector.CheckResult { +// findLeaderNodes returns the leader's /nodes map as the authoritative cluster membership. +func findLeaderNodes(data *inspector.ClusterData) map[string]*inspector.RQLiteNode { + for _, nd := range data.Nodes { + if nd.RQLite != nil && nd.RQLite.Status != nil && nd.RQLite.Status.RaftState == "Leader" && nd.RQLite.Nodes != nil { + return nd.RQLite.Nodes + } + } + return nil +} + +// nodeIP extracts the IP from a "host:port" address. +func nodeIP(addr string) string { + if idx := strings.LastIndex(addr, ":"); idx >= 0 { + return addr[:idx] + } + return addr +} + +// lookupInLeaderNodes finds a node in the leader's /nodes map by matching IP. +// Leader's /nodes keys use HTTP port (5001), while node IDs use Raft port (7001). 
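The port mismatch is the whole reason for IP-based matching. A small standalone sketch (addresses invented) shows why a direct map-key lookup would miss while comparing `nodeIP` results succeeds; stripping after the last `:` also tolerates bracketed IPv6 `host:port` forms, since the final colon always precedes the port.

```go
package main

import (
	"fmt"
	"strings"
)

// nodeIP mirrors the helper above: strip the suffix after the last ':'
// so two addresses that differ only by port compare equal.
func nodeIP(addr string) string {
	if idx := strings.LastIndex(addr, ":"); idx >= 0 {
		return addr[:idx]
	}
	return addr
}

func main() {
	leaderKey := "10.0.0.2:5001" // key in the leader's /nodes map (HTTP port)
	nodeID := "10.0.0.2:7001"    // Raft node ID (Raft port)

	fmt.Println(leaderKey == nodeID)                 // false: a naive key lookup misses
	fmt.Println(nodeIP(leaderKey) == nodeIP(nodeID)) // true: same host
}
```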
+func lookupInLeaderNodes(leaderNodes map[string]*inspector.RQLiteNode, nodeID string) *inspector.RQLiteNode { + if leaderNodes == nil { + return nil + } + ip := nodeIP(nodeID) + for addr, n := range leaderNodes { + if nodeIP(addr) == ip { + return n + } + } + return nil +} + +func checkRQLitePerNode(nd *inspector.NodeData, data *inspector.ClusterData, leaderNodes map[string]*inspector.RQLiteNode) []inspector.CheckResult { var r []inspector.CheckResult rq := nd.RQLite node := nd.Node.Name() @@ -99,19 +135,39 @@ func checkRQLitePerNode(nd *inspector.NodeData, data *inspector.ClusterData) []i fmt.Sprintf("leader=%s", s.LeaderNodeID), inspector.Critical)) } - // 1.8 Voter status - if s.Voter { + // 1.8 Voter status — use leader's /nodes as authoritative source + if leaderNode := lookupInLeaderNodes(leaderNodes, s.NodeID); leaderNode != nil { + if leaderNode.Voter { + r = append(r, inspector.Pass("rqlite.voter", "Node is voter", rqliteSub, node, + "voter=true (confirmed by leader)", inspector.Low)) + } else { + r = append(r, inspector.Pass("rqlite.voter", "Node is non-voter", rqliteSub, node, + "non-voter (by design, confirmed by leader)", inspector.Low)) + } + } else if s.Voter { r = append(r, inspector.Pass("rqlite.voter", "Node is voter", rqliteSub, node, "voter=true", inspector.Low)) } else { - r = append(r, inspector.Warn("rqlite.voter", "Node is voter", rqliteSub, node, - "voter=false (non-voter)", inspector.Low)) + r = append(r, inspector.Pass("rqlite.voter", "Node is non-voter", rqliteSub, node, + "non-voter (no leader data to confirm)", inspector.Low)) } - // 1.9 Num peers — use the node's own /nodes endpoint to determine cluster size - // (not config file, since not all config nodes are necessarily in the Raft cluster) - if rq.Nodes != nil && len(rq.Nodes) > 0 { - expectedPeers := len(rq.Nodes) - 1 // cluster members minus self + // 1.9 Num peers — use leader's /nodes as authoritative cluster size + if leaderNodes != nil && len(leaderNodes) > 0 { + expectedPeers := len(leaderNodes) - 1 // cluster members minus self + if expectedPeers < 0 { + expectedPeers = 0 + } + if s.NumPeers == expectedPeers { + r = append(r, inspector.Pass("rqlite.num_peers", "Peer count matches cluster size", rqliteSub, node, + fmt.Sprintf("peers=%d (cluster has %d nodes)", s.NumPeers, len(leaderNodes)), inspector.Critical)) + } else { + r = append(r, inspector.Warn("rqlite.num_peers", "Peer count matches cluster size", rqliteSub, node, + fmt.Sprintf("peers=%d but leader reports %d members", s.NumPeers, len(leaderNodes)), inspector.High)) + } + } else if rq.Nodes != nil && len(rq.Nodes) > 0 { + // Fallback: use node's own /nodes if leader data unavailable + expectedPeers := len(rq.Nodes) - 1 if expectedPeers < 0 { expectedPeers = 0 } @@ -332,7 +388,7 @@ func checkRQLitePerNode(nd *inspector.NodeData, data *inspector.ClusterData) []i return r } -func checkRQLiteCrossNode(data *inspector.ClusterData) []inspector.CheckResult { +func checkRQLiteCrossNode(data *inspector.ClusterData, leaderNodes map[string]*inspector.RQLiteNode) []inspector.CheckResult { var r []inspector.CheckResult type nodeInfo struct { @@ -506,13 +562,25 @@ func checkRQLiteCrossNode(data *inspector.ClusterData) []inspector.CheckResult { } } - // 1.42 Quorum math + // 1.42 Quorum math — use leader's /nodes as authoritative voter source voters := 0 reachableVoters := 0 - for _, n := range nodes { - if n.status.Voter { - voters++ - reachableVoters++ // responded to SSH + curl = reachable + if leaderNodes != nil && len(leaderNodes) > 0 { + for _, ln 
:= range leaderNodes { + if ln.Voter { + voters++ + if ln.Reachable { + reachableVoters++ + } + } + } + } else { + // Fallback: use each node's self-reported voter status + for _, n := range nodes { + if n.status.Voter { + voters++ + reachableVoters++ // responded to SSH + curl = reachable + } } } quorumNeeded := int(math.Floor(float64(voters)/2)) + 1 diff --git a/pkg/inspector/collector.go b/pkg/inspector/collector.go index faae327..dcdc663 100644 --- a/pkg/inspector/collector.go +++ b/pkg/inspector/collector.go @@ -307,7 +307,7 @@ func collectNode(ctx context.Context, node Node, subsystems []string, verbose bo if shouldCollect("network") { nd.Network = collectNetwork(ctx, node, nd.WireGuard) } - if shouldCollect("anyone") { + if shouldCollect("anyone") && !node.IsNameserver() { nd.Anyone = collectAnyone(ctx, node) } // Namespace collection — always collect if any subsystem is collected diff --git a/pkg/rqlite/process.go b/pkg/rqlite/process.go index 61c6cff..85d2e3f 100644 --- a/pkg/rqlite/process.go +++ b/pkg/rqlite/process.go @@ -306,18 +306,52 @@ func (r *RQLiteManager) checkShouldBeNonVoter(joinAddress string) bool { } nodesURL := fmt.Sprintf("http://%s:%d/nodes?timeout=2s", host, r.config.RQLitePort) + // Retry with backoff — network (WireGuard) may not be ready immediately. + // Defaulting to voter on failure is dangerous: it creates excess voters + // that can cause split-brain during leader failover. + const maxRetries = 5 + var lastErr error + + for attempt := 0; attempt < maxRetries; attempt++ { + if attempt > 0 { + delay := time.Duration(attempt*2) * time.Second + r.logger.Info("Retrying voter check", + zap.Int("attempt", attempt+1), + zap.Duration("delay", delay)) + time.Sleep(delay) + } + + voterCount, err := r.queryVoterCount(nodesURL) + if err != nil { + lastErr = err + continue + } + + r.logger.Info("Checked existing voter count from join target", + zap.Int("reachable_voters", voterCount), + zap.Int("max_voters", MaxDefaultVoters)) + + return voterCount >= MaxDefaultVoters + } + + r.logger.Warn("Could not determine voter count after retries, defaulting to voter", + zap.Int("attempts", maxRetries), + zap.Error(lastErr)) + return false +} + +// queryVoterCount queries the /nodes endpoint and returns the number of reachable voters. 
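Before the implementation, a self-contained sketch of what `queryVoterCount` extracts from a `/nodes` payload, combined with the quorum arithmetic from check 1.42 above. The payload and addresses are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Invented /nodes payload: a map keyed by node ID (Raft address).
	body := []byte(`{
		"10.0.0.1:7001": {"voter": true,  "reachable": true},
		"10.0.0.2:7001": {"voter": true,  "reachable": true},
		"10.0.0.3:7001": {"voter": true,  "reachable": false},
		"10.0.0.4:7001": {"voter": false, "reachable": true}
	}`)

	var nodes map[string]struct {
		Voter     bool `json:"voter"`
		Reachable bool `json:"reachable"`
	}
	if err := json.Unmarshal(body, &nodes); err != nil {
		panic(err)
	}

	// Only reachable voters count toward the MaxDefaultVoters cap.
	voters, reachable := 0, 0
	for _, n := range nodes {
		if n.Voter {
			voters++
			if n.Reachable {
				reachable++
			}
		}
	}

	// Quorum math as in check 1.42: floor(voters/2) + 1.
	quorum := voters/2 + 1
	fmt.Printf("voters=%d reachable=%d quorum=%d ok=%v\n",
		voters, reachable, quorum, reachable >= quorum)
	// voters=3 reachable=2 quorum=2 ok=true
}
```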
+func (r *RQLiteManager) queryVoterCount(nodesURL string) (int, error) { client := tlsutil.NewHTTPClient(5 * time.Second) resp, err := client.Get(nodesURL) if err != nil { - r.logger.Warn("Could not query /nodes to check voter count, defaulting to voter", zap.Error(err)) - return false + return 0, fmt.Errorf("query /nodes: %w", err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - r.logger.Warn("Could not read /nodes response, defaulting to voter", zap.Error(err)) - return false + return 0, fmt.Errorf("read /nodes response: %w", err) } var nodes map[string]struct { @@ -325,8 +359,7 @@ func (r *RQLiteManager) checkShouldBeNonVoter(joinAddress string) bool { Reachable bool `json:"reachable"` } if err := json.Unmarshal(body, &nodes); err != nil { - r.logger.Warn("Could not parse /nodes response, defaulting to voter", zap.Error(err)) - return false + return 0, fmt.Errorf("parse /nodes response: %w", err) } voterCount := 0 @@ -336,11 +369,7 @@ func (r *RQLiteManager) checkShouldBeNonVoter(joinAddress string) bool { } } - r.logger.Info("Checked existing voter count from join target", - zap.Int("reachable_voters", voterCount), - zap.Int("max_voters", MaxDefaultVoters)) - - return voterCount >= MaxDefaultVoters + return voterCount, nil } // waitForJoinTarget waits until the join target's HTTP status becomes reachable diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index 87f7e75..cd82081 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -68,6 +68,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error { if r.discoveryService != nil { go r.startHealthMonitoring(ctx) + go r.startVoterReconciliation(ctx) } if err := r.establishLeadershipOrJoin(ctx, rqliteDataDir); err != nil { diff --git a/pkg/rqlite/voter_reconciliation.go b/pkg/rqlite/voter_reconciliation.go new file mode 100644 index 0000000..6ab9e89 --- /dev/null +++ b/pkg/rqlite/voter_reconciliation.go @@ -0,0 +1,236 @@ +package rqlite + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "go.uber.org/zap" +) + +// startVoterReconciliation periodically checks and corrects voter/non-voter +// assignments. Only takes effect on the leader node. Corrects at most one +// node per cycle to minimize disruption. +func (r *RQLiteManager) startVoterReconciliation(ctx context.Context) { + // Wait for cluster to stabilize after startup + time.Sleep(3 * time.Minute) + + ticker := time.NewTicker(2 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := r.reconcileVoters(); err != nil { + r.logger.Debug("Voter reconciliation skipped", zap.Error(err)) + } + } + } +} + +// reconcileVoters compares actual cluster voter assignments (from RQLite's +// /nodes endpoint) against the deterministic desired set (computeVoterSet) +// and corrects mismatches. Uses remove + re-join since RQLite's /join +// ignores voter flag changes for existing members. +// +// Safety: only runs on the leader, only when all nodes are reachable, +// never demotes the leader, and fixes at most one node per cycle. +func (r *RQLiteManager) reconcileVoters() error { + // 1. Only the leader reconciles + status, err := r.getRQLiteStatus() + if err != nil { + return fmt.Errorf("get status: %w", err) + } + if status.Store.Raft.State != "Leader" { + return nil + } + + // 2. 
Get all cluster nodes including non-voters + nodes, err := r.getAllClusterNodes() + if err != nil { + return fmt.Errorf("get all nodes: %w", err) + } + + if len(nodes) <= MaxDefaultVoters { + return nil // Small cluster — all nodes should be voters + } + + // 3. Only reconcile when every node is reachable (stable cluster) + for _, n := range nodes { + if !n.Reachable { + return nil + } + } + + // 4. Compute desired voter set from raft addresses + raftAddrs := make([]string, 0, len(nodes)) + for _, n := range nodes { + raftAddrs = append(raftAddrs, n.ID) + } + desiredVoters := computeVoterSet(raftAddrs, MaxDefaultVoters) + + // 5. Safety: never demote ourselves (the current leader) + myRaftAddr := status.Store.Raft.LeaderID + if _, shouldBeVoter := desiredVoters[myRaftAddr]; !shouldBeVoter { + r.logger.Warn("Leader is not in computed voter set — skipping reconciliation", + zap.String("leader_id", myRaftAddr)) + return nil + } + + // 6. Find one mismatch to fix (one change per cycle) + for _, n := range nodes { + _, shouldBeVoter := desiredVoters[n.ID] + + if n.Voter && !shouldBeVoter { + // Skip if this is the leader + if n.ID == myRaftAddr { + continue + } + + r.logger.Info("Demoting excess voter to non-voter", + zap.String("node_id", n.ID)) + + if err := r.changeNodeVoterStatus(n.ID, false); err != nil { + r.logger.Warn("Failed to demote voter", + zap.String("node_id", n.ID), + zap.Error(err)) + return err + } + + r.logger.Info("Successfully demoted voter to non-voter", + zap.String("node_id", n.ID)) + return nil // One change per cycle + } + + if !n.Voter && shouldBeVoter { + r.logger.Info("Promoting non-voter to voter", + zap.String("node_id", n.ID)) + + if err := r.changeNodeVoterStatus(n.ID, true); err != nil { + r.logger.Warn("Failed to promote non-voter", + zap.String("node_id", n.ID), + zap.Error(err)) + return err + } + + r.logger.Info("Successfully promoted non-voter to voter", + zap.String("node_id", n.ID)) + return nil + } + } + + return nil +} + +// changeNodeVoterStatus changes a node's voter status by removing it from the +// cluster and immediately re-adding it with the desired voter flag. +// This is necessary because RQLite's /join endpoint ignores voter flag changes +// for nodes that are already cluster members with the same ID and address. +func (r *RQLiteManager) changeNodeVoterStatus(nodeID string, voter bool) error { + // Step 1: Remove the node from the cluster + if err := r.removeClusterNode(nodeID); err != nil { + return fmt.Errorf("remove node: %w", err) + } + + // Brief pause for Raft to commit the configuration change + time.Sleep(2 * time.Second) + + // Step 2: Re-add with the correct voter status + if err := r.joinClusterNode(nodeID, nodeID, voter); err != nil { + return fmt.Errorf("rejoin node: %w", err) + } + + return nil +} + +// getAllClusterNodes queries /nodes?nonvoters&ver=2 to get all cluster members +// including non-voters. 
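`computeVoterSet` is referenced by `reconcileVoters` but not part of this diff. Purely as a sketch of the property the reconciler relies on, namely that every node derives the same desired set from the same membership, a hypothetical implementation could sort the Raft addresses and take the first `MaxDefaultVoters`:

```go
package main

import (
	"fmt"
	"sort"
)

// computeVoterSetSketch is a hypothetical stand-in for computeVoterSet:
// sort the Raft addresses and mark the first max as voters. Any node
// running this on the same membership list gets the same answer, which
// is what lets one-change-per-cycle reconciliation converge.
func computeVoterSetSketch(raftAddrs []string, max int) map[string]struct{} {
	sorted := append([]string(nil), raftAddrs...)
	sort.Strings(sorted)
	if max > len(sorted) {
		max = len(sorted)
	}
	set := make(map[string]struct{}, max)
	for _, addr := range sorted[:max] {
		set[addr] = struct{}{}
	}
	return set
}

func main() {
	addrs := []string{"10.0.0.3:7001", "10.0.0.1:7001", "10.0.0.2:7001", "10.0.0.4:7001"}
	desired := computeVoterSetSketch(addrs, 3)
	for _, addr := range addrs {
		_, voter := desired[addr]
		fmt.Printf("%s voter=%v\n", addr, voter)
	}
}
```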
+func (r *RQLiteManager) getAllClusterNodes() (RQLiteNodes, error) { + url := fmt.Sprintf("http://localhost:%d/nodes?nonvoters&ver=2&timeout=5s", r.config.RQLitePort) + client := &http.Client{Timeout: 10 * time.Second} + + resp, err := client.Get(url) + if err != nil { + return nil, fmt.Errorf("query nodes: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("nodes returned %d: %s", resp.StatusCode, string(body)) + } + + // Try ver=2 wrapped format first + var wrapped struct { + Nodes RQLiteNodes `json:"nodes"` + } + if err := json.Unmarshal(body, &wrapped); err == nil && wrapped.Nodes != nil { + return wrapped.Nodes, nil + } + + // Fall back to plain array + var nodes RQLiteNodes + if err := json.Unmarshal(body, &nodes); err != nil { + return nil, fmt.Errorf("parse nodes: %w", err) + } + return nodes, nil +} + +// removeClusterNode sends DELETE /remove to remove a node from the Raft cluster. +func (r *RQLiteManager) removeClusterNode(nodeID string) error { + url := fmt.Sprintf("http://localhost:%d/remove", r.config.RQLitePort) + payload, _ := json.Marshal(map[string]string{"id": nodeID}) + + req, err := http.NewRequest(http.MethodDelete, url, bytes.NewReader(payload)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("remove request: %w", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("remove returned %d: %s", resp.StatusCode, string(body)) + } + return nil +} + +// joinClusterNode sends POST /join to add a node to the Raft cluster +// with the specified voter status. +func (r *RQLiteManager) joinClusterNode(nodeID, raftAddr string, voter bool) error { + url := fmt.Sprintf("http://localhost:%d/join", r.config.RQLitePort) + payload, _ := json.Marshal(map[string]interface{}{ + "id": nodeID, + "addr": raftAddr, + "voter": voter, + }) + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Post(url, "application/json", bytes.NewReader(payload)) + if err != nil { + return fmt.Errorf("join request: %w", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("join returned %d: %s", resp.StatusCode, string(body)) + } + return nil +}
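To see the remove + re-join sequence end to end, here is a runnable sketch that points the two HTTP calls at a stub server. The node ID is invented, and the stub merely echoes what `changeNodeVoterStatus` would send to the real rqlite API.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Stub standing in for the local rqlite HTTP API.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(r.Body)
		fmt.Printf("%s %s %s\n", r.Method, r.URL.Path, body)
	}))
	defer srv.Close()

	id := "10.0.0.3:7001" // invented node ID (Raft address)

	// Step 1: DELETE /remove with the node ID in the JSON body.
	remove, _ := json.Marshal(map[string]string{"id": id})
	req, _ := http.NewRequest(http.MethodDelete, srv.URL+"/remove", bytes.NewReader(remove))
	req.Header.Set("Content-Type", "application/json")
	if resp, err := http.DefaultClient.Do(req); err == nil {
		resp.Body.Close()
	}

	// Step 2: POST /join to re-add the same node as a non-voter.
	join, _ := json.Marshal(map[string]interface{}{"id": id, "addr": id, "voter": false})
	if resp, err := http.Post(srv.URL+"/join", "application/json", bytes.NewReader(join)); err == nil {
		resp.Body.Close()
	}
}
```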