diff --git a/pkg/cli/production/install/orchestrator.go b/pkg/cli/production/install/orchestrator.go index 24f9380..e1f6c45 100644 --- a/pkg/cli/production/install/orchestrator.go +++ b/pkg/cli/production/install/orchestrator.go @@ -289,7 +289,7 @@ func (o *Orchestrator) executeJoinFlow() error { fmt.Printf("\n⚙️ Generating configurations...\n") enableHTTPS := false rqliteJoin := joinResp.RQLiteJoinAddress - if err := o.setup.Phase4GenerateConfigs(joinResp.BootstrapPeers, joinResp.WGIP, enableHTTPS, o.flags.Domain, joinResp.BaseDomain, rqliteJoin); err != nil { + if err := o.setup.Phase4GenerateConfigs(joinResp.BootstrapPeers, joinResp.WGIP, enableHTTPS, o.flags.Domain, joinResp.BaseDomain, rqliteJoin, joinResp.OlricPeers); err != nil { return fmt.Errorf("configuration generation failed: %w", err) } diff --git a/pkg/environments/production/config.go b/pkg/environments/production/config.go index a503dec..6561427 100644 --- a/pkg/environments/production/config.go +++ b/pkg/environments/production/config.go @@ -208,13 +208,15 @@ func (cg *ConfigGenerator) GenerateGatewayConfig(peerAddresses []string, enableH } // GenerateOlricConfig generates Olric configuration -func (cg *ConfigGenerator) GenerateOlricConfig(serverBindAddr string, httpPort int, memberlistBindAddr string, memberlistPort int, memberlistEnv string) (string, error) { +func (cg *ConfigGenerator) GenerateOlricConfig(serverBindAddr string, httpPort int, memberlistBindAddr string, memberlistPort int, memberlistEnv string, advertiseAddr string, peers []string) (string, error) { data := templates.OlricConfigData{ - ServerBindAddr: serverBindAddr, - HTTPPort: httpPort, - MemberlistBindAddr: memberlistBindAddr, - MemberlistPort: memberlistPort, - MemberlistEnvironment: memberlistEnv, + ServerBindAddr: serverBindAddr, + HTTPPort: httpPort, + MemberlistBindAddr: memberlistBindAddr, + MemberlistPort: memberlistPort, + MemberlistEnvironment: memberlistEnv, + MemberlistAdvertiseAddr: advertiseAddr, + Peers: peers, } return templates.RenderOlricConfig(data) } diff --git a/pkg/environments/production/orchestrator.go b/pkg/environments/production/orchestrator.go index 07347fe..a844eae 100644 --- a/pkg/environments/production/orchestrator.go +++ b/pkg/environments/production/orchestrator.go @@ -471,7 +471,7 @@ func (ps *ProductionSetup) Phase3GenerateSecrets() error { } // Phase4GenerateConfigs generates node, gateway, and service configs -func (ps *ProductionSetup) Phase4GenerateConfigs(peerAddresses []string, vpsIP string, enableHTTPS bool, domain string, baseDomain string, joinAddress string) error { +func (ps *ProductionSetup) Phase4GenerateConfigs(peerAddresses []string, vpsIP string, enableHTTPS bool, domain string, baseDomain string, joinAddress string, olricPeers ...[]string) error { if ps.IsUpdate() { ps.logf("Phase 4: Updating configurations...") ps.logf(" (Existing configs will be updated to latest format)") @@ -496,14 +496,21 @@ func (ps *ProductionSetup) Phase4GenerateConfigs(peerAddresses []string, vpsIP s // Olric config: // - HTTP API binds to localhost for security (accessed via gateway) - // - Memberlist binds to 0.0.0.0 for cluster communication across nodes - // - Environment "lan" for production multi-node clustering + // - Memberlist binds to WG IP for cluster communication across nodes + // - Advertise WG IP so peers can reach this node + // - Seed peers from join response for initial cluster formation + var olricSeedPeers []string + if len(olricPeers) > 0 { + olricSeedPeers = olricPeers[0] + } olricConfig, err := ps.configGenerator.GenerateOlricConfig( "127.0.0.1", // HTTP API on localhost 3320, - "0.0.0.0", // Memberlist on all interfaces for clustering + vpsIP, // Memberlist on WG IP for clustering 3322, "lan", // Production environment + vpsIP, // Advertise WG IP + olricSeedPeers, ) if err != nil { return fmt.Errorf("failed to generate olric config: %w", err) diff --git a/pkg/environments/templates/olric.yaml b/pkg/environments/templates/olric.yaml index c1d00cc..57f15c7 100644 --- a/pkg/environments/templates/olric.yaml +++ b/pkg/environments/templates/olric.yaml @@ -1,8 +1,17 @@ server: bindAddr: "{{.ServerBindAddr}}" - bindPort: { { .HTTPPort } } + bindPort: {{.HTTPPort}} memberlist: - environment: { { .MemberlistEnvironment } } + environment: {{.MemberlistEnvironment}} bindAddr: "{{.MemberlistBindAddr}}" - bindPort: { { .MemberlistPort } } + bindPort: {{.MemberlistPort}} +{{- if .MemberlistAdvertiseAddr}} + advertiseAddr: "{{.MemberlistAdvertiseAddr}}" +{{- end}} +{{- if .Peers}} + peers: +{{- range .Peers}} + - "{{.}}" +{{- end}} +{{- end}} diff --git a/pkg/environments/templates/render.go b/pkg/environments/templates/render.go index 8e03721..74b1de0 100644 --- a/pkg/environments/templates/render.go +++ b/pkg/environments/templates/render.go @@ -56,11 +56,13 @@ type GatewayConfigData struct { // OlricConfigData holds parameters for olric.yaml rendering type OlricConfigData struct { - ServerBindAddr string // HTTP API bind address (127.0.0.1 for security) - HTTPPort int - MemberlistBindAddr string // Memberlist bind address (0.0.0.0 for clustering) - MemberlistPort int - MemberlistEnvironment string // "local", "lan", or "wan" + ServerBindAddr string // HTTP API bind address (127.0.0.1 for security) + HTTPPort int + MemberlistBindAddr string // Memberlist bind address (WG IP for clustering) + MemberlistPort int + MemberlistEnvironment string // "local", "lan", or "wan" + MemberlistAdvertiseAddr string // Advertise address (WG IP) so other nodes can reach us + Peers []string // Seed peers for memberlist (host:port) } // SystemdIPFSData holds parameters for systemd IPFS service rendering diff --git a/pkg/gateway/handlers/join/handler.go b/pkg/gateway/handlers/join/handler.go index 792700e..ddaf3a3 100644 --- a/pkg/gateway/handlers/join/handler.go +++ b/pkg/gateway/handlers/join/handler.go @@ -37,6 +37,9 @@ type JoinResponse struct { IPFSClusterPeer PeerInfo `json:"ipfs_cluster_peer"` BootstrapPeers []string `json:"bootstrap_peers"` + // Olric seed peers (WG IP:port for memberlist) + OlricPeers []string `json:"olric_peers,omitempty"` + // Domain BaseDomain string `json:"base_domain"` } @@ -163,6 +166,15 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) { // 10. Read base domain from config baseDomain := h.readBaseDomain() + // Build Olric seed peers from all existing WG peer IPs (memberlist port 3322) + var olricPeers []string + for _, p := range wgPeers { + peerIP := strings.TrimSuffix(p.AllowedIP, "/32") + olricPeers = append(olricPeers, fmt.Sprintf("%s:3322", peerIP)) + } + // Include this node too + olricPeers = append(olricPeers, fmt.Sprintf("%s:3322", myWGIP)) + resp := JoinResponse{ WGIP: wgIP, WGPeers: wgPeers, @@ -172,6 +184,7 @@ func (h *Handler) HandleJoin(w http.ResponseWriter, r *http.Request) { IPFSPeer: ipfsPeer, IPFSClusterPeer: ipfsClusterPeer, BootstrapPeers: bootstrapPeers, + OlricPeers: olricPeers, BaseDomain: baseDomain, } @@ -361,7 +374,7 @@ func (h *Handler) queryIPFSPeerInfo(myWGIP string) PeerInfo { return PeerInfo{ ID: result.ID, Addrs: []string{ - fmt.Sprintf("/ip4/%s/tcp/4101", myWGIP), + fmt.Sprintf("/ip4/%s/tcp/4101/p2p/%s", myWGIP, result.ID), }, } } diff --git a/pkg/ipfs/cluster_peer.go b/pkg/ipfs/cluster_peer.go index 339ae71..cdc0a7f 100644 --- a/pkg/ipfs/cluster_peer.go +++ b/pkg/ipfs/cluster_peer.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "os" "os/exec" "path/filepath" @@ -319,6 +320,26 @@ func (cm *ClusterConfigManager) UpdateIPFSPeeringConfig(peers []IPFSPeerEntry) e return fmt.Errorf("failed to write IPFS config: %w", err) } + // Also add peers via the live IPFS API so the running daemon picks them up + // immediately without requiring a restart. The config file write above + // ensures persistence across restarts. + client := &http.Client{Timeout: 5 * time.Second} + for _, p := range peers { + for _, addr := range p.Addrs { + peeringMA := addr + if !strings.Contains(addr, "/p2p/") { + peeringMA = fmt.Sprintf("%s/p2p/%s", addr, p.ID) + } + addURL := fmt.Sprintf("http://localhost:4501/api/v0/swarm/peering/add?arg=%s", url.QueryEscape(peeringMA)) + if resp, err := client.Post(addURL, "", nil); err == nil { + resp.Body.Close() + cm.logger.Debug("Added IPFS peering via live API", zap.String("multiaddr", peeringMA)) + } else { + cm.logger.Debug("Failed to add IPFS peering via live API", zap.String("multiaddr", peeringMA), zap.Error(err)) + } + } + } + return nil } diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go index 72d3da3..d411a5b 100644 --- a/pkg/rqlite/cluster_discovery.go +++ b/pkg/rqlite/cluster_discovery.go @@ -119,6 +119,14 @@ func (c *ClusterDiscoveryService) Stop() { c.logger.Info("Cluster discovery service stopped") } +// IsVoter returns true if the given raft address should be a voter +// in the default cluster based on the current known peers. +func (c *ClusterDiscoveryService) IsVoter(raftAddress string) bool { + c.mu.RLock() + defer c.mu.RUnlock() + return c.IsVoterLocked(raftAddress) +} + // periodicSync runs periodic cluster membership synchronization func (c *ClusterDiscoveryService) periodicSync(ctx context.Context) { c.logger.Debug("periodicSync goroutine started, waiting for RQLite readiness") diff --git a/pkg/rqlite/cluster_discovery_membership.go b/pkg/rqlite/cluster_discovery_membership.go index 55065f3..ec5bf08 100644 --- a/pkg/rqlite/cluster_discovery_membership.go +++ b/pkg/rqlite/cluster_discovery_membership.go @@ -3,8 +3,10 @@ package rqlite import ( "encoding/json" "fmt" + "net" "os" "path/filepath" + "sort" "strings" "time" @@ -12,6 +14,12 @@ import ( "go.uber.org/zap" ) +// MaxDefaultVoters is the maximum number of voter nodes in the default cluster. +// Additional nodes join as non-voters (read replicas). Voter election is +// deterministic: all peers sorted by the IP component of their raft address, +// and the first MaxDefaultVoters are voters. +const MaxDefaultVoters = 5 + // collectPeerMetadata collects RQLite metadata from LibP2P peers func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeMetadata { connectedPeers := c.host.Network().Peers() @@ -240,13 +248,22 @@ func (c *ClusterDiscoveryService) getPeersJSON() []map[string]interface{} { } func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{} { - peers := make([]map[string]interface{}, 0, len(c.knownPeers)) - + // Collect all raft addresses + raftAddrs := make([]string, 0, len(c.knownPeers)) for _, peer := range c.knownPeers { + raftAddrs = append(raftAddrs, peer.RaftAddress) + } + + // Determine voter set + voterSet := computeVoterSet(raftAddrs, MaxDefaultVoters) + + peers := make([]map[string]interface{}, 0, len(c.knownPeers)) + for _, peer := range c.knownPeers { + _, isVoter := voterSet[peer.RaftAddress] peerEntry := map[string]interface{}{ "id": peer.RaftAddress, "address": peer.RaftAddress, - "non_voter": false, + "non_voter": !isVoter, } peers = append(peers, peerEntry) } @@ -254,6 +271,50 @@ func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{ return peers } +// computeVoterSet returns the set of raft addresses that should be voters. +// It sorts addresses by their IP component and selects the first maxVoters. +// This is deterministic — all nodes compute the same voter set from the same peer list. +func computeVoterSet(raftAddrs []string, maxVoters int) map[string]struct{} { + sorted := make([]string, len(raftAddrs)) + copy(sorted, raftAddrs) + + sort.Slice(sorted, func(i, j int) bool { + ipI := extractIPForSort(sorted[i]) + ipJ := extractIPForSort(sorted[j]) + return ipI < ipJ + }) + + voters := make(map[string]struct{}) + for i, addr := range sorted { + if i >= maxVoters { + break + } + voters[addr] = struct{}{} + } + return voters +} + +// extractIPForSort extracts the IP string from a raft address (host:port) for sorting. +func extractIPForSort(raftAddr string) string { + host, _, err := net.SplitHostPort(raftAddr) + if err != nil { + return raftAddr + } + return host +} + +// IsVoter returns true if the given raft address is in the voter set +// based on the current known peers. Must be called with c.mu held. +func (c *ClusterDiscoveryService) IsVoterLocked(raftAddress string) bool { + raftAddrs := make([]string, 0, len(c.knownPeers)) + for _, peer := range c.knownPeers { + raftAddrs = append(raftAddrs, peer.RaftAddress) + } + voterSet := computeVoterSet(raftAddrs, MaxDefaultVoters) + _, isVoter := voterSet[raftAddress] + return isVoter +} + func (c *ClusterDiscoveryService) writePeersJSON() error { c.mu.RLock() peers := c.getPeersJSONUnlocked() diff --git a/pkg/rqlite/process.go b/pkg/rqlite/process.go index d6e5c00..b8edeef 100644 --- a/pkg/rqlite/process.go +++ b/pkg/rqlite/process.go @@ -116,6 +116,14 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) } args = append(args, "-join", joinArg, "-join-as", r.discoverConfig.RaftAdvAddress, "-join-attempts", "30", "-join-interval", "10s") + + // Check if this node should join as a non-voter (read replica). + // The discovery service determines voter status based on WG IP ordering. + if r.discoveryService != nil && !r.discoveryService.IsVoter(r.discoverConfig.RaftAdvAddress) { + r.logger.Info("Joining as non-voter (read replica)", + zap.String("raft_address", r.discoverConfig.RaftAdvAddress)) + args = append(args, "-non-voter") + } } args = append(args, rqliteDataDir)