orama/pkg/rqlite/process.go
anonpenguin23 fd87eec476 feat(security): add manifest signing, TLS TOFU, refresh token migration
- Invalidate plaintext refresh tokens (migration 019)
- Add `--sign` flag to `orama build` for rootwallet manifest signing
- Add `--ca-fingerprint` TOFU verification for production joins/invites
- Save cluster secrets from join (RQLite auth, Olric key, IPFS peers)
- Add RQLite auth config fields
2026-02-28 15:40:43 +02:00

463 lines
14 KiB
Go

package rqlite
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/DeBrosOfficial/network/pkg/tlsutil"
	"github.com/rqlite/gorqlite"
	"go.uber.org/zap"
)
// killOrphanedRQLite kills any orphaned rqlited process still holding the port.
// This can happen when the parent node process crashes and rqlited keeps running.
func (r *RQLiteManager) killOrphanedRQLite() {
	// Probe the status endpoint; a connection error means the port is free.
	statusURL := fmt.Sprintf("http://localhost:%d/status", r.config.RQLitePort)
	probe := &http.Client{Timeout: 2 * time.Second}

	resp, err := probe.Get(statusURL)
	if err != nil {
		// Nothing is listening — no orphan to clean up.
		return
	}
	resp.Body.Close()

	// Something is answering on our port: treat it as a leftover rqlited.
	r.logger.Warn("Found orphaned rqlited process on port, killing it",
		zap.Int("port", r.config.RQLitePort))

	// First attempt: fuser kills whatever holds the TCP port.
	fuserCmd := exec.Command("fuser", "-k", fmt.Sprintf("%d/tcp", r.config.RQLitePort))
	if fuserErr := fuserCmd.Run(); fuserErr != nil {
		r.logger.Warn("fuser failed, trying lsof", zap.Error(fuserErr))
		// Fallback: lsof lists PIDs bound to the port; kill each one.
		if out, lsofErr := exec.Command("lsof", "-ti", fmt.Sprintf(":%d", r.config.RQLitePort)).Output(); lsofErr == nil {
			for _, pid := range strings.Fields(string(out)) {
				_ = exec.Command("kill", "-9", pid).Run()
			}
		}
	}

	// Poll until the port stops answering (up to ~5 seconds total).
	for attempt := 0; attempt < 10; attempt++ {
		time.Sleep(500 * time.Millisecond)
		check, checkErr := probe.Get(statusURL)
		if checkErr != nil {
			return // Port released
		}
		check.Body.Close()
	}
	r.logger.Warn("Could not release port from orphaned process")
}
// launchProcess starts the rqlited process with the appropriate arguments.
//
// Steps, in order:
//  1. kill any orphaned rqlited left over from a previous crash;
//  2. remove a stale peers.json (normal restart) or keep it (recovery flow);
//  3. assemble the command line: bind/advertised addresses, optional
//     node-to-node TLS, raft timing tuned for WireGuard latency, optional
//     HTTP basic auth, and first-time join options (incl. non-voter check);
//  4. start rqlited with stdout/stderr appended to logs/rqlite-<type>.log;
//  5. write a PID file and reap the child in the background so Stop() can
//     wait on r.waitDone instead of calling cmd.Wait() itself.
func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) error {
	// Kill any orphaned rqlited from a previous crash.
	r.killOrphanedRQLite()

	// Remove stale peers.json from the raft directory to prevent rqlite v8
	// from triggering automatic Raft recovery on normal restarts.
	//
	// Only delete when raft.db EXISTS (normal restart). If raft.db does NOT
	// exist, peers.json was likely placed intentionally by ForceWritePeersJSON()
	// as part of a recovery flow (clearRaftState + ForceWritePeersJSON + launch).
	stalePeersPath := filepath.Join(rqliteDataDir, "raft", "peers.json")
	raftDBPath := filepath.Join(rqliteDataDir, "raft.db")
	if _, err := os.Stat(stalePeersPath); err == nil {
		if _, err := os.Stat(raftDBPath); err == nil {
			// raft.db exists → this is a normal restart, peers.json is stale.
			r.logger.Warn("Removing stale peers.json from raft directory to prevent accidental recovery",
				zap.String("path", stalePeersPath))
			_ = os.Remove(stalePeersPath)
			_ = os.Remove(stalePeersPath + ".backup")
			_ = os.Remove(stalePeersPath + ".tmp")
		} else {
			// raft.db missing → intentional recovery, keep peers.json for rqlited.
			r.logger.Info("Keeping peers.json in raft directory for intentional cluster recovery",
				zap.String("path", stalePeersPath))
		}
	}

	// Base command line: bind addresses plus the advertised addresses peers use.
	args := []string{
		"-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort),
		"-http-adv-addr", r.discoverConfig.HttpAdvAddress,
		"-raft-adv-addr", r.discoverConfig.RaftAdvAddress,
		"-raft-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLiteRaftPort),
	}

	// Optional node-to-node TLS (both cert and key must be configured).
	if r.config.NodeCert != "" && r.config.NodeKey != "" {
		r.logger.Info("Enabling node-to-node TLS encryption",
			zap.String("node_cert", r.config.NodeCert),
			zap.String("node_key", r.config.NodeKey))
		args = append(args, "-node-cert", r.config.NodeCert)
		args = append(args, "-node-key", r.config.NodeKey)
		if r.config.NodeCACert != "" {
			args = append(args, "-node-ca-cert", r.config.NodeCACert)
		}
		if r.config.NodeNoVerify {
			args = append(args, "-node-no-verify")
		}
	}

	// Raft tuning — higher timeouts suit WireGuard latency. Zero config values
	// fall back to defaults chosen for that environment.
	raftElection := r.config.RaftElectionTimeout
	if raftElection == 0 {
		raftElection = 5 * time.Second
	}
	raftHeartbeat := r.config.RaftHeartbeatTimeout
	if raftHeartbeat == 0 {
		raftHeartbeat = 2 * time.Second
	}
	raftApply := r.config.RaftApplyTimeout
	if raftApply == 0 {
		raftApply = 30 * time.Second
	}
	raftLeaderLease := r.config.RaftLeaderLeaseTimeout
	if raftLeaderLease == 0 {
		raftLeaderLease = 2 * time.Second
	}
	args = append(args,
		"-raft-election-timeout", raftElection.String(),
		"-raft-timeout", raftHeartbeat.String(),
		"-raft-apply-timeout", raftApply.String(),
		"-raft-leader-lease-timeout", raftLeaderLease.String(),
	)

	// RQLite HTTP Basic Auth — when an auth file is configured, enforce it.
	if r.config.RQLiteAuthFile != "" {
		r.logger.Info("Enabling RQLite HTTP Basic Auth",
			zap.String("auth_file", r.config.RQLiteAuthFile))
		args = append(args, "-auth", r.config.RQLiteAuthFile)
	}

	// First-time join: only when a join address is configured AND there is no
	// existing raft state (a restarting node rejoins via its stored state).
	if r.config.RQLiteJoinAddress != "" && !r.hasExistingState(rqliteDataDir) {
		r.logger.Info("First-time join to RQLite cluster", zap.String("join_address", r.config.RQLiteJoinAddress))
		// rqlited expects a bare host:port; strip any URL scheme.
		// TrimPrefix is a no-op when the prefix is absent, so no branching is needed.
		joinArg := strings.TrimPrefix(r.config.RQLiteJoinAddress, "http://")
		joinArg = strings.TrimPrefix(joinArg, "https://")

		joinTimeout := 5 * time.Minute
		if err := r.waitForJoinTarget(ctx, r.config.RQLiteJoinAddress, joinTimeout); err != nil {
			r.logger.Warn("Join target did not become reachable within timeout; will still attempt to join",
				zap.Error(err))
		}
		args = append(args, "-join", joinArg, "-join-as", r.discoverConfig.RaftAdvAddress, "-join-attempts", "30", "-join-interval", "10s")

		// Check if this node should join as a non-voter (read replica).
		// Query the join target's /nodes endpoint to count existing voters,
		// rather than relying on LibP2P peer count which is incomplete at join time.
		if shouldBeNonVoter := r.checkShouldBeNonVoter(r.config.RQLiteJoinAddress); shouldBeNonVoter {
			r.logger.Info("Joining as non-voter (read replica) - cluster already has max voters",
				zap.String("raft_address", r.discoverConfig.RaftAdvAddress),
				zap.Int("max_voters", MaxDefaultVoters))
			args = append(args, "-raft-non-voter")
		}
	}

	// The data directory is the single positional argument.
	args = append(args, rqliteDataDir)
	r.cmd = exec.Command("rqlited", args...)

	// Route rqlited output to a per-node-type log file under <parent-of-dataDir>/logs.
	nodeType := r.nodeType
	if nodeType == "" {
		nodeType = "node"
	}
	logsDir := filepath.Join(filepath.Dir(r.dataDir), "logs")
	_ = os.MkdirAll(logsDir, 0755)
	logPath := filepath.Join(logsDir, fmt.Sprintf("rqlite-%s.log", nodeType))
	logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		return fmt.Errorf("failed to open log file: %w", err)
	}
	r.cmd.Stdout = logFile
	r.cmd.Stderr = logFile

	if err := r.cmd.Start(); err != nil {
		logFile.Close()
		return fmt.Errorf("failed to start RQLite: %w", err)
	}

	// Write PID file for reliable orphan detection.
	pidPath := filepath.Join(logsDir, "rqlited.pid")
	_ = os.WriteFile(pidPath, []byte(strconv.Itoa(r.cmd.Process.Pid)), 0644)
	r.logger.Info("RQLite process started", zap.Int("pid", r.cmd.Process.Pid), zap.String("pid_file", pidPath))

	// Reap the child process in the background to prevent zombies.
	// Stop() waits on this channel instead of calling cmd.Wait() directly.
	r.waitDone = make(chan struct{})
	go func() {
		_ = r.cmd.Wait()
		logFile.Close()
		close(r.waitDone)
	}()
	return nil
}
// waitForReadyAndConnect waits for RQLite to report ready and then opens a
// gorqlite connection, retrying with backoff while the store is still opening.
//
// On an unrecoverable connect error the rqlited child process is killed so the
// caller does not leak a half-started process. On success r.connection is set.
func (r *RQLiteManager) waitForReadyAndConnect(ctx context.Context) error {
	if err := r.waitForReady(ctx); err != nil {
		if r.cmd != nil && r.cmd.Process != nil {
			_ = r.cmd.Process.Kill()
		}
		return err
	}

	var conn *gorqlite.Connection
	var err error
	maxConnectAttempts := 10
	connectBackoff := 1 * time.Second

	// Use disableClusterDiscovery=true to avoid gorqlite calling /nodes on Open().
	// The /nodes endpoint probes all cluster members including unreachable ones,
	// which can block for the full HTTP timeout (~10s per attempt).
	// This is safe because rqlited followers automatically forward writes to the leader.
	connURL := fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", r.config.RQLitePort)
	for attempt := 0; attempt < maxConnectAttempts; attempt++ {
		conn, err = gorqlite.Open(connURL)
		if err == nil {
			r.connection = conn
			break
		}
		// "store is not open" means rqlited is still starting — back off and retry.
		if strings.Contains(err.Error(), "store is not open") {
			r.logger.Debug("RQLite not ready yet, retrying",
				zap.Int("attempt", attempt+1),
				zap.Error(err))
			time.Sleep(connectBackoff)
			// Exponential backoff (×1.5), capped at 5s per attempt.
			connectBackoff = time.Duration(float64(connectBackoff) * 1.5)
			if connectBackoff > 5*time.Second {
				connectBackoff = 5 * time.Second
			}
			continue
		}
		// Any other error is fatal: kill the child so we don't leak it.
		if r.cmd != nil && r.cmd.Process != nil {
			_ = r.cmd.Process.Kill()
		}
		return fmt.Errorf("failed to connect to RQLite: %w", err)
	}
	if conn == nil {
		// All attempts were exhausted on "store is not open"; previously the
		// last error was dropped here — wrap it so callers see the cause.
		return fmt.Errorf("failed to connect to RQLite after %d attempts: %w", maxConnectAttempts, err)
	}
	// Result intentionally ignored — best-effort check after connecting.
	_ = r.validateNodeID()
	return nil
}
// waitForReady waits for RQLite to be ready to accept connections.
//
// It polls /status once per second for up to 180 attempts, returning nil once
// the raft state is "leader" or "follower" (or when the response carries no
// "raft" section — backwards compatibility with older status payloads).
// Returns ctx.Err() on cancellation, or an error after the attempt budget.
func (r *RQLiteManager) waitForReady(ctx context.Context) error {
	url := fmt.Sprintf("http://localhost:%d/status", r.config.RQLitePort)
	client := tlsutil.NewHTTPClient(2 * time.Second)
	for i := 0; i < 180; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(1 * time.Second):
		}
		resp, err := client.Get(url)
		if err != nil {
			continue
		}
		// Always read and close the body — the original leaked the body on
		// non-200 responses, which blocks connection reuse.
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			continue
		}
		var statusResp map[string]interface{}
		if err := json.Unmarshal(body, &statusResp); err != nil {
			continue
		}
		raft, ok := statusResp["raft"].(map[string]interface{})
		if !ok {
			return nil // Backwards compatibility: no raft section in the payload
		}
		if state, _ := raft["state"].(string); state == "leader" || state == "follower" {
			return nil
		}
	}
	return fmt.Errorf("RQLite did not become ready within timeout")
}
// waitForSQLAvailable blocks until a trivial query ("SELECT 1") succeeds,
// polling once per second, or until ctx is cancelled.
func (r *RQLiteManager) waitForSQLAvailable(ctx context.Context) error {
	r.logger.Info("Waiting for SQL to become available...")
	tick := time.NewTicker(1 * time.Second)
	defer tick.Stop()

	attempt := 0
	for {
		select {
		case <-ctx.Done():
			r.logger.Error("waitForSQLAvailable timed out", zap.Int("attempts", attempt))
			return ctx.Err()
		case <-tick.C:
		}
		attempt++
		if r.connection == nil {
			r.logger.Warn("connection is nil in waitForSQLAvailable")
			continue
		}
		_, err := r.connection.QueryOne("SELECT 1")
		if err == nil {
			r.logger.Info("SQL is available", zap.Int("attempts", attempt))
			return nil
		}
		// Log the first few failures, then only every 10th to avoid spam.
		if attempt <= 5 || attempt%10 == 0 {
			r.logger.Debug("SQL not yet available", zap.Int("attempt", attempt), zap.Error(err))
		}
	}
}
// testJoinAddress tests if a join address is reachable.
//
// joinAddress may be a full URL (http[s]://...) — probed as-is at /status —
// or a bare raft host:port, in which case the HTTP API is probed on the
// configured RQLite HTTP port. Returns nil only on an HTTP 200 response.
func (r *RQLiteManager) testJoinAddress(joinAddress string) error {
	client := tlsutil.NewHTTPClient(5 * time.Second)
	var statusURL string
	if strings.HasPrefix(joinAddress, "http://") || strings.HasPrefix(joinAddress, "https://") {
		statusURL = strings.TrimRight(joinAddress, "/") + "/status"
	} else {
		// Bare raft address: keep only the host part. Use the configured HTTP
		// port rather than a hard-coded 5001, consistent with how
		// checkShouldBeNonVoter derives its /nodes URL for the same target.
		// (Assumes all cluster nodes share the same HTTP port — the same
		// assumption checkShouldBeNonVoter already makes.)
		host, _, _ := strings.Cut(joinAddress, ":")
		statusURL = fmt.Sprintf("http://%s:%d/status", host, r.config.RQLitePort)
	}
	resp, err := client.Get(statusURL)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("leader returned status %d", resp.StatusCode)
	}
	return nil
}
// checkShouldBeNonVoter queries the join target's /nodes endpoint to count
// existing voters. Returns true if the cluster already has MaxDefaultVoters
// voters, meaning this node should join as a non-voter.
func (r *RQLiteManager) checkShouldBeNonVoter(joinAddress string) bool {
	// The join address is a raft address (e.g. 10.0.0.1:7001); strip any URL
	// scheme, keep only the host, and target the HTTP API port instead.
	trimmed := strings.TrimPrefix(strings.TrimPrefix(joinAddress, "http://"), "https://")
	host, _, _ := strings.Cut(trimmed, ":")
	nodesURL := fmt.Sprintf("http://%s:%d/nodes?timeout=2s", host, r.config.RQLitePort)

	// Retry with backoff — network (WireGuard) may not be ready immediately.
	// Defaulting to voter on failure is dangerous: it creates excess voters
	// that can cause split-brain during leader failover.
	const maxRetries = 5
	var lastErr error
	for attempt := 0; attempt < maxRetries; attempt++ {
		if attempt > 0 {
			backoff := time.Duration(attempt*2) * time.Second
			r.logger.Info("Retrying voter check",
				zap.Int("attempt", attempt+1),
				zap.Duration("delay", backoff))
			time.Sleep(backoff)
		}
		voters, err := r.queryVoterCount(nodesURL)
		if err != nil {
			lastErr = err
			continue
		}
		r.logger.Info("Checked existing voter count from join target",
			zap.Int("reachable_voters", voters),
			zap.Int("max_voters", MaxDefaultVoters))
		return voters >= MaxDefaultVoters
	}
	r.logger.Warn("Could not determine voter count after retries, defaulting to voter",
		zap.Int("attempts", maxRetries),
		zap.Error(lastErr))
	return false
}
// queryVoterCount queries the /nodes endpoint and returns the number of
// voters that are currently reachable.
func (r *RQLiteManager) queryVoterCount(nodesURL string) (int, error) {
	client := tlsutil.NewHTTPClient(5 * time.Second)
	resp, err := client.Get(nodesURL)
	if err != nil {
		return 0, fmt.Errorf("query /nodes: %w", err)
	}
	defer resp.Body.Close()
	// A non-200 response previously surfaced as a confusing JSON parse error;
	// report it explicitly instead.
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("/nodes returned status %d", resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, fmt.Errorf("read /nodes response: %w", err)
	}
	// Response shape: map of node ID → node info; only the voter/reachable
	// flags matter here.
	var nodes map[string]struct {
		Voter     bool `json:"voter"`
		Reachable bool `json:"reachable"`
	}
	if err := json.Unmarshal(body, &nodes); err != nil {
		return 0, fmt.Errorf("parse /nodes response: %w", err)
	}
	voterCount := 0
	for _, n := range nodes {
		if n.Voter && n.Reachable {
			voterCount++
		}
	}
	return voterCount, nil
}
// waitForJoinTarget waits until the join target's HTTP status endpoint becomes
// reachable, probing every 2 seconds until the timeout elapses.
//
// Returns nil once the target answers, ctx.Err() on cancellation, and a
// non-nil error when the deadline passes (previously a non-positive timeout
// could return nil without ever probing, falsely signalling success).
func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	var lastErr error
	for time.Now().Before(deadline) {
		err := r.testJoinAddress(joinAddress)
		if err == nil {
			return nil
		}
		lastErr = err
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(2 * time.Second):
		}
	}
	if lastErr == nil {
		// Deadline elapsed before a single probe ran (timeout <= 0).
		lastErr = fmt.Errorf("join target %s not reachable within %s", joinAddress, timeout)
	}
	return lastErr
}