orama/pkg/node/wireguard_sync.go
2026-02-25 08:37:55 +02:00

262 lines
7.5 KiB
Go

package node
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"os/exec"
"strings"
"time"
"github.com/DeBrosOfficial/network/pkg/environments/production"
"github.com/DeBrosOfficial/network/pkg/logging"
"github.com/DeBrosOfficial/network/pkg/rqlite"
"go.uber.org/zap"
)
// syncWireGuardPeers reads all peers from RQLite and reconciles the local
// WireGuard interface so it matches the cluster state. This is called on
// startup after RQLite is ready and periodically thereafter.
func (n *Node) syncWireGuardPeers(ctx context.Context) error {
if n.rqliteAdapter == nil {
return fmt.Errorf("rqlite adapter not initialized")
}
// Check if WireGuard is installed and active
if _, err := exec.LookPath("wg"); err != nil {
n.logger.ComponentInfo(logging.ComponentNode, "WireGuard not installed, skipping peer sync")
return nil
}
// Check if wg0 interface exists
out, err := exec.CommandContext(ctx, "wg", "show", "wg0").CombinedOutput()
if err != nil {
n.logger.ComponentInfo(logging.ComponentNode, "WireGuard interface wg0 not active, skipping peer sync")
return nil
}
// Parse current peers from wg show output
currentPeers := parseWGShowPeers(string(out))
localPubKey := parseWGShowLocalKey(string(out))
// Query all peers from RQLite
db := n.rqliteAdapter.GetSQLDB()
rows, err := db.QueryContext(ctx,
"SELECT node_id, wg_ip, public_key, public_ip, wg_port FROM wireguard_peers ORDER BY wg_ip")
if err != nil {
return fmt.Errorf("failed to query wireguard_peers: %w", err)
}
defer rows.Close()
// Build desired peer set (excluding self)
desiredPeers := make(map[string]production.WireGuardPeer)
for rows.Next() {
var nodeID, wgIP, pubKey, pubIP string
var wgPort int
if err := rows.Scan(&nodeID, &wgIP, &pubKey, &pubIP, &wgPort); err != nil {
continue
}
if pubKey == localPubKey {
continue // skip self
}
if wgPort == 0 {
wgPort = 51820
}
desiredPeers[pubKey] = production.WireGuardPeer{
PublicKey: pubKey,
Endpoint: fmt.Sprintf("%s:%d", pubIP, wgPort),
AllowedIP: wgIP + "/32",
}
}
wp := &production.WireGuardProvisioner{}
// Add missing peers
for pubKey, peer := range desiredPeers {
if _, exists := currentPeers[pubKey]; !exists {
if err := wp.AddPeer(peer); err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "failed to add WG peer",
zap.String("public_key", pubKey[:8]+"..."),
zap.Error(err))
} else {
n.logger.ComponentInfo(logging.ComponentNode, "added WG peer",
zap.String("allowed_ip", peer.AllowedIP))
}
}
}
// Remove peers not in the desired set
for pubKey := range currentPeers {
if _, exists := desiredPeers[pubKey]; !exists {
if err := wp.RemovePeer(pubKey); err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "failed to remove stale WG peer",
zap.String("public_key", pubKey[:8]+"..."),
zap.Error(err))
} else {
n.logger.ComponentInfo(logging.ComponentNode, "removed stale WG peer",
zap.String("public_key", pubKey[:8]+"..."))
}
}
}
n.logger.ComponentInfo(logging.ComponentNode, "WireGuard peer sync completed",
zap.Int("desired_peers", len(desiredPeers)),
zap.Int("current_peers", len(currentPeers)))
return nil
}
// ensureWireGuardSelfRegistered ensures this node's WireGuard info is in the
// wireguard_peers table. Without this, joining nodes get an empty peer list
// from the /v1/internal/join endpoint and can't establish WG tunnels.
func (n *Node) ensureWireGuardSelfRegistered(ctx context.Context) {
if n.rqliteAdapter == nil {
return
}
// Check if wg0 is active
out, err := exec.CommandContext(ctx, "wg", "show", "wg0").CombinedOutput()
if err != nil {
return // WG not active, nothing to register
}
// Get local public key
localPubKey := parseWGShowLocalKey(string(out))
if localPubKey == "" {
return
}
// Get WG IP from interface
wgIP := ""
iface, err := net.InterfaceByName("wg0")
if err != nil {
return
}
addrs, err := iface.Addrs()
if err != nil {
return
}
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
wgIP = ipnet.IP.String()
break
}
}
if wgIP == "" {
return
}
// Get public IP
publicIP, err := n.getNodeIPAddress()
if err != nil {
return
}
nodeID := n.GetPeerID()
if nodeID == "" {
nodeID = fmt.Sprintf("node-%s", wgIP)
}
// Query local IPFS peer ID
ipfsPeerID := queryLocalIPFSPeerID()
db := n.rqliteAdapter.GetSQLDB()
// Clean up stale entries for this public IP with a different node_id.
// This prevents ghost peers from previous installs or from the temporary
// "node-10.0.0.X" ID that the join handler creates.
if _, err := rqlite.SafeExecContext(db, ctx,
"DELETE FROM wireguard_peers WHERE public_ip = ? AND node_id != ?",
publicIP, nodeID); err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "Failed to clean stale WG entries", zap.Error(err))
}
_, err = rqlite.SafeExecContext(db, ctx,
"INSERT OR REPLACE INTO wireguard_peers (node_id, wg_ip, public_key, public_ip, wg_port, ipfs_peer_id) VALUES (?, ?, ?, ?, ?, ?)",
nodeID, wgIP, localPubKey, publicIP, 51820, ipfsPeerID)
if err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "Failed to self-register WG peer", zap.Error(err))
} else {
n.logger.ComponentInfo(logging.ComponentNode, "WireGuard self-registered",
zap.String("wg_ip", wgIP),
zap.String("public_key", localPubKey[:8]+"..."),
zap.String("ipfs_peer_id", ipfsPeerID))
}
}
// queryLocalIPFSPeerID queries the local IPFS daemon for its peer ID
func queryLocalIPFSPeerID() string {
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Post("http://localhost:4501/api/v0/id", "", nil)
if err != nil {
return ""
}
defer resp.Body.Close()
var result struct {
ID string `json:"ID"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return ""
}
return result.ID
}
// startWireGuardSyncLoop runs syncWireGuardPeers periodically
func (n *Node) startWireGuardSyncLoop(ctx context.Context) {
// Ensure this node is registered in wireguard_peers (critical for join flow)
n.ensureWireGuardSelfRegistered(ctx)
// Run initial sync
if err := n.syncWireGuardPeers(ctx); err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "initial WireGuard peer sync failed", zap.Error(err))
}
// Periodic sync every 60 seconds
go func() {
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Re-register self on every tick to pick up IPFS peer ID if it wasn't
// ready at startup (INSERT OR REPLACE is idempotent)
n.ensureWireGuardSelfRegistered(ctx)
if err := n.syncWireGuardPeers(ctx); err != nil {
n.logger.ComponentWarn(logging.ComponentNode, "WireGuard peer sync failed", zap.Error(err))
}
}
}
}()
}
// parseWGShowPeers extracts public keys of current peers from `wg show wg0` output
func parseWGShowPeers(output string) map[string]struct{} {
peers := make(map[string]struct{})
for _, line := range strings.Split(output, "\n") {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "peer:") {
key := strings.TrimSpace(strings.TrimPrefix(line, "peer:"))
if key != "" {
peers[key] = struct{}{}
}
}
}
return peers
}
// parseWGShowLocalKey extracts the local public key from `wg show wg0` output
func parseWGShowLocalKey(output string) string {
for _, line := range strings.Split(output, "\n") {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "public key:") {
return strings.TrimSpace(strings.TrimPrefix(line, "public key:"))
}
}
return ""
}