mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 07:03:01 +00:00
fixes
This commit is contained in:
parent
4b3b7b3458
commit
73dfe22438
@ -289,7 +289,7 @@ func (o *Orchestrator) executeJoinFlow() error {
|
||||
fmt.Printf("\n⚙️ Generating configurations...\n")
|
||||
enableHTTPS := false
|
||||
rqliteJoin := joinResp.RQLiteJoinAddress
|
||||
if err := o.setup.Phase4GenerateConfigs(joinResp.BootstrapPeers, joinResp.WGIP, enableHTTPS, o.flags.Domain, joinResp.BaseDomain, rqliteJoin); err != nil {
|
||||
if err := o.setup.Phase4GenerateConfigs(joinResp.BootstrapPeers, joinResp.WGIP, enableHTTPS, o.flags.Domain, joinResp.BaseDomain, rqliteJoin, joinResp.OlricPeers); err != nil {
|
||||
return fmt.Errorf("configuration generation failed: %w", err)
|
||||
}
|
||||
|
||||
|
||||
@ -208,13 +208,15 @@ func (cg *ConfigGenerator) GenerateGatewayConfig(peerAddresses []string, enableH
|
||||
}
|
||||
|
||||
// GenerateOlricConfig generates Olric configuration
|
||||
func (cg *ConfigGenerator) GenerateOlricConfig(serverBindAddr string, httpPort int, memberlistBindAddr string, memberlistPort int, memberlistEnv string) (string, error) {
|
||||
func (cg *ConfigGenerator) GenerateOlricConfig(serverBindAddr string, httpPort int, memberlistBindAddr string, memberlistPort int, memberlistEnv string, advertiseAddr string, peers []string) (string, error) {
|
||||
data := templates.OlricConfigData{
|
||||
ServerBindAddr: serverBindAddr,
|
||||
HTTPPort: httpPort,
|
||||
MemberlistBindAddr: memberlistBindAddr,
|
||||
MemberlistPort: memberlistPort,
|
||||
MemberlistEnvironment: memberlistEnv,
|
||||
MemberlistAdvertiseAddr: advertiseAddr,
|
||||
Peers: peers,
|
||||
}
|
||||
return templates.RenderOlricConfig(data)
|
||||
}
|
||||
|
||||
@ -471,7 +471,7 @@ func (ps *ProductionSetup) Phase3GenerateSecrets() error {
|
||||
}
|
||||
|
||||
// Phase4GenerateConfigs generates node, gateway, and service configs
|
||||
func (ps *ProductionSetup) Phase4GenerateConfigs(peerAddresses []string, vpsIP string, enableHTTPS bool, domain string, baseDomain string, joinAddress string) error {
|
||||
func (ps *ProductionSetup) Phase4GenerateConfigs(peerAddresses []string, vpsIP string, enableHTTPS bool, domain string, baseDomain string, joinAddress string, olricPeers ...[]string) error {
|
||||
if ps.IsUpdate() {
|
||||
ps.logf("Phase 4: Updating configurations...")
|
||||
ps.logf(" (Existing configs will be updated to latest format)")
|
||||
@ -496,14 +496,21 @@ func (ps *ProductionSetup) Phase4GenerateConfigs(peerAddresses []string, vpsIP s
|
||||
|
||||
// Olric config:
|
||||
// - HTTP API binds to localhost for security (accessed via gateway)
|
||||
// - Memberlist binds to 0.0.0.0 for cluster communication across nodes
|
||||
// - Environment "lan" for production multi-node clustering
|
||||
// - Memberlist binds to WG IP for cluster communication across nodes
|
||||
// - Advertise WG IP so peers can reach this node
|
||||
// - Seed peers from join response for initial cluster formation
|
||||
var olricSeedPeers []string
|
||||
if len(olricPeers) > 0 {
|
||||
olricSeedPeers = olricPeers[0]
|
||||
}
|
||||
olricConfig, err := ps.configGenerator.GenerateOlricConfig(
|
||||
"127.0.0.1", // HTTP API on localhost
|
||||
3320,
|
||||
"0.0.0.0", // Memberlist on all interfaces for clustering
|
||||
vpsIP, // Memberlist on WG IP for clustering
|
||||
3322,
|
||||
"lan", // Production environment
|
||||
vpsIP, // Advertise WG IP
|
||||
olricSeedPeers,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate olric config: %w", err)
|
||||
|
||||
@ -6,3 +6,12 @@ memberlist:
|
||||
environment: {{.MemberlistEnvironment}}
|
||||
bindAddr: "{{.MemberlistBindAddr}}"
|
||||
bindPort: {{.MemberlistPort}}
|
||||
{{- if .MemberlistAdvertiseAddr}}
|
||||
advertiseAddr: "{{.MemberlistAdvertiseAddr}}"
|
||||
{{- end}}
|
||||
{{- if .Peers}}
|
||||
peers:
|
||||
{{- range .Peers}}
|
||||
- "{{.}}"
|
||||
{{- end}}
|
||||
{{- end}}
|
||||
|
||||
@ -58,9 +58,11 @@ type GatewayConfigData struct {
|
||||
// OlricConfigData holds the template parameters for rendering the Olric
// configuration (HTTP API listener plus memberlist clustering settings).
type OlricConfigData struct {
	ServerBindAddr          string   // HTTP API bind address (127.0.0.1 for security)
	HTTPPort                int      // HTTP API port
	MemberlistBindAddr      string   // Memberlist bind address (WG IP for clustering)
	MemberlistPort          int      // Memberlist gossip port
	MemberlistEnvironment   string   // "local", "lan", or "wan"
	MemberlistAdvertiseAddr string   // Advertise address (WG IP) so other nodes can reach us
	Peers                   []string // Seed peers for memberlist (host:port)
}
|
||||
|
||||
// SystemdIPFSData holds parameters for systemd IPFS service rendering
|
||||
|
||||
@ -37,6 +37,9 @@ type JoinResponse struct {
|
||||
IPFSClusterPeer PeerInfo `json:"ipfs_cluster_peer"`
|
||||
BootstrapPeers []string `json:"bootstrap_peers"`
|
||||
|
||||
// Olric seed peers (WG IP:port for memberlist)
|
||||
OlricPeers []string `json:"olric_peers,omitempty"`
|
||||
|
||||
// Domain
|
||||
BaseDomain string `json:"base_domain"`
|
||||
}
|
||||
@ -163,6 +166,15 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) {
|
||||
// 10. Read base domain from config
|
||||
baseDomain := h.readBaseDomain()
|
||||
|
||||
// Build Olric seed peers from all existing WG peer IPs (memberlist port 3322)
|
||||
var olricPeers []string
|
||||
for _, p := range wgPeers {
|
||||
peerIP := strings.TrimSuffix(p.AllowedIP, "/32")
|
||||
olricPeers = append(olricPeers, fmt.Sprintf("%s:3322", peerIP))
|
||||
}
|
||||
// Include this node too
|
||||
olricPeers = append(olricPeers, fmt.Sprintf("%s:3322", myWGIP))
|
||||
|
||||
resp := JoinResponse{
|
||||
WGIP: wgIP,
|
||||
WGPeers: wgPeers,
|
||||
@ -172,6 +184,7 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) {
|
||||
IPFSPeer: ipfsPeer,
|
||||
IPFSClusterPeer: ipfsClusterPeer,
|
||||
BootstrapPeers: bootstrapPeers,
|
||||
OlricPeers: olricPeers,
|
||||
BaseDomain: baseDomain,
|
||||
}
|
||||
|
||||
@ -361,7 +374,7 @@ func (h *Handler) queryIPFSPeerInfo(myWGIP string) PeerInfo {
|
||||
return PeerInfo{
|
||||
ID: result.ID,
|
||||
Addrs: []string{
|
||||
fmt.Sprintf("/ip4/%s/tcp/4101", myWGIP),
|
||||
fmt.Sprintf("/ip4/%s/tcp/4101/p2p/%s", myWGIP, result.ID),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
@ -319,6 +320,26 @@ func (cm *ClusterConfigManager) UpdateIPFSPeeringConfig(peers []IPFSPeerEntry) e
|
||||
return fmt.Errorf("failed to write IPFS config: %w", err)
|
||||
}
|
||||
|
||||
// Also add peers via the live IPFS API so the running daemon picks them up
|
||||
// immediately without requiring a restart. The config file write above
|
||||
// ensures persistence across restarts.
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
for _, p := range peers {
|
||||
for _, addr := range p.Addrs {
|
||||
peeringMA := addr
|
||||
if !strings.Contains(addr, "/p2p/") {
|
||||
peeringMA = fmt.Sprintf("%s/p2p/%s", addr, p.ID)
|
||||
}
|
||||
addURL := fmt.Sprintf("http://localhost:4501/api/v0/swarm/peering/add?arg=%s", url.QueryEscape(peeringMA))
|
||||
if resp, err := client.Post(addURL, "", nil); err == nil {
|
||||
resp.Body.Close()
|
||||
cm.logger.Debug("Added IPFS peering via live API", zap.String("multiaddr", peeringMA))
|
||||
} else {
|
||||
cm.logger.Debug("Failed to add IPFS peering via live API", zap.String("multiaddr", peeringMA), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -119,6 +119,14 @@ func (c *ClusterDiscoveryService) Stop() {
|
||||
c.logger.Info("Cluster discovery service stopped")
|
||||
}
|
||||
|
||||
// IsVoter returns true if the given raft address should be a voter
|
||||
// in the default cluster based on the current known peers.
|
||||
func (c *ClusterDiscoveryService) IsVoter(raftAddress string) bool {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.IsVoterLocked(raftAddress)
|
||||
}
|
||||
|
||||
// periodicSync runs periodic cluster membership synchronization
|
||||
func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) {
|
||||
c.logger.Debug("periodicSync goroutine started, waiting for RQLite readiness")
|
||||
|
||||
@ -3,8 +3,10 @@ package rqlite
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -12,6 +14,12 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// MaxDefaultVoters is the maximum number of voter nodes in the default cluster.
|
||||
// Additional nodes join as non-voters (read replicas). Voter election is
|
||||
// deterministic: all peers sorted by the IP component of their raft address,
|
||||
// and the first MaxDefaultVoters are voters.
|
||||
const MaxDefaultVoters = 5
|
||||
|
||||
// collectPeerMetadata collects RQLite metadata from LibP2P peers
|
||||
func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeMetadata {
|
||||
connectedPeers := c.host.Network().Peers()
|
||||
@ -240,13 +248,22 @@ func (c *ClusterDiscoveryService) getPeersJSON() []map[string]interface{} {
|
||||
}
|
||||
|
||||
func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{} {
|
||||
peers := make([]map[string]interface{}, 0, len(c.knownPeers))
|
||||
|
||||
// Collect all raft addresses
|
||||
raftAddrs := make([]string, 0, len(c.knownPeers))
|
||||
for _, peer := range c.knownPeers {
|
||||
raftAddrs = append(raftAddrs, peer.RaftAddress)
|
||||
}
|
||||
|
||||
// Determine voter set
|
||||
voterSet := computeVoterSet(raftAddrs, MaxDefaultVoters)
|
||||
|
||||
peers := make([]map[string]interface{}, 0, len(c.knownPeers))
|
||||
for _, peer := range c.knownPeers {
|
||||
_, isVoter := voterSet[peer.RaftAddress]
|
||||
peerEntry := map[string]interface{}{
|
||||
"id": peer.RaftAddress,
|
||||
"address": peer.RaftAddress,
|
||||
"non_voter": false,
|
||||
"non_voter": !isVoter,
|
||||
}
|
||||
peers = append(peers, peerEntry)
|
||||
}
|
||||
@ -254,6 +271,50 @@ func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{
|
||||
return peers
|
||||
}
|
||||
|
||||
// computeVoterSet returns the set of raft addresses that should be voters.
// Addresses are ordered by the IP component of their raft address
// (lexicographically), with the full address as a tie-breaker, and the first
// maxVoters addresses become voters. The tie-breaker makes the ordering a
// total order, so the result is fully deterministic: every node computes the
// same voter set from the same peer list.
func computeVoterSet(raftAddrs []string, maxVoters int) map[string]struct{} {
	sorted := make([]string, len(raftAddrs))
	copy(sorted, raftAddrs)

	sort.Slice(sorted, func(i, j int) bool {
		ipI := extractIPForSort(sorted[i])
		ipJ := extractIPForSort(sorted[j])
		if ipI != ipJ {
			return ipI < ipJ
		}
		// Tie-break on the full address: sort.Slice is not stable, so
		// without this, two addresses sharing an IP (different ports)
		// could order differently on different nodes and produce
		// divergent voter sets.
		return sorted[i] < sorted[j]
	})

	voters := make(map[string]struct{})
	for i, addr := range sorted {
		if i >= maxVoters {
			break
		}
		voters[addr] = struct{}{}
	}
	return voters
}

// extractIPForSort extracts the host part of a raft address (host:port) for
// sorting. If the address cannot be split, the whole string is returned so
// malformed entries still sort deterministically.
func extractIPForSort(raftAddr string) string {
	host, _, err := net.SplitHostPort(raftAddr)
	if err != nil {
		return raftAddr
	}
	return host
}
|
||||
|
||||
// IsVoter returns true if the given raft address is in the voter set
|
||||
// based on the current known peers. Must be called with c.mu held.
|
||||
func (c *ClusterDiscoveryService) IsVoterLocked(raftAddress string) bool {
|
||||
raftAddrs := make([]string, 0, len(c.knownPeers))
|
||||
for _, peer := range c.knownPeers {
|
||||
raftAddrs = append(raftAddrs, peer.RaftAddress)
|
||||
}
|
||||
voterSet := computeVoterSet(raftAddrs, MaxDefaultVoters)
|
||||
_, isVoter := voterSet[raftAddress]
|
||||
return isVoter
|
||||
}
|
||||
|
||||
func (c *ClusterDiscoveryService) writePeersJSON() error {
|
||||
c.mu.RLock()
|
||||
peers := c.getPeersJSONUnlocked()
|
||||
|
||||
@ -116,6 +116,14 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string)
|
||||
}
|
||||
|
||||
args = append(args, "-join", joinArg, "-join-as", r.discoverConfig.RaftAdvAddress, "-join-attempts", "30", "-join-interval", "10s")
|
||||
|
||||
// Check if this node should join as a non-voter (read replica).
|
||||
// The discovery service determines voter status based on WG IP ordering.
|
||||
if r.discoveryService != nil && !r.discoveryService.IsVoter(r.discoverConfig.RaftAdvAddress) {
|
||||
r.logger.Info("Joining as non-voter (read replica)",
|
||||
zap.String("raft_address", r.discoverConfig.RaftAdvAddress))
|
||||
args = append(args, "-non-voter")
|
||||
}
|
||||
}
|
||||
|
||||
args = append(args, rqliteDataDir)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user