mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 08:56:58 +00:00
Bug fixing
This commit is contained in:
parent
765ce46ea7
commit
e3dd359e55
@ -60,6 +60,12 @@ type MCPServer struct {
|
||||
}
|
||||
|
||||
func NewMCPServer(rqliteURL string) (*MCPServer, error) {
|
||||
// Disable gorqlite cluster discovery to avoid /nodes timeouts from unreachable peers
|
||||
if strings.Contains(rqliteURL, "?") {
|
||||
rqliteURL += "&disableClusterDiscovery=true"
|
||||
} else {
|
||||
rqliteURL += "?disableClusterDiscovery=true"
|
||||
}
|
||||
conn, err := gorqlite.Open(rqliteURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -224,7 +224,14 @@ func (d *DatabaseClientImpl) connectToAvailableNode() (*gorqlite.Connection, err
|
||||
var conn *gorqlite.Connection
|
||||
var err error
|
||||
|
||||
conn, err = gorqlite.Open(rqliteURL)
|
||||
// Disable gorqlite cluster discovery to avoid /nodes timeouts from unreachable peers
|
||||
openURL := rqliteURL
|
||||
if strings.Contains(openURL, "?") {
|
||||
openURL += "&disableClusterDiscovery=true"
|
||||
} else {
|
||||
openURL += "?disableClusterDiscovery=true"
|
||||
}
|
||||
conn, err = gorqlite.Open(openURL)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
|
||||
@ -256,6 +256,9 @@ func (ii *IPFSInstaller) configureAddresses(ipfsRepoPath string, apiPort, gatewa
|
||||
addresses["Swarm"] = []string{
|
||||
fmt.Sprintf("/ip4/%s/tcp/%d", bindIP, swarmPort),
|
||||
}
|
||||
// Clear NoAnnounce — the server profile blocks private IPs (10.0.0.0/8, etc.)
|
||||
// which prevents nodes from advertising their WireGuard swarm addresses via DHT
|
||||
addresses["NoAnnounce"] = []string{}
|
||||
|
||||
config["Addresses"] = addresses
|
||||
|
||||
|
||||
@ -235,10 +235,7 @@ SyslogIdentifier=debros-node
|
||||
|
||||
PrivateTmp=yes
|
||||
ProtectHome=read-only
|
||||
ProtectKernelTunables=yes
|
||||
ProtectKernelModules=yes
|
||||
ProtectControlGroups=yes
|
||||
RestrictRealtime=yes
|
||||
ReadWritePaths=%[2]s /etc/systemd/system
|
||||
|
||||
[Install]
|
||||
|
||||
@ -126,6 +126,11 @@ func initializeRQLite(logger *logging.ColoredLogger, cfg *Config, deps *Dependen
|
||||
dsn = "http://localhost:5001"
|
||||
}
|
||||
|
||||
if strings.Contains(dsn, "?") {
|
||||
dsn += "&disableClusterDiscovery=true"
|
||||
} else {
|
||||
dsn += "?disableClusterDiscovery=true"
|
||||
}
|
||||
db, err := sql.Open("rqlite", dsn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open rqlite sql db: %w", err)
|
||||
|
||||
@ -212,6 +212,9 @@ func (n *Node) startWireGuardSyncLoop(ctx context.Context) {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
// Re-register self on every tick to pick up IPFS peer ID if it wasn't
|
||||
// ready at startup (INSERT OR REPLACE is idempotent)
|
||||
n.ensureWireGuardSelfRegistered(ctx)
|
||||
if err := n.syncWireGuardPeers(ctx); err != nil {
|
||||
n.logger.ComponentWarn(logging.ComponentNode, "WireGuard peer sync failed", zap.Error(err))
|
||||
}
|
||||
|
||||
@ -17,7 +17,7 @@ type RQLiteAdapter struct {
|
||||
// NewRQLiteAdapter creates a new adapter that provides sql.DB interface for RQLite
|
||||
func NewRQLiteAdapter(manager *RQLiteManager) (*RQLiteAdapter, error) {
|
||||
// Use the gorqlite database/sql driver
|
||||
db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", manager.config.RQLitePort))
|
||||
db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", manager.config.RQLitePort))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open RQLite SQL connection: %w", err)
|
||||
}
|
||||
|
||||
@ -119,7 +119,7 @@ func ApplyMigrationsDirs(ctx context.Context, db *sql.DB, dirs []string, logger
|
||||
|
||||
// ApplyMigrationsFromManager is a convenience helper bound to RQLiteManager.
|
||||
func (r *RQLiteManager) ApplyMigrations(ctx context.Context, dir string) error {
|
||||
db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
|
||||
db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", r.config.RQLitePort))
|
||||
if err != nil {
|
||||
return fmt.Errorf("open rqlite db: %w", err)
|
||||
}
|
||||
@ -130,7 +130,7 @@ func (r *RQLiteManager) ApplyMigrations(ctx context.Context, dir string) error {
|
||||
|
||||
// ApplyMigrationsDirs is the multi-dir variant on RQLiteManager.
|
||||
func (r *RQLiteManager) ApplyMigrationsDirs(ctx context.Context, dirs []string) error {
|
||||
db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
|
||||
db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", r.config.RQLitePort))
|
||||
if err != nil {
|
||||
return fmt.Errorf("open rqlite db: %w", err)
|
||||
}
|
||||
@ -474,7 +474,7 @@ func ApplyEmbeddedMigrations(ctx context.Context, db *sql.DB, fsys fs.FS, logger
|
||||
|
||||
// ApplyEmbeddedMigrations is a convenience helper bound to RQLiteManager.
|
||||
func (r *RQLiteManager) ApplyEmbeddedMigrations(ctx context.Context, fsys fs.FS) error {
|
||||
db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
|
||||
db, err := sql.Open("rqlite", fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", r.config.RQLitePort))
|
||||
if err != nil {
|
||||
return fmt.Errorf("open rqlite db: %w", err)
|
||||
}
|
||||
|
||||
@ -118,10 +118,12 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string)
|
||||
args = append(args, "-join", joinArg, "-join-as", r.discoverConfig.RaftAdvAddress, "-join-attempts", "30", "-join-interval", "10s")
|
||||
|
||||
// Check if this node should join as a non-voter (read replica).
|
||||
// The discovery service determines voter status based on WG IP ordering.
|
||||
if r.discoveryService != nil && !r.discoveryService.IsVoter(r.discoverConfig.RaftAdvAddress) {
|
||||
r.logger.Info("Joining as non-voter (read replica)",
|
||||
zap.String("raft_address", r.discoverConfig.RaftAdvAddress))
|
||||
// Query the join target's /nodes endpoint to count existing voters,
|
||||
// rather than relying on LibP2P peer count which is incomplete at join time.
|
||||
if shouldBeNonVoter := r.checkShouldBeNonVoter(r.config.RQLiteJoinAddress); shouldBeNonVoter {
|
||||
r.logger.Info("Joining as non-voter (read replica) - cluster already has max voters",
|
||||
zap.String("raft_address", r.discoverConfig.RaftAdvAddress),
|
||||
zap.Int("max_voters", MaxDefaultVoters))
|
||||
args = append(args, "-raft-non-voter")
|
||||
}
|
||||
}
|
||||
@ -168,16 +170,26 @@ func (r *RQLiteManager) waitForReadyAndConnect(ctx context.Context) error {
|
||||
var conn *gorqlite.Connection
|
||||
var err error
|
||||
maxConnectAttempts := 10
|
||||
connectBackoff := 500 * time.Millisecond
|
||||
connectBackoff := 1 * time.Second
|
||||
|
||||
// Use disableClusterDiscovery=true to avoid gorqlite calling /nodes on Open().
|
||||
// The /nodes endpoint probes all cluster members including unreachable ones,
|
||||
// which can block for the full HTTP timeout (~10s per attempt).
|
||||
// This is safe because rqlited followers automatically forward writes to the leader.
|
||||
connURL := fmt.Sprintf("http://localhost:%d?disableClusterDiscovery=true", r.config.RQLitePort)
|
||||
|
||||
for attempt := 0; attempt < maxConnectAttempts; attempt++ {
|
||||
conn, err = gorqlite.Open(fmt.Sprintf("http://localhost:%d", r.config.RQLitePort))
|
||||
conn, err = gorqlite.Open(connURL)
|
||||
if err == nil {
|
||||
r.connection = conn
|
||||
break
|
||||
}
|
||||
|
||||
if strings.Contains(err.Error(), "store is not open") {
|
||||
errMsg := err.Error()
|
||||
if strings.Contains(errMsg, "store is not open") {
|
||||
r.logger.Debug("RQLite not ready yet, retrying",
|
||||
zap.Int("attempt", attempt+1),
|
||||
zap.Error(err))
|
||||
time.Sleep(connectBackoff)
|
||||
connectBackoff = time.Duration(float64(connectBackoff) * 1.5)
|
||||
if connectBackoff > 5*time.Second {
|
||||
@ -288,6 +300,58 @@ func (r *RQLiteManager) testJoinAddress(joinAddress string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkShouldBeNonVoter queries the join target's /nodes endpoint to count
|
||||
// existing voters. Returns true if the cluster already has MaxDefaultVoters
|
||||
// voters, meaning this node should join as a non-voter.
|
||||
func (r *RQLiteManager) checkShouldBeNonVoter(joinAddress string) bool {
|
||||
// Derive HTTP API URL from the join address (which is a raft address like 10.0.0.1:7001)
|
||||
host := joinAddress
|
||||
if strings.HasPrefix(host, "http://") || strings.HasPrefix(host, "https://") {
|
||||
host = strings.TrimPrefix(host, "http://")
|
||||
host = strings.TrimPrefix(host, "https://")
|
||||
}
|
||||
if idx := strings.Index(host, ":"); idx != -1 {
|
||||
host = host[:idx]
|
||||
}
|
||||
nodesURL := fmt.Sprintf("http://%s:%d/nodes?timeout=2s", host, r.config.RQLitePort)
|
||||
|
||||
client := tlsutil.NewHTTPClient(5 * time.Second)
|
||||
resp, err := client.Get(nodesURL)
|
||||
if err != nil {
|
||||
r.logger.Warn("Could not query /nodes to check voter count, defaulting to voter", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
r.logger.Warn("Could not read /nodes response, defaulting to voter", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
var nodes map[string]struct {
|
||||
Voter bool `json:"voter"`
|
||||
Reachable bool `json:"reachable"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &nodes); err != nil {
|
||||
r.logger.Warn("Could not parse /nodes response, defaulting to voter", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
voterCount := 0
|
||||
for _, n := range nodes {
|
||||
if n.Voter && n.Reachable {
|
||||
voterCount++
|
||||
}
|
||||
}
|
||||
|
||||
r.logger.Info("Checked existing voter count from join target",
|
||||
zap.Int("reachable_voters", voterCount),
|
||||
zap.Int("max_voters", MaxDefaultVoters))
|
||||
|
||||
return voterCount >= MaxDefaultVoters
|
||||
}
|
||||
|
||||
// waitForJoinTarget waits until the join target's HTTP status becomes reachable
|
||||
func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress string, timeout time.Duration) error {
|
||||
deadline := time.Now().Add(timeout)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user