mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 08:36:57 +00:00
fixed ipfs problem forming cluster
This commit is contained in:
parent
3343ade433
commit
765ce46ea7
3
migrations/015_ipfs_peer_ids.sql
Normal file
3
migrations/015_ipfs_peer_ids.sql
Normal file
@ -0,0 +1,3 @@
|
||||
-- Store IPFS peer IDs alongside WireGuard peers for automatic swarm discovery
|
||||
-- Each node registers its IPFS peer ID so other nodes can connect via ipfs swarm connect
|
||||
ALTER TABLE wireguard_peers ADD COLUMN ipfs_peer_id TEXT DEFAULT '';
|
||||
186
pkg/node/ipfs_swarm_sync.go
Normal file
186
pkg/node/ipfs_swarm_sync.go
Normal file
@ -0,0 +1,186 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/logging"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// syncIPFSSwarmPeers queries all cluster nodes from RQLite and ensures
|
||||
// this node's IPFS daemon is connected to every other node's IPFS daemon.
|
||||
// Uses `ipfs swarm connect` for immediate connectivity without requiring
|
||||
// config file changes or IPFS restarts.
|
||||
func (n *Node) syncIPFSSwarmPeers(ctx context.Context) {
|
||||
if n.rqliteAdapter == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if IPFS is running
|
||||
if _, err := exec.LookPath("ipfs"); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Get this node's WG IP
|
||||
myWGIP := getLocalWGIP()
|
||||
if myWGIP == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// Query all peers with IPFS peer IDs from RQLite
|
||||
db := n.rqliteAdapter.GetSQLDB()
|
||||
rows, err := db.QueryContext(ctx,
|
||||
"SELECT wg_ip, ipfs_peer_id FROM wireguard_peers WHERE ipfs_peer_id != '' AND wg_ip != ?",
|
||||
myWGIP)
|
||||
if err != nil {
|
||||
n.logger.ComponentWarn(logging.ComponentNode, "Failed to query IPFS peers from RQLite", zap.Error(err))
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type ipfsPeer struct {
|
||||
wgIP string
|
||||
peerID string
|
||||
}
|
||||
|
||||
var peers []ipfsPeer
|
||||
for rows.Next() {
|
||||
var p ipfsPeer
|
||||
if err := rows.Scan(&p.wgIP, &p.peerID); err != nil {
|
||||
continue
|
||||
}
|
||||
peers = append(peers, p)
|
||||
}
|
||||
|
||||
if len(peers) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Get currently connected IPFS swarm peers via API
|
||||
connectedPeers := getConnectedIPFSPeers()
|
||||
|
||||
// Connect to any peer we're not already connected to
|
||||
connected := 0
|
||||
for _, p := range peers {
|
||||
if connectedPeers[p.peerID] {
|
||||
continue // already connected
|
||||
}
|
||||
|
||||
multiaddr := fmt.Sprintf("/ip4/%s/tcp/4101/p2p/%s", p.wgIP, p.peerID)
|
||||
if err := ipfsSwarmConnect(multiaddr); err != nil {
|
||||
n.logger.ComponentWarn(logging.ComponentNode, "Failed to connect IPFS swarm peer",
|
||||
zap.String("peer", p.peerID[:12]+"..."),
|
||||
zap.String("wg_ip", p.wgIP),
|
||||
zap.Error(err))
|
||||
} else {
|
||||
connected++
|
||||
n.logger.ComponentInfo(logging.ComponentNode, "Connected to IPFS swarm peer",
|
||||
zap.String("peer", p.peerID[:12]+"..."),
|
||||
zap.String("wg_ip", p.wgIP))
|
||||
}
|
||||
}
|
||||
|
||||
if connected > 0 {
|
||||
n.logger.ComponentInfo(logging.ComponentNode, "IPFS swarm sync completed",
|
||||
zap.Int("new_connections", connected),
|
||||
zap.Int("total_cluster_peers", len(peers)))
|
||||
}
|
||||
}
|
||||
|
||||
// getConnectedIPFSPeers returns a set of currently connected IPFS peer IDs
|
||||
func getConnectedIPFSPeers() map[string]bool {
|
||||
peers := make(map[string]bool)
|
||||
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
resp, err := client.Post("http://localhost:4501/api/v0/swarm/peers", "", nil)
|
||||
if err != nil {
|
||||
return peers
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// The response contains Peers array with Peer field for each connected peer
|
||||
// We just need the peer IDs, which are the last component of each multiaddr
|
||||
var result struct {
|
||||
Peers []struct {
|
||||
Peer string `json:"Peer"`
|
||||
} `json:"Peers"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return peers
|
||||
}
|
||||
|
||||
for _, p := range result.Peers {
|
||||
peers[p.Peer] = true
|
||||
}
|
||||
return peers
|
||||
}
|
||||
|
||||
// ipfsSwarmConnect connects to an IPFS peer via the HTTP API
|
||||
func ipfsSwarmConnect(multiaddr string) error {
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
apiURL := fmt.Sprintf("http://localhost:4501/api/v0/swarm/connect?arg=%s", url.QueryEscape(multiaddr))
|
||||
resp, err := client.Post(apiURL, "", nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("swarm connect returned status %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getLocalWGIP returns the WireGuard IP of this node
|
||||
func getLocalWGIP() string {
|
||||
out, err := exec.Command("ip", "-4", "addr", "show", "wg0").CombinedOutput()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
for _, line := range strings.Split(string(out), "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
if strings.HasPrefix(line, "inet ") {
|
||||
parts := strings.Fields(line)
|
||||
if len(parts) >= 2 {
|
||||
return strings.Split(parts[1], "/")[0]
|
||||
}
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// startIPFSSwarmSyncLoop periodically syncs IPFS swarm connections with cluster peers
|
||||
func (n *Node) startIPFSSwarmSyncLoop(ctx context.Context) {
|
||||
// Initial sync after a short delay (give IPFS time to start)
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(30 * time.Second):
|
||||
}
|
||||
|
||||
n.syncIPFSSwarmPeers(ctx)
|
||||
|
||||
// Then sync every 60 seconds
|
||||
ticker := time.NewTicker(60 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
n.syncIPFSSwarmPeers(ctx)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
n.logger.ComponentInfo(logging.ComponentNode, "IPFS swarm sync loop started")
|
||||
}
|
||||
@ -106,6 +106,9 @@ func (n *Node) Start(ctx context.Context) error {
|
||||
// Sync WireGuard peers from RQLite (if WG is active on this node)
|
||||
n.startWireGuardSyncLoop(ctx)
|
||||
|
||||
// Sync IPFS swarm connections with all cluster peers
|
||||
n.startIPFSSwarmSyncLoop(ctx)
|
||||
|
||||
// Register this node in dns_nodes table for deployment routing
|
||||
if err := n.registerDNSNode(ctx); err != nil {
|
||||
n.logger.ComponentWarn(logging.ComponentNode, "Failed to register DNS node", zap.Error(err))
|
||||
|
||||
@ -2,8 +2,10 @@ package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
@ -156,19 +158,41 @@ func (n *Node) ensureWireGuardSelfRegistered(ctx context.Context) {
|
||||
nodeID = fmt.Sprintf("node-%s", wgIP)
|
||||
}
|
||||
|
||||
// Query local IPFS peer ID
|
||||
ipfsPeerID := queryLocalIPFSPeerID()
|
||||
|
||||
db := n.rqliteAdapter.GetSQLDB()
|
||||
_, err = db.ExecContext(ctx,
|
||||
"INSERT OR REPLACE INTO wireguard_peers (node_id, wg_ip, public_key, public_ip, wg_port) VALUES (?, ?, ?, ?, ?)",
|
||||
nodeID, wgIP, localPubKey, publicIP, 51820)
|
||||
"INSERT OR REPLACE INTO wireguard_peers (node_id, wg_ip, public_key, public_ip, wg_port, ipfs_peer_id) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
nodeID, wgIP, localPubKey, publicIP, 51820, ipfsPeerID)
|
||||
if err != nil {
|
||||
n.logger.ComponentWarn(logging.ComponentNode, "Failed to self-register WG peer", zap.Error(err))
|
||||
} else {
|
||||
n.logger.ComponentInfo(logging.ComponentNode, "WireGuard self-registered",
|
||||
zap.String("wg_ip", wgIP),
|
||||
zap.String("public_key", localPubKey[:8]+"..."))
|
||||
zap.String("public_key", localPubKey[:8]+"..."),
|
||||
zap.String("ipfs_peer_id", ipfsPeerID))
|
||||
}
|
||||
}
|
||||
|
||||
// queryLocalIPFSPeerID queries the local IPFS daemon for its peer ID
|
||||
func queryLocalIPFSPeerID() string {
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
resp, err := client.Post("http://localhost:4501/api/v0/id", "", nil)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var result struct {
|
||||
ID string `json:"ID"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return ""
|
||||
}
|
||||
return result.ID
|
||||
}
|
||||
|
||||
// startWireGuardSyncLoop runs syncWireGuardPeers periodically
|
||||
func (n *Node) startWireGuardSyncLoop(ctx context.Context) {
|
||||
// Ensure this node is registered in wireguard_peers (critical for join flow)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user