orama/pkg/rqlite/instance_spawner.go
anonpenguin23 fd87eec476 feat(security): add manifest signing, TLS TOFU, refresh token migration
- Invalidate plaintext refresh tokens (migration 019)
- Add `--sign` flag to `orama build` for rootwallet manifest signing
- Add `--ca-fingerprint` TOFU verification for production joins/invites
- Save cluster secrets from join (RQLite auth, Olric key, IPFS peers)
- Add RQLite auth config fields
2026-02-28 15:40:43 +02:00

313 lines
9.0 KiB
Go

package rqlite
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"time"

	"go.uber.org/zap"
)
// RaftPeer represents a peer entry in RQLite's peers.json recovery file.
// WritePeersJSON encodes a slice of these as the file's top-level JSON array,
// using the json tags below as the object keys.
type RaftPeer struct {
	ID       string `json:"id"`        // Peer identifier (presumably the rqlite node ID — confirm against rqlited's expectations)
	Address  string `json:"address"`   // Peer address (presumably the advertised Raft host:port — confirm)
	NonVoter bool   `json:"non_voter"` // Presumably marks the peer as a non-voting member; false = voter
}
// InstanceConfig contains configuration for spawning a RQLite instance.
//
// SpawnInstance binds the HTTP and Raft listeners on 0.0.0.0 at HTTPPort and
// RaftPort, and passes HTTPAdvAddress / RaftAdvAddress to rqlited as the
// advertised addresses.
type InstanceConfig struct {
	Namespace      string   // Namespace this instance belongs to
	NodeID         string   // Node ID where this instance runs
	HTTPPort       int      // HTTP API port
	RaftPort       int      // Raft consensus port
	HTTPAdvAddress string   // Advertised HTTP address (e.g., "192.168.1.1:10000")
	RaftAdvAddress string   // Advertised Raft address (e.g., "192.168.1.1:10001")
	JoinAddresses  []string // Addresses to join (e.g., ["192.168.1.2:10001"]); only used when IsLeader is false
	DataDir        string   // Data directory for this instance; empty = <baseDataDir>/<Namespace>/rqlite/<NodeID>
	IsLeader       bool     // Whether this is the first node (creates cluster); a leader is started without join flags
	AuthFile       string   // Path to RQLite auth JSON file, passed to rqlited as -auth. Empty = no auth enforcement.
}
// Instance represents a running RQLite instance, as returned by SpawnInstance.
type Instance struct {
	Config  InstanceConfig // Configuration the instance was spawned with
	Process *os.Process    // Handle to the rqlited process; StopInstance signals and waits on it
	PID     int            // Process ID of rqlited, copied from Process.Pid at spawn time
}
// InstanceSpawner manages RQLite instance lifecycle for namespaces.
// Create one with NewInstanceSpawner, which resolves the rqlited binary.
type InstanceSpawner struct {
	baseDataDir string      // Base directory for namespace data (e.g., ~/.orama/data/namespaces)
	rqlitePath  string      // Path to rqlited binary; the bare name "rqlited" if it was not found on PATH at construction
	logger      *zap.Logger // Receives lifecycle log messages (spawn, ready, stop)
}
// NewInstanceSpawner returns an InstanceSpawner that keeps namespace data
// under baseDataDir and logs through logger.
//
// The rqlited binary is looked up on PATH once, here. If the lookup fails, the
// bare name "rqlited" is kept; exec then attempts to resolve it again when an
// instance is started.
func NewInstanceSpawner(baseDataDir string, logger *zap.Logger) *InstanceSpawner {
	binary, lookErr := exec.LookPath("rqlited")
	if lookErr != nil {
		binary = "rqlited"
	}

	spawner := &InstanceSpawner{
		logger:      logger,
		rqlitePath:  binary,
		baseDataDir: baseDataDir,
	}
	return spawner
}
// SpawnInstance starts an rqlited process for cfg and blocks until it reports
// ready, meaning its /status endpoint answers 200 OK (see waitForReady).
//
// The data directory is cfg.DataDir, or <baseDataDir>/<Namespace>/rqlite/<NodeID>
// when that is empty, and is created if missing. rqlited's stdout and stderr
// are appended to rqlite.log in that directory. If the log file cannot be
// opened, a warning is logged and the process output is discarded.
//
// The process is created with exec.CommandContext. Cancelling ctx therefore
// kills rqlited even after SpawnInstance has returned, so callers should pass
// a context that lives as long as the instance is meant to run.
//
// If the instance does not become ready, the process is killed and reaped,
// and an error is returned together with a nil *Instance.
func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig) (*Instance, error) {
	dataDir := cfg.DataDir
	if dataDir == "" {
		dataDir = filepath.Join(is.baseDataDir, cfg.Namespace, "rqlite", cfg.NodeID)
	}
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create data directory: %w", err)
	}

	args := rqlitedArgs(cfg, dataDir)

	is.logger.Info("Spawning RQLite instance",
		zap.String("namespace", cfg.Namespace),
		zap.String("node_id", cfg.NodeID),
		zap.Int("http_port", cfg.HTTPPort),
		zap.Int("raft_port", cfg.RaftPort),
		zap.Bool("is_leader", cfg.IsLeader),
		zap.Strings("join_addresses", cfg.JoinAddresses),
	)

	cmd := exec.CommandContext(ctx, is.rqlitePath, args...)
	cmd.Dir = dataDir

	logPath := filepath.Join(dataDir, "rqlite.log")
	logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		// Not fatal: rqlited can still run, but its output is lost.
		is.logger.Warn("Failed to open RQLite log file; rqlited output will be discarded",
			zap.String("path", logPath),
			zap.Error(err),
		)
	} else {
		// Start gives the child its own copy of this descriptor. The parent's
		// handle must still be closed, otherwise every spawn leaks one fd.
		defer logFile.Close()
		cmd.Stdout = logFile
		cmd.Stderr = logFile
	}

	if err := cmd.Start(); err != nil {
		return nil, fmt.Errorf("failed to start rqlited: %w", err)
	}

	instance := &Instance{
		Config:  cfg,
		Process: cmd.Process,
		PID:     cmd.Process.Pid,
	}

	if err := is.waitForReady(ctx, cfg.HTTPPort); err != nil {
		// Kill the process, then Wait so it is reaped instead of lingering as a
		// zombie. Both errors are expected here (the process may already be
		// gone, and Wait reports the kill), so they are deliberately ignored.
		_ = cmd.Process.Kill()
		_ = cmd.Wait()
		return nil, fmt.Errorf("instance failed to become ready: %w", err)
	}

	is.logger.Info("RQLite instance started successfully",
		zap.String("namespace", cfg.Namespace),
		zap.Int("pid", instance.PID),
	)
	return instance, nil
}

// rqlitedArgs builds the rqlited command line for cfg. rqlited requires every
// flag to come before the positional data directory, so dataDir is always last.
func rqlitedArgs(cfg InstanceConfig, dataDir string) []string {
	args := []string{
		"-http-addr", fmt.Sprintf("0.0.0.0:%d", cfg.HTTPPort),
		"-raft-addr", fmt.Sprintf("0.0.0.0:%d", cfg.RaftPort),
		"-http-adv-addr", cfg.HTTPAdvAddress,
		"-raft-adv-addr", cfg.RaftAdvAddress,
		// Raft tuning — match the global node's tuning for consistency.
		"-raft-election-timeout", "5s",
		"-raft-timeout", "2s",
		"-raft-apply-timeout", "30s",
		"-raft-leader-lease-timeout", "2s",
	}

	// RQLite HTTP Basic Auth.
	if cfg.AuthFile != "" {
		args = append(args, "-auth", cfg.AuthFile)
	}

	if !cfg.IsLeader && len(cfg.JoinAddresses) > 0 {
		// -join takes one comma-delimited list. Repeating the flag would keep
		// only the last value, so a single unreachable node would block the join.
		args = append(args, "-join", strings.Join(cfg.JoinAddresses, ","))
		// Retry joining for up to 5 minutes. The default (5 attempts / 3s =
		// 15s) is too short when all namespace nodes restart simultaneously
		// and the leader isn't ready yet.
		args = append(args, "-join-attempts", "30", "-join-interval", "10s")
	}

	return append(args, dataDir)
}
// waitForReady polls http://localhost:<httpPort>/status until it answers
// 200 OK, ctx is done, or the readiness window elapses.
//
// The window is 6 minutes because it must exceed the follower join retry
// window configured in SpawnInstance (30 attempts * 10s = 5min). A shorter
// window would kill followers that are still waiting for the leader.
//
// Both the HTTP probe and the pause between probes observe ctx, so
// cancellation is noticed immediately rather than after the current request
// or sleep finishes. On cancellation ctx.Err() is returned.
//
// NOTE(review): the probe sends no credentials. If the instance was started
// with -auth and the auth file does not let unauthenticated clients read
// /status, every probe fails and this only returns at the deadline — confirm.
func (is *InstanceSpawner) waitForReady(ctx context.Context, httpPort int) error {
	const (
		readyTimeout = 6 * time.Minute
		pollInterval = 500 * time.Millisecond
	)

	url := fmt.Sprintf("http://localhost:%d/status", httpPort)
	client := &http.Client{Timeout: 2 * time.Second}

	// probe reports whether one /status request returned 200 OK.
	probe := func() bool {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			return false
		}
		resp, err := client.Do(req)
		if err != nil {
			return false
		}
		resp.Body.Close()
		return resp.StatusCode == http.StatusOK
	}

	deadline := time.Now().Add(readyTimeout)
	for time.Now().Before(deadline) {
		if err := ctx.Err(); err != nil {
			return err
		}
		if probe() {
			return nil
		}

		timer := time.NewTimer(pollInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}

	return fmt.Errorf("timeout waiting for RQLite to be ready on port %d", httpPort)
}
// StopInstance asks a running rqlited process to shut down and waits for it
// to exit.
//
// It first sends os.Interrupt (SIGINT on Unix) and falls back to Kill if the
// signal cannot be delivered. The process then has up to 10 seconds to exit on
// its own. After that it is killed, and StopInstance waits until the exit has
// been observed before returning. If ctx is done first, the process is killed
// and ctx.Err() is returned.
//
// A nil instance, a nil Process, or a process that has already finished (for
// example one stopped by an earlier call) counts as already stopped, and nil
// is returned.
func (is *InstanceSpawner) StopInstance(ctx context.Context, instance *Instance) error {
	if instance == nil || instance.Process == nil {
		return nil
	}
	proc := instance.Process

	is.logger.Info("Stopping RQLite instance",
		zap.String("namespace", instance.Config.Namespace),
		zap.Int("pid", instance.PID),
	)

	// Request a graceful shutdown. os.Interrupt is SIGINT on Unix.
	if err := proc.Signal(os.Interrupt); err != nil {
		if errors.Is(err, os.ErrProcessDone) {
			is.logger.Info("RQLite instance already stopped",
				zap.String("namespace", instance.Config.Namespace),
			)
			return nil
		}
		// Signalling failed for another reason: escalate to Kill.
		if err := proc.Kill(); err != nil {
			if errors.Is(err, os.ErrProcessDone) {
				return nil
			}
			return fmt.Errorf("failed to kill process: %w", err)
		}
	}

	// Wait in a goroutine so the wait can be bounded by ctx and a timeout.
	// The channel is buffered, so the goroutine never blocks even if nobody
	// receives, and it still reaps the process after a later Kill.
	done := make(chan error, 1)
	go func() {
		_, err := proc.Wait()
		done <- err
	}()

	timer := time.NewTimer(10 * time.Second)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		// Best effort; the Wait goroutine above still reaps the process.
		_ = proc.Kill()
		return ctx.Err()
	case err := <-done:
		if err != nil {
			is.logger.Warn("Process exited with error", zap.Error(err))
		}
	case <-timer.C:
		is.logger.Warn("RQLite instance did not exit after interrupt; killing",
			zap.String("namespace", instance.Config.Namespace),
			zap.Int("pid", instance.PID),
		)
		if err := proc.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
			return fmt.Errorf("failed to kill process: %w", err)
		}
		// Do not report "stopped" until the process has actually exited.
		select {
		case <-done:
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	is.logger.Info("RQLite instance stopped",
		zap.String("namespace", instance.Config.Namespace),
	)
	return nil
}
// StopInstanceByPID sends os.Interrupt (SIGINT on Unix) to the process with
// the given PID and falls back to Kill if the signal cannot be delivered.
// It does not wait for the process to exit.
//
// A process that no longer exists counts as already stopped, and nil is
// returned. Non-positive PIDs are rejected before anything is signalled,
// because kill(2) interprets 0 and negative values as process-group targets.
func (is *InstanceSpawner) StopInstanceByPID(pid int) error {
	if pid <= 0 {
		return fmt.Errorf("invalid pid %d", pid)
	}

	process, err := os.FindProcess(pid)
	if err != nil {
		return fmt.Errorf("process not found: %w", err)
	}
	// We never Wait on this handle, so release it explicitly. The error is
	// irrelevant once the signal has been sent.
	defer func() { _ = process.Release() }()

	sigErr := process.Signal(os.Interrupt)
	if sigErr == nil || errors.Is(sigErr, os.ErrProcessDone) {
		return nil
	}

	// The interrupt could not be delivered: escalate to Kill.
	if err := process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
		return fmt.Errorf("failed to kill process: %w", err)
	}
	return nil
}
// IsInstanceRunning reports whether an RQLite instance answers 200 OK on
// http://localhost:<httpPort>/status within two seconds. Connection failures,
// timeouts and non-200 responses all report false.
func (is *InstanceSpawner) IsInstanceRunning(httpPort int) bool {
	probe := http.Client{Timeout: 2 * time.Second}

	resp, err := probe.Get(fmt.Sprintf("http://localhost:%d/status", httpPort))
	if err != nil {
		return false
	}
	defer resp.Body.Close()

	return resp.StatusCode == http.StatusOK
}
// HasExistingData reports whether the instance's data directory already holds
// a raft.db file, which indicates that the instance has started before.
// Any Stat error, including "does not exist", reports false.
func (is *InstanceSpawner) HasExistingData(namespace, nodeID string) bool {
	raftDB := filepath.Join(is.GetDataDir(namespace, nodeID), "raft.db")
	_, statErr := os.Stat(raftDB)
	return statErr == nil
}
// WritePeersJSON writes a peers.json recovery file to <dataDir>/raft/peers.json,
// creating the raft directory if needed.
// This is RQLite's official mechanism for recovering a cluster when all nodes are down.
// On startup, rqlited reads this file, overwrites the Raft peer configuration,
// and renames it to peers.info after recovery.
//
// peers must be non-empty. A nil slice would be encoded as `null` and an empty
// one as `[]`. Because rqlited replaces its peer configuration with the file's
// contents, neither is a usable recovery input, so both are rejected. Any
// existing peers.json is overwritten.
func (is *InstanceSpawner) WritePeersJSON(dataDir string, peers []RaftPeer) error {
	if len(peers) == 0 {
		return fmt.Errorf("refusing to write peers.json: no peers given")
	}

	raftDir := filepath.Join(dataDir, "raft")
	if err := os.MkdirAll(raftDir, 0755); err != nil {
		return fmt.Errorf("failed to create raft directory: %w", err)
	}

	data, err := json.MarshalIndent(peers, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal peers.json: %w", err)
	}

	peersPath := filepath.Join(raftDir, "peers.json")
	if err := os.WriteFile(peersPath, data, 0644); err != nil {
		return fmt.Errorf("failed to write peers.json: %w", err)
	}

	is.logger.Info("Wrote peers.json for cluster recovery",
		zap.String("path", peersPath),
		zap.Int("peer_count", len(peers)),
	)
	return nil
}
// GetDataDir returns the data directory of a namespace RQLite instance,
// <baseDataDir>/<namespace>/rqlite/<nodeID>, joined with filepath.Join (so
// empty components are dropped and the result is cleaned).
func (is *InstanceSpawner) GetDataDir(namespace, nodeID string) string {
	segments := []string{is.baseDataDir, namespace, "rqlite", nodeID}
	return filepath.Join(segments...)
}
// CleanupDataDir recursively removes the data directory of a namespace RQLite
// instance, <baseDataDir>/<namespace>/rqlite/<nodeID>.
//
// Because this is an rm -rf, both namespace and nodeID must be single path
// elements. Empty values, ".", "..", and anything containing a path separator
// are rejected with an error and nothing is removed. Otherwise an empty
// nodeID would wipe every node's data in the namespace, and ".." could reach
// outside baseDataDir. A directory that does not exist is not an error.
func (is *InstanceSpawner) CleanupDataDir(namespace, nodeID string) error {
	separators := "/" + string(filepath.Separator)
	for _, part := range [...]string{namespace, nodeID} {
		if part == "" || part == "." || part == ".." || strings.ContainsAny(part, separators) {
			return fmt.Errorf("refusing to remove RQLite data directory: invalid path component %q (namespace=%q, node_id=%q)",
				part, namespace, nodeID)
		}
	}

	dataDir := is.GetDataDir(namespace, nodeID)
	return os.RemoveAll(dataDir)
}