mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 09:36:56 +00:00
Improved collector added anyone on health check
This commit is contained in:
parent
83bd495f0f
commit
1d186706f6
2
Makefile
2
Makefile
@ -86,7 +86,7 @@ test-e2e-quick:
|
|||||||
|
|
||||||
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill redeploy-devnet redeploy-testnet release health
|
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill redeploy-devnet redeploy-testnet release health
|
||||||
|
|
||||||
VERSION := 0.102.4
|
VERSION := 0.102.5
|
||||||
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
|
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
|
||||||
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
|
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
|
||||||
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
|
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/DeBrosOfficial/network/pkg/anyoneproxy"
|
||||||
"github.com/DeBrosOfficial/network/pkg/client"
|
"github.com/DeBrosOfficial/network/pkg/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -51,7 +52,7 @@ func (g *Gateway) healthHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
name string
|
name string
|
||||||
result checkResult
|
result checkResult
|
||||||
}
|
}
|
||||||
ch := make(chan namedResult, 4)
|
ch := make(chan namedResult, 5)
|
||||||
|
|
||||||
// RQLite
|
// RQLite
|
||||||
go func() {
|
go func() {
|
||||||
@ -118,9 +119,25 @@ func (g *Gateway) healthHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
ch <- nr
|
ch <- nr
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// Anyone proxy (SOCKS5)
|
||||||
|
go func() {
|
||||||
|
nr := namedResult{name: "anyone"}
|
||||||
|
if !anyoneproxy.Enabled() {
|
||||||
|
nr.result = checkResult{Status: "unavailable"}
|
||||||
|
} else {
|
||||||
|
start := time.Now()
|
||||||
|
if anyoneproxy.Running() {
|
||||||
|
nr.result = checkResult{Status: "ok", Latency: time.Since(start).String()}
|
||||||
|
} else {
|
||||||
|
nr.result = checkResult{Status: "error", Latency: time.Since(start).String(), Error: "SOCKS5 proxy not reachable at " + anyoneproxy.Address()}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ch <- nr
|
||||||
|
}()
|
||||||
|
|
||||||
// Collect
|
// Collect
|
||||||
checks := make(map[string]checkResult, 4)
|
checks := make(map[string]checkResult, 5)
|
||||||
for i := 0; i < 4; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
nr := <-ch
|
nr := <-ch
|
||||||
checks[nr.name] = nr.result
|
checks[nr.name] = nr.result
|
||||||
}
|
}
|
||||||
|
|||||||
@ -79,11 +79,13 @@ func checkNetworkPerNode(nd *inspector.NodeData) []inspector.CheckResult {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 7.8 TCP retransmission rate
|
// 7.8 TCP retransmission rate
|
||||||
|
// Thresholds are relaxed for WireGuard-encapsulated traffic across VPS providers:
|
||||||
|
// <2% normal, 2-10% elevated (warn), >=10% problematic (fail).
|
||||||
if net.TCPRetransRate >= 0 {
|
if net.TCPRetransRate >= 0 {
|
||||||
if net.TCPRetransRate < 1 {
|
if net.TCPRetransRate < 2 {
|
||||||
r = append(r, inspector.Pass("network.tcp_retrans", "TCP retransmission rate low", networkSub, node,
|
r = append(r, inspector.Pass("network.tcp_retrans", "TCP retransmission rate low", networkSub, node,
|
||||||
fmt.Sprintf("retrans=%.2f%%", net.TCPRetransRate), inspector.Medium))
|
fmt.Sprintf("retrans=%.2f%%", net.TCPRetransRate), inspector.Medium))
|
||||||
} else if net.TCPRetransRate < 5 {
|
} else if net.TCPRetransRate < 10 {
|
||||||
r = append(r, inspector.Warn("network.tcp_retrans", "TCP retransmission rate low", networkSub, node,
|
r = append(r, inspector.Warn("network.tcp_retrans", "TCP retransmission rate low", networkSub, node,
|
||||||
fmt.Sprintf("retrans=%.2f%% (elevated)", net.TCPRetransRate), inspector.Medium))
|
fmt.Sprintf("retrans=%.2f%% (elevated)", net.TCPRetransRate), inspector.Medium))
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -93,9 +93,9 @@ func TestCheckNetwork_TCPRetransmission(t *testing.T) {
|
|||||||
rate float64
|
rate float64
|
||||||
status inspector.Status
|
status inspector.Status
|
||||||
}{
|
}{
|
||||||
{"low", 0.1, inspector.StatusPass},
|
{"low", 0.5, inspector.StatusPass},
|
||||||
{"elevated", 3.0, inspector.StatusWarn},
|
{"elevated", 6.0, inspector.StatusWarn},
|
||||||
{"high", 8.0, inspector.StatusFail},
|
{"high", 12.0, inspector.StatusFail},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
|||||||
@ -18,21 +18,57 @@ const rqliteSub = "rqlite"
|
|||||||
func CheckRQLite(data *inspector.ClusterData) []inspector.CheckResult {
|
func CheckRQLite(data *inspector.ClusterData) []inspector.CheckResult {
|
||||||
var results []inspector.CheckResult
|
var results []inspector.CheckResult
|
||||||
|
|
||||||
|
// Find the leader's authoritative /nodes data
|
||||||
|
leaderNodes := findLeaderNodes(data)
|
||||||
|
|
||||||
// Per-node checks
|
// Per-node checks
|
||||||
for _, nd := range data.Nodes {
|
for _, nd := range data.Nodes {
|
||||||
if nd.RQLite == nil {
|
if nd.RQLite == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
results = append(results, checkRQLitePerNode(nd, data)...)
|
results = append(results, checkRQLitePerNode(nd, data, leaderNodes)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cross-node checks
|
// Cross-node checks
|
||||||
results = append(results, checkRQLiteCrossNode(data)...)
|
results = append(results, checkRQLiteCrossNode(data, leaderNodes)...)
|
||||||
|
|
||||||
return results
|
return results
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkRQLitePerNode(nd *inspector.NodeData, data *inspector.ClusterData) []inspector.CheckResult {
|
// findLeaderNodes returns the leader's /nodes map as the authoritative cluster membership.
|
||||||
|
func findLeaderNodes(data *inspector.ClusterData) map[string]*inspector.RQLiteNode {
|
||||||
|
for _, nd := range data.Nodes {
|
||||||
|
if nd.RQLite != nil && nd.RQLite.Status != nil && nd.RQLite.Status.RaftState == "Leader" && nd.RQLite.Nodes != nil {
|
||||||
|
return nd.RQLite.Nodes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// nodeIP returns the host part of a "host:port" address, i.e. everything
// before the final ':'. An address containing no ':' is returned unchanged.
func nodeIP(addr string) string {
	colon := strings.LastIndexByte(addr, ':')
	if colon < 0 {
		return addr
	}
	return addr[:colon]
}
|
||||||
|
|
||||||
|
// lookupInLeaderNodes finds a node in the leader's /nodes map by matching IP.
|
||||||
|
// Leader's /nodes keys use HTTP port (5001), while node IDs use Raft port (7001).
|
||||||
|
func lookupInLeaderNodes(leaderNodes map[string]*inspector.RQLiteNode, nodeID string) *inspector.RQLiteNode {
|
||||||
|
if leaderNodes == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ip := nodeIP(nodeID)
|
||||||
|
for addr, n := range leaderNodes {
|
||||||
|
if nodeIP(addr) == ip {
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkRQLitePerNode(nd *inspector.NodeData, data *inspector.ClusterData, leaderNodes map[string]*inspector.RQLiteNode) []inspector.CheckResult {
|
||||||
var r []inspector.CheckResult
|
var r []inspector.CheckResult
|
||||||
rq := nd.RQLite
|
rq := nd.RQLite
|
||||||
node := nd.Node.Name()
|
node := nd.Node.Name()
|
||||||
@ -99,19 +135,39 @@ func checkRQLitePerNode(nd *inspector.NodeData, data *inspector.ClusterData) []i
|
|||||||
fmt.Sprintf("leader=%s", s.LeaderNodeID), inspector.Critical))
|
fmt.Sprintf("leader=%s", s.LeaderNodeID), inspector.Critical))
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1.8 Voter status
|
// 1.8 Voter status — use leader's /nodes as authoritative source
|
||||||
if s.Voter {
|
if leaderNode := lookupInLeaderNodes(leaderNodes, s.NodeID); leaderNode != nil {
|
||||||
|
if leaderNode.Voter {
|
||||||
|
r = append(r, inspector.Pass("rqlite.voter", "Node is voter", rqliteSub, node,
|
||||||
|
"voter=true (confirmed by leader)", inspector.Low))
|
||||||
|
} else {
|
||||||
|
r = append(r, inspector.Pass("rqlite.voter", "Node is non-voter", rqliteSub, node,
|
||||||
|
"non-voter (by design, confirmed by leader)", inspector.Low))
|
||||||
|
}
|
||||||
|
} else if s.Voter {
|
||||||
r = append(r, inspector.Pass("rqlite.voter", "Node is voter", rqliteSub, node,
|
r = append(r, inspector.Pass("rqlite.voter", "Node is voter", rqliteSub, node,
|
||||||
"voter=true", inspector.Low))
|
"voter=true", inspector.Low))
|
||||||
} else {
|
} else {
|
||||||
r = append(r, inspector.Warn("rqlite.voter", "Node is voter", rqliteSub, node,
|
r = append(r, inspector.Pass("rqlite.voter", "Node is non-voter", rqliteSub, node,
|
||||||
"voter=false (non-voter)", inspector.Low))
|
"non-voter (no leader data to confirm)", inspector.Low))
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1.9 Num peers — use the node's own /nodes endpoint to determine cluster size
|
// 1.9 Num peers — use leader's /nodes as authoritative cluster size
|
||||||
// (not config file, since not all config nodes are necessarily in the Raft cluster)
|
if leaderNodes != nil && len(leaderNodes) > 0 {
|
||||||
if rq.Nodes != nil && len(rq.Nodes) > 0 {
|
expectedPeers := len(leaderNodes) - 1 // cluster members minus self
|
||||||
expectedPeers := len(rq.Nodes) - 1 // cluster members minus self
|
if expectedPeers < 0 {
|
||||||
|
expectedPeers = 0
|
||||||
|
}
|
||||||
|
if s.NumPeers == expectedPeers {
|
||||||
|
r = append(r, inspector.Pass("rqlite.num_peers", "Peer count matches cluster size", rqliteSub, node,
|
||||||
|
fmt.Sprintf("peers=%d (cluster has %d nodes)", s.NumPeers, len(leaderNodes)), inspector.Critical))
|
||||||
|
} else {
|
||||||
|
r = append(r, inspector.Warn("rqlite.num_peers", "Peer count matches cluster size", rqliteSub, node,
|
||||||
|
fmt.Sprintf("peers=%d but leader reports %d members", s.NumPeers, len(leaderNodes)), inspector.High))
|
||||||
|
}
|
||||||
|
} else if rq.Nodes != nil && len(rq.Nodes) > 0 {
|
||||||
|
// Fallback: use node's own /nodes if leader data unavailable
|
||||||
|
expectedPeers := len(rq.Nodes) - 1
|
||||||
if expectedPeers < 0 {
|
if expectedPeers < 0 {
|
||||||
expectedPeers = 0
|
expectedPeers = 0
|
||||||
}
|
}
|
||||||
@ -332,7 +388,7 @@ func checkRQLitePerNode(nd *inspector.NodeData, data *inspector.ClusterData) []i
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkRQLiteCrossNode(data *inspector.ClusterData) []inspector.CheckResult {
|
func checkRQLiteCrossNode(data *inspector.ClusterData, leaderNodes map[string]*inspector.RQLiteNode) []inspector.CheckResult {
|
||||||
var r []inspector.CheckResult
|
var r []inspector.CheckResult
|
||||||
|
|
||||||
type nodeInfo struct {
|
type nodeInfo struct {
|
||||||
@ -506,13 +562,25 @@ func checkRQLiteCrossNode(data *inspector.ClusterData) []inspector.CheckResult {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1.42 Quorum math
|
// 1.42 Quorum math — use leader's /nodes as authoritative voter source
|
||||||
voters := 0
|
voters := 0
|
||||||
reachableVoters := 0
|
reachableVoters := 0
|
||||||
for _, n := range nodes {
|
if leaderNodes != nil && len(leaderNodes) > 0 {
|
||||||
if n.status.Voter {
|
for _, ln := range leaderNodes {
|
||||||
voters++
|
if ln.Voter {
|
||||||
reachableVoters++ // responded to SSH + curl = reachable
|
voters++
|
||||||
|
if ln.Reachable {
|
||||||
|
reachableVoters++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Fallback: use each node's self-reported voter status
|
||||||
|
for _, n := range nodes {
|
||||||
|
if n.status.Voter {
|
||||||
|
voters++
|
||||||
|
reachableVoters++ // responded to SSH + curl = reachable
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
quorumNeeded := int(math.Floor(float64(voters)/2)) + 1
|
quorumNeeded := int(math.Floor(float64(voters)/2)) + 1
|
||||||
|
|||||||
@ -307,7 +307,7 @@ func collectNode(ctx context.Context, node Node, subsystems []string, verbose bo
|
|||||||
if shouldCollect("network") {
|
if shouldCollect("network") {
|
||||||
nd.Network = collectNetwork(ctx, node, nd.WireGuard)
|
nd.Network = collectNetwork(ctx, node, nd.WireGuard)
|
||||||
}
|
}
|
||||||
if shouldCollect("anyone") {
|
if shouldCollect("anyone") && !node.IsNameserver() {
|
||||||
nd.Anyone = collectAnyone(ctx, node)
|
nd.Anyone = collectAnyone(ctx, node)
|
||||||
}
|
}
|
||||||
// Namespace collection — always collect if any subsystem is collected
|
// Namespace collection — always collect if any subsystem is collected
|
||||||
|
|||||||
@ -306,18 +306,52 @@ func (r *RQLiteManager) checkShouldBeNonVoter(joinAddress string) bool {
|
|||||||
}
|
}
|
||||||
nodesURL := fmt.Sprintf("http://%s:%d/nodes?timeout=2s", host, r.config.RQLitePort)
|
nodesURL := fmt.Sprintf("http://%s:%d/nodes?timeout=2s", host, r.config.RQLitePort)
|
||||||
|
|
||||||
|
// Retry with backoff — network (WireGuard) may not be ready immediately.
|
||||||
|
// Defaulting to voter on failure is dangerous: it creates excess voters
|
||||||
|
// that can cause split-brain during leader failover.
|
||||||
|
const maxRetries = 5
|
||||||
|
var lastErr error
|
||||||
|
|
||||||
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||||
|
if attempt > 0 {
|
||||||
|
delay := time.Duration(attempt*2) * time.Second
|
||||||
|
r.logger.Info("Retrying voter check",
|
||||||
|
zap.Int("attempt", attempt+1),
|
||||||
|
zap.Duration("delay", delay))
|
||||||
|
time.Sleep(delay)
|
||||||
|
}
|
||||||
|
|
||||||
|
voterCount, err := r.queryVoterCount(nodesURL)
|
||||||
|
if err != nil {
|
||||||
|
lastErr = err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
r.logger.Info("Checked existing voter count from join target",
|
||||||
|
zap.Int("reachable_voters", voterCount),
|
||||||
|
zap.Int("max_voters", MaxDefaultVoters))
|
||||||
|
|
||||||
|
return voterCount >= MaxDefaultVoters
|
||||||
|
}
|
||||||
|
|
||||||
|
r.logger.Warn("Could not determine voter count after retries, defaulting to voter",
|
||||||
|
zap.Int("attempts", maxRetries),
|
||||||
|
zap.Error(lastErr))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// queryVoterCount queries the /nodes endpoint and returns the number of reachable voters.
|
||||||
|
func (r *RQLiteManager) queryVoterCount(nodesURL string) (int, error) {
|
||||||
client := tlsutil.NewHTTPClient(5 * time.Second)
|
client := tlsutil.NewHTTPClient(5 * time.Second)
|
||||||
resp, err := client.Get(nodesURL)
|
resp, err := client.Get(nodesURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.logger.Warn("Could not query /nodes to check voter count, defaulting to voter", zap.Error(err))
|
return 0, fmt.Errorf("query /nodes: %w", err)
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.logger.Warn("Could not read /nodes response, defaulting to voter", zap.Error(err))
|
return 0, fmt.Errorf("read /nodes response: %w", err)
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var nodes map[string]struct {
|
var nodes map[string]struct {
|
||||||
@ -325,8 +359,7 @@ func (r *RQLiteManager) checkShouldBeNonVoter(joinAddress string) bool {
|
|||||||
Reachable bool `json:"reachable"`
|
Reachable bool `json:"reachable"`
|
||||||
}
|
}
|
||||||
if err := json.Unmarshal(body, &nodes); err != nil {
|
if err := json.Unmarshal(body, &nodes); err != nil {
|
||||||
r.logger.Warn("Could not parse /nodes response, defaulting to voter", zap.Error(err))
|
return 0, fmt.Errorf("parse /nodes response: %w", err)
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
voterCount := 0
|
voterCount := 0
|
||||||
@ -336,11 +369,7 @@ func (r *RQLiteManager) checkShouldBeNonVoter(joinAddress string) bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
r.logger.Info("Checked existing voter count from join target",
|
return voterCount, nil
|
||||||
zap.Int("reachable_voters", voterCount),
|
|
||||||
zap.Int("max_voters", MaxDefaultVoters))
|
|
||||||
|
|
||||||
return voterCount >= MaxDefaultVoters
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitForJoinTarget waits until the join target's HTTP status becomes reachable
|
// waitForJoinTarget waits until the join target's HTTP status becomes reachable
|
||||||
|
|||||||
@ -68,6 +68,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
|
|||||||
|
|
||||||
if r.discoveryService != nil {
|
if r.discoveryService != nil {
|
||||||
go r.startHealthMonitoring(ctx)
|
go r.startHealthMonitoring(ctx)
|
||||||
|
go r.startVoterReconciliation(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.establishLeadershipOrJoin(ctx, rqliteDataDir); err != nil {
|
if err := r.establishLeadershipOrJoin(ctx, rqliteDataDir); err != nil {
|
||||||
|
|||||||
236
pkg/rqlite/voter_reconciliation.go
Normal file
236
pkg/rqlite/voter_reconciliation.go
Normal file
@ -0,0 +1,236 @@
|
|||||||
|
package rqlite
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// startVoterReconciliation periodically checks and corrects voter/non-voter
|
||||||
|
// assignments. Only takes effect on the leader node. Corrects at most one
|
||||||
|
// node per cycle to minimize disruption.
|
||||||
|
func (r *RQLiteManager) startVoterReconciliation(ctx context.Context) {
|
||||||
|
// Wait for cluster to stabilize after startup
|
||||||
|
time.Sleep(3 * time.Minute)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(2 * time.Minute)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
if err := r.reconcileVoters(); err != nil {
|
||||||
|
r.logger.Debug("Voter reconciliation skipped", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// reconcileVoters compares actual cluster voter assignments (from RQLite's
|
||||||
|
// /nodes endpoint) against the deterministic desired set (computeVoterSet)
|
||||||
|
// and corrects mismatches. Uses remove + re-join since RQLite's /join
|
||||||
|
// ignores voter flag changes for existing members.
|
||||||
|
//
|
||||||
|
// Safety: only runs on the leader, only when all nodes are reachable,
|
||||||
|
// never demotes the leader, and fixes at most one node per cycle.
|
||||||
|
func (r *RQLiteManager) reconcileVoters() error {
|
||||||
|
// 1. Only the leader reconciles
|
||||||
|
status, err := r.getRQLiteStatus()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get status: %w", err)
|
||||||
|
}
|
||||||
|
if status.Store.Raft.State != "Leader" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Get all cluster nodes including non-voters
|
||||||
|
nodes, err := r.getAllClusterNodes()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get all nodes: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(nodes) <= MaxDefaultVoters {
|
||||||
|
return nil // Small cluster — all nodes should be voters
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Only reconcile when every node is reachable (stable cluster)
|
||||||
|
for _, n := range nodes {
|
||||||
|
if !n.Reachable {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Compute desired voter set from raft addresses
|
||||||
|
raftAddrs := make([]string, 0, len(nodes))
|
||||||
|
for _, n := range nodes {
|
||||||
|
raftAddrs = append(raftAddrs, n.ID)
|
||||||
|
}
|
||||||
|
desiredVoters := computeVoterSet(raftAddrs, MaxDefaultVoters)
|
||||||
|
|
||||||
|
// 5. Safety: never demote ourselves (the current leader)
|
||||||
|
myRaftAddr := status.Store.Raft.LeaderID
|
||||||
|
if _, shouldBeVoter := desiredVoters[myRaftAddr]; !shouldBeVoter {
|
||||||
|
r.logger.Warn("Leader is not in computed voter set — skipping reconciliation",
|
||||||
|
zap.String("leader_id", myRaftAddr))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 6. Find one mismatch to fix (one change per cycle)
|
||||||
|
for _, n := range nodes {
|
||||||
|
_, shouldBeVoter := desiredVoters[n.ID]
|
||||||
|
|
||||||
|
if n.Voter && !shouldBeVoter {
|
||||||
|
// Skip if this is the leader
|
||||||
|
if n.ID == myRaftAddr {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
r.logger.Info("Demoting excess voter to non-voter",
|
||||||
|
zap.String("node_id", n.ID))
|
||||||
|
|
||||||
|
if err := r.changeNodeVoterStatus(n.ID, false); err != nil {
|
||||||
|
r.logger.Warn("Failed to demote voter",
|
||||||
|
zap.String("node_id", n.ID),
|
||||||
|
zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
r.logger.Info("Successfully demoted voter to non-voter",
|
||||||
|
zap.String("node_id", n.ID))
|
||||||
|
return nil // One change per cycle
|
||||||
|
}
|
||||||
|
|
||||||
|
if !n.Voter && shouldBeVoter {
|
||||||
|
r.logger.Info("Promoting non-voter to voter",
|
||||||
|
zap.String("node_id", n.ID))
|
||||||
|
|
||||||
|
if err := r.changeNodeVoterStatus(n.ID, true); err != nil {
|
||||||
|
r.logger.Warn("Failed to promote non-voter",
|
||||||
|
zap.String("node_id", n.ID),
|
||||||
|
zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
r.logger.Info("Successfully promoted non-voter to voter",
|
||||||
|
zap.String("node_id", n.ID))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// changeNodeVoterStatus changes a node's voter status by removing it from the
|
||||||
|
// cluster and immediately re-adding it with the desired voter flag.
|
||||||
|
// This is necessary because RQLite's /join endpoint ignores voter flag changes
|
||||||
|
// for nodes that are already cluster members with the same ID and address.
|
||||||
|
func (r *RQLiteManager) changeNodeVoterStatus(nodeID string, voter bool) error {
|
||||||
|
// Step 1: Remove the node from the cluster
|
||||||
|
if err := r.removeClusterNode(nodeID); err != nil {
|
||||||
|
return fmt.Errorf("remove node: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Brief pause for Raft to commit the configuration change
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
// Step 2: Re-add with the correct voter status
|
||||||
|
if err := r.joinClusterNode(nodeID, nodeID, voter); err != nil {
|
||||||
|
return fmt.Errorf("rejoin node: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getAllClusterNodes queries /nodes?nonvoters&ver=2 to get all cluster members
|
||||||
|
// including non-voters.
|
||||||
|
func (r *RQLiteManager) getAllClusterNodes() (RQLiteNodes, error) {
|
||||||
|
url := fmt.Sprintf("http://localhost:%d/nodes?nonvoters&ver=2&timeout=5s", r.config.RQLitePort)
|
||||||
|
client := &http.Client{Timeout: 10 * time.Second}
|
||||||
|
|
||||||
|
resp, err := client.Get(url)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("query nodes: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("read response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("nodes returned %d: %s", resp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try ver=2 wrapped format first
|
||||||
|
var wrapped struct {
|
||||||
|
Nodes RQLiteNodes `json:"nodes"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(body, &wrapped); err == nil && wrapped.Nodes != nil {
|
||||||
|
return wrapped.Nodes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fall back to plain array
|
||||||
|
var nodes RQLiteNodes
|
||||||
|
if err := json.Unmarshal(body, &nodes); err != nil {
|
||||||
|
return nil, fmt.Errorf("parse nodes: %w", err)
|
||||||
|
}
|
||||||
|
return nodes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeClusterNode sends DELETE /remove to remove a node from the Raft cluster.
|
||||||
|
func (r *RQLiteManager) removeClusterNode(nodeID string) error {
|
||||||
|
url := fmt.Sprintf("http://localhost:%d/remove", r.config.RQLitePort)
|
||||||
|
payload, _ := json.Marshal(map[string]string{"id": nodeID})
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodDelete, url, bytes.NewReader(payload))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
client := &http.Client{Timeout: 30 * time.Second}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("remove request: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return fmt.Errorf("remove returned %d: %s", resp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// joinClusterNode sends POST /join to add a node to the Raft cluster
|
||||||
|
// with the specified voter status.
|
||||||
|
func (r *RQLiteManager) joinClusterNode(nodeID, raftAddr string, voter bool) error {
|
||||||
|
url := fmt.Sprintf("http://localhost:%d/join", r.config.RQLitePort)
|
||||||
|
payload, _ := json.Marshal(map[string]interface{}{
|
||||||
|
"id": nodeID,
|
||||||
|
"addr": raftAddr,
|
||||||
|
"voter": voter,
|
||||||
|
})
|
||||||
|
|
||||||
|
client := &http.Client{Timeout: 30 * time.Second}
|
||||||
|
resp, err := client.Post(url, "application/json", bytes.NewReader(payload))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("join request: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return fmt.Errorf("join returned %d: %s", resp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user