mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 11:46:58 +00:00
154 lines
4.3 KiB
Go
154 lines
4.3 KiB
Go
package rqlite
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/DeBrosOfficial/network/migrations"
|
|
"github.com/DeBrosOfficial/network/pkg/config"
|
|
"github.com/rqlite/gorqlite"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// RQLiteManager manages an RQLite node instance
|
|
type RQLiteManager struct {
|
|
config *config.DatabaseConfig
|
|
discoverConfig *config.DiscoveryConfig
|
|
dataDir string
|
|
nodeType string // Node type identifier
|
|
logger *zap.Logger
|
|
cmd *exec.Cmd
|
|
connection *gorqlite.Connection
|
|
discoveryService *ClusterDiscoveryService
|
|
waitDone chan struct{} // closed when cmd.Wait() completes (reaps zombie)
|
|
}
|
|
|
|
// NewRQLiteManager creates a new RQLite manager
|
|
func NewRQLiteManager(cfg *config.DatabaseConfig, discoveryCfg *config.DiscoveryConfig, dataDir string, logger *zap.Logger) *RQLiteManager {
|
|
return &RQLiteManager{
|
|
config: cfg,
|
|
discoverConfig: discoveryCfg,
|
|
dataDir: dataDir,
|
|
logger: logger.With(zap.String("component", "rqlite-manager")),
|
|
}
|
|
}
|
|
|
|
// Start starts the RQLite node
|
|
func (r *RQLiteManager) Start(ctx context.Context) error {
|
|
rqliteDataDir, err := r.prepareDataDir()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if r.discoverConfig.HttpAdvAddress == "" {
|
|
return fmt.Errorf("discovery config HttpAdvAddress is empty")
|
|
}
|
|
|
|
if r.discoveryService != nil {
|
|
if err := r.waitForMinClusterSizeBeforeStart(ctx, rqliteDataDir); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if needsClusterRecovery, err := r.checkNeedsClusterRecovery(rqliteDataDir); err == nil && needsClusterRecovery {
|
|
if err := r.performPreStartClusterDiscovery(ctx, rqliteDataDir); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := r.launchProcess(ctx, rqliteDataDir); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := r.waitForReadyAndConnect(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
if r.discoveryService != nil {
|
|
go r.startHealthMonitoring(ctx)
|
|
go r.startVoterReconciliation(ctx)
|
|
go r.startOrphanedNodeRecovery(ctx) // C1 fix: recover nodes orphaned by failed voter changes
|
|
}
|
|
|
|
// Start child process watchdog to detect and recover from crashes
|
|
go r.startProcessWatchdog(ctx)
|
|
|
|
// Start periodic RQLite backup loop (leader-only, self-checking)
|
|
go r.startBackupLoop(ctx)
|
|
|
|
if err := r.establishLeadershipOrJoin(ctx, rqliteDataDir); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Apply embedded migrations - these are compiled into the binary
|
|
if err := r.ApplyEmbeddedMigrations(ctx, migrations.FS); err != nil {
|
|
r.logger.Error("Failed to apply embedded migrations", zap.Error(err))
|
|
// Don't fail startup - migrations may have already been applied by another node
|
|
// or we may be joining an existing cluster
|
|
} else {
|
|
r.logger.Info("Database migrations applied successfully")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetConnection returns the RQLite connection
|
|
func (r *RQLiteManager) GetConnection() *gorqlite.Connection {
|
|
return r.connection
|
|
}
|
|
|
|
// Stop stops the RQLite node gracefully.
|
|
// If this node is the Raft leader, it attempts a leadership transfer first
|
|
// to minimize cluster disruption.
|
|
func (r *RQLiteManager) Stop() error {
|
|
if r.connection != nil {
|
|
r.connection.Close()
|
|
r.connection = nil
|
|
}
|
|
|
|
if r.cmd == nil || r.cmd.Process == nil {
|
|
return nil
|
|
}
|
|
|
|
// Attempt leadership transfer if we are the leader
|
|
r.transferLeadershipIfLeader()
|
|
|
|
_ = r.cmd.Process.Signal(syscall.SIGTERM)
|
|
|
|
// Wait for the background reaper goroutine (started in launchProcess) to
|
|
// collect the child process. This avoids a double cmd.Wait() panic.
|
|
if r.waitDone != nil {
|
|
select {
|
|
case <-r.waitDone:
|
|
case <-time.After(30 * time.Second):
|
|
r.logger.Warn("RQLite did not stop within 30s, sending SIGKILL")
|
|
_ = r.cmd.Process.Kill()
|
|
<-r.waitDone // wait for reaper after kill
|
|
}
|
|
}
|
|
|
|
// Clean up PID file
|
|
r.cleanupPIDFile()
|
|
|
|
return nil
|
|
}
|
|
|
|
// transferLeadershipIfLeader checks if this node is the Raft leader and
|
|
// requests a leadership transfer to minimize election disruption.
|
|
func (r *RQLiteManager) transferLeadershipIfLeader() {
|
|
if err := TransferLeadership(r.config.RQLitePort, r.logger); err != nil {
|
|
r.logger.Warn("Leadership transfer failed, relying on SIGTERM", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
// cleanupPIDFile removes the PID file on shutdown
|
|
func (r *RQLiteManager) cleanupPIDFile() {
|
|
logsDir := fmt.Sprintf("%s/../logs", r.dataDir)
|
|
pidPath := logsDir + "/rqlited.pid"
|
|
_ = os.Remove(pidPath)
|
|
}
|