From 02b5c095d06319cf34675f2943166ae72d28e037 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Thu, 5 Feb 2026 16:12:52 +0200 Subject: [PATCH] More bug fixing --- pkg/client/client.go | 7 + pkg/client/interface.go | 5 + pkg/gateway/gateway.go | 50 ++++ pkg/gateway/peer_discovery.go | 433 ++++++++++++++++++++++++++++++++++ 4 files changed, 495 insertions(+) create mode 100644 pkg/gateway/peer_discovery.go diff --git a/pkg/client/client.go b/pkg/client/client.go index 82e844e..d152962 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -113,6 +113,13 @@ func (c *Client) Config() *ClientConfig { return &cp } +// Host returns the underlying libp2p host for advanced usage +func (c *Client) Host() host.Host { + c.mu.RLock() + defer c.mu.RUnlock() + return c.host +} + // Connect establishes connection to the network func (c *Client) Connect() error { c.mu.Lock() diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 944ebc3..2c7e40b 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -4,6 +4,8 @@ import ( "context" "io" "time" + + "github.com/libp2p/go-libp2p/core/host" ) // NetworkClient provides the main interface for applications to interact with the network @@ -27,6 +29,9 @@ type NetworkClient interface { // Config access (snapshot copy) Config() *ClientConfig + + // Host returns the underlying libp2p host (for advanced usage like peer discovery) + Host() host.Host } // DatabaseClient provides database operations for applications diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 20a19e9..0bb8d8b 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -13,6 +13,8 @@ import ( "net/http" "path/filepath" "reflect" + "strconv" + "strings" "sync" "time" @@ -120,6 +122,9 @@ type Gateway struct { // Namespace delete handler namespaceDeleteHandler http.Handler + + // Peer discovery for namespace gateways (libp2p mesh formation) + peerDiscovery *PeerDiscovery } // localSubscriber represents a WebSocket subscriber for local message delivery @@ -450,6 +455,51 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { gw.startOlricReconnectLoop(olricCfg) } + // Initialize peer discovery for namespace gateways + // This allows the 3 namespace gateway instances to discover each other + if cfg.ClientNamespace != "" && cfg.ClientNamespace != "default" && deps.Client != nil { + logger.ComponentInfo(logging.ComponentGeneral, "Initializing peer discovery for namespace gateway...", + zap.String("namespace", cfg.ClientNamespace)) + + // Get libp2p host from client + host := deps.Client.Host() + if host != nil { + // Parse listen port from ListenAddr (format: ":port" or "addr:port") + listenPort := 0 + if cfg.ListenAddr != "" { + parts := strings.Split(cfg.ListenAddr, ":") + if len(parts) > 0 { + portStr := parts[len(parts)-1] + if p, err := strconv.Atoi(portStr); err == nil { + listenPort = p + } + } + } + + // Create peer discovery manager + gw.peerDiscovery = NewPeerDiscovery( + host, + deps.SQLDB, + cfg.NodePeerID, + listenPort, + cfg.ClientNamespace, + logger.Logger, + ) + + // Start peer discovery + ctx := context.Background() + if err := gw.peerDiscovery.Start(ctx); err != nil { + logger.ComponentWarn(logging.ComponentGeneral, "Failed to start peer discovery", + zap.Error(err)) + } else { + logger.ComponentInfo(logging.ComponentGeneral, "Peer discovery started successfully", + zap.String("namespace", cfg.ClientNamespace)) + } + } else { + logger.ComponentWarn(logging.ComponentGeneral, "Cannot initialize peer discovery: libp2p host not available") + } + } + logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed") return gw, nil } diff --git a/pkg/gateway/peer_discovery.go b/pkg/gateway/peer_discovery.go new file mode 100644 index 0000000..e6f82d9 --- /dev/null +++ b/pkg/gateway/peer_discovery.go @@ -0,0 +1,433 @@ +package gateway + +import ( + "context" + "database/sql" + "fmt" + "os" + "os/exec" + "strings" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "go.uber.org/zap" +) + +// PeerDiscovery manages namespace gateway peer discovery via RQLite +type PeerDiscovery struct { + host host.Host + rqliteDB *sql.DB + nodeID string + listenPort int + namespace string + logger *zap.Logger + + // Stop channel for background goroutines + stopCh chan struct{} +} + +// NewPeerDiscovery creates a new peer discovery manager +func NewPeerDiscovery(h host.Host, rqliteDB *sql.DB, nodeID string, listenPort int, namespace string, logger *zap.Logger) *PeerDiscovery { + return &PeerDiscovery{ + host: h, + rqliteDB: rqliteDB, + nodeID: nodeID, + listenPort: listenPort, + namespace: namespace, + logger: logger, + stopCh: make(chan struct{}), + } +} + +// Start initializes the peer discovery system +func (pd *PeerDiscovery) Start(ctx context.Context) error { + pd.logger.Info("Starting peer discovery", + zap.String("namespace", pd.namespace), + zap.String("peer_id", pd.host.ID().String()), + zap.String("node_id", pd.nodeID)) + + // 1. Create discovery table if it doesn't exist + if err := pd.initTable(ctx); err != nil { + return fmt.Errorf("failed to initialize discovery table: %w", err) + } + + // 2. Register ourselves + if err := pd.registerSelf(ctx); err != nil { + return fmt.Errorf("failed to register self: %w", err) + } + + // 3. Discover and connect to existing peers + if err := pd.discoverPeers(ctx); err != nil { + pd.logger.Warn("Initial peer discovery failed (will retry in background)", + zap.Error(err)) + } + + // 4. Start background goroutines + go pd.heartbeatLoop(ctx) + go pd.discoveryLoop(ctx) + + pd.logger.Info("Peer discovery started successfully", + zap.String("namespace", pd.namespace)) + + return nil +} + +// Stop stops the peer discovery system +func (pd *PeerDiscovery) Stop(ctx context.Context) error { + pd.logger.Info("Stopping peer discovery", + zap.String("namespace", pd.namespace)) + + // Signal background goroutines to stop + close(pd.stopCh) + + // Unregister ourselves from the discovery table + if err := pd.unregisterSelf(ctx); err != nil { + pd.logger.Warn("Failed to unregister self from discovery table", + zap.Error(err)) + } + + pd.logger.Info("Peer discovery stopped", + zap.String("namespace", pd.namespace)) + + return nil +} + +// initTable creates the peer discovery table if it doesn't exist +func (pd *PeerDiscovery) initTable(ctx context.Context) error { + query := ` + CREATE TABLE IF NOT EXISTS _namespace_libp2p_peers ( + peer_id TEXT PRIMARY KEY, + multiaddr TEXT NOT NULL, + node_id TEXT NOT NULL, + listen_port INTEGER NOT NULL, + namespace TEXT NOT NULL, + last_seen TIMESTAMP NOT NULL + ) + ` + + _, err := pd.rqliteDB.ExecContext(ctx, query) + if err != nil { + return fmt.Errorf("failed to create discovery table: %w", err) + } + + pd.logger.Debug("Peer discovery table initialized", + zap.String("namespace", pd.namespace)) + + return nil +} + +// registerSelf registers this gateway in the discovery table +func (pd *PeerDiscovery) registerSelf(ctx context.Context) error { + peerID := pd.host.ID().String() + + // Get WireGuard IP from host addresses + wireguardIP, err := pd.getWireGuardIP() + if err != nil { + return fmt.Errorf("failed to get WireGuard IP: %w", err) + } + + // Build multiaddr: /ip4//tcp//p2p/ + multiaddr := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", wireguardIP, pd.listenPort, peerID) + + query := ` + INSERT OR REPLACE INTO _namespace_libp2p_peers + (peer_id, multiaddr, node_id, listen_port, namespace, last_seen) + VALUES (?, ?, ?, ?, ?, ?) + ` + + _, err = pd.rqliteDB.ExecContext(ctx, query, + peerID, + multiaddr, + pd.nodeID, + pd.listenPort, + pd.namespace, + time.Now().UTC()) + + if err != nil { + return fmt.Errorf("failed to register self in discovery table: %w", err) + } + + pd.logger.Info("Registered self in peer discovery", + zap.String("peer_id", peerID), + zap.String("multiaddr", multiaddr), + zap.String("node_id", pd.nodeID)) + + return nil +} + +// unregisterSelf removes this gateway from the discovery table +func (pd *PeerDiscovery) unregisterSelf(ctx context.Context) error { + peerID := pd.host.ID().String() + + query := `DELETE FROM _namespace_libp2p_peers WHERE peer_id = ?` + + _, err := pd.rqliteDB.ExecContext(ctx, query, peerID) + if err != nil { + return fmt.Errorf("failed to unregister self: %w", err) + } + + pd.logger.Info("Unregistered self from peer discovery", + zap.String("peer_id", peerID)) + + return nil +} + +// discoverPeers queries RQLite for other namespace gateways and connects to them +func (pd *PeerDiscovery) discoverPeers(ctx context.Context) error { + myPeerID := pd.host.ID().String() + + // Query for peers that have been seen in the last 5 minutes + query := ` + SELECT peer_id, multiaddr, node_id + FROM _namespace_libp2p_peers + WHERE peer_id != ? + AND namespace = ? + AND last_seen > datetime('now', '-5 minutes') + ` + + rows, err := pd.rqliteDB.QueryContext(ctx, query, myPeerID, pd.namespace) + if err != nil { + return fmt.Errorf("failed to query peers: %w", err) + } + defer rows.Close() + + discoveredCount := 0 + connectedCount := 0 + + for rows.Next() { + var peerID, multiaddrStr, nodeID string + if err := rows.Scan(&peerID, &multiaddrStr, &nodeID); err != nil { + pd.logger.Warn("Failed to scan peer row", zap.Error(err)) + continue + } + + discoveredCount++ + + // Parse peer ID + remotePeerID, err := peer.Decode(peerID) + if err != nil { + pd.logger.Warn("Failed to decode peer ID", + zap.String("peer_id", peerID), + zap.Error(err)) + continue + } + + // Parse multiaddr + maddr, err := multiaddr.NewMultiaddr(multiaddrStr) + if err != nil { + pd.logger.Warn("Failed to parse multiaddr", + zap.String("multiaddr", multiaddrStr), + zap.Error(err)) + continue + } + + // Check if already connected + connectedness := pd.host.Network().Connectedness(remotePeerID) + if connectedness == 1 { // Connected + pd.logger.Debug("Already connected to peer", + zap.String("peer_id", peerID), + zap.String("node_id", nodeID)) + connectedCount++ + continue + } + + // Convert multiaddr to peer.AddrInfo + addrInfo, err := peer.AddrInfoFromP2pAddr(maddr) + if err != nil { + pd.logger.Warn("Failed to create AddrInfo", + zap.String("multiaddr", multiaddrStr), + zap.Error(err)) + continue + } + + // Connect to peer + connectCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + err = pd.host.Connect(connectCtx, *addrInfo) + cancel() + + if err != nil { + pd.logger.Warn("Failed to connect to peer", + zap.String("peer_id", peerID), + zap.String("node_id", nodeID), + zap.String("multiaddr", multiaddrStr), + zap.Error(err)) + continue + } + + pd.logger.Info("Connected to namespace gateway peer", + zap.String("peer_id", peerID), + zap.String("node_id", nodeID), + zap.String("multiaddr", multiaddrStr)) + + connectedCount++ + } + + if err := rows.Err(); err != nil { + return fmt.Errorf("error iterating peer rows: %w", err) + } + + pd.logger.Info("Peer discovery completed", + zap.Int("discovered", discoveredCount), + zap.Int("connected", connectedCount)) + + return nil +} + +// heartbeatLoop periodically updates the last_seen timestamp +func (pd *PeerDiscovery) heartbeatLoop(ctx context.Context) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-pd.stopCh: + return + case <-ctx.Done(): + return + case <-ticker.C: + if err := pd.updateHeartbeat(ctx); err != nil { + pd.logger.Warn("Failed to update heartbeat", + zap.Error(err)) + } + } + } +} + +// discoveryLoop periodically discovers new peers +func (pd *PeerDiscovery) discoveryLoop(ctx context.Context) { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + + for { + select { + case <-pd.stopCh: + return + case <-ctx.Done(): + return + case <-ticker.C: + if err := pd.discoverPeers(ctx); err != nil { + pd.logger.Warn("Periodic peer discovery failed", + zap.Error(err)) + } + } + } +} + +// updateHeartbeat updates the last_seen timestamp for this gateway +func (pd *PeerDiscovery) updateHeartbeat(ctx context.Context) error { + peerID := pd.host.ID().String() + + query := ` + UPDATE _namespace_libp2p_peers + SET last_seen = ? + WHERE peer_id = ? + ` + + _, err := pd.rqliteDB.ExecContext(ctx, query, time.Now().UTC(), peerID) + if err != nil { + return fmt.Errorf("failed to update heartbeat: %w", err) + } + + pd.logger.Debug("Updated heartbeat", + zap.String("peer_id", peerID)) + + return nil +} + +// getWireGuardIP extracts the WireGuard IP from the WireGuard interface +func (pd *PeerDiscovery) getWireGuardIP() (string, error) { + // Method 1: Use 'ip addr show wg0' command (works without root) + ip, err := pd.getWireGuardIPFromInterface() + if err == nil { + pd.logger.Info("Found WireGuard IP from network interface", + zap.String("ip", ip)) + return ip, nil + } + pd.logger.Debug("Failed to get WireGuard IP from interface", zap.Error(err)) + + // Method 2: Try to read from WireGuard config file (requires root, may fail) + configPath := "/etc/wireguard/wg0.conf" + data, err := os.ReadFile(configPath) + if err == nil { + // Parse Address line from config + lines := strings.Split(string(data), "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "Address") { + // Format: Address = 10.0.0.X/24 + parts := strings.Split(line, "=") + if len(parts) == 2 { + addrWithCIDR := strings.TrimSpace(parts[1]) + // Remove /24 suffix + ip := strings.Split(addrWithCIDR, "/")[0] + ip = strings.TrimSpace(ip) + pd.logger.Info("Found WireGuard IP from config", + zap.String("ip", ip)) + return ip, nil + } + } + } + } + pd.logger.Debug("Failed to read WireGuard config", zap.Error(err)) + + // Method 3: Fallback - Try to get from libp2p host addresses + for _, addr := range pd.host.Addrs() { + addrStr := addr.String() + // Look for /ip4/10.0.0.x pattern + if len(addrStr) > 10 && addrStr[:9] == "/ip4/10.0" { + // Extract IP address + parts := addr.String() + // Parse /ip4//... format + if len(parts) > 5 { + // Find the IP between /ip4/ and next / + start := 5 // after "/ip4/" + end := start + for end < len(parts) && parts[end] != '/' { + end++ + } + if end > start { + ip := parts[start:end] + pd.logger.Info("Found WireGuard IP from libp2p addresses", + zap.String("ip", ip)) + return ip, nil + } + } + } + } + + return "", fmt.Errorf("could not determine WireGuard IP") +} + +// getWireGuardIPFromInterface gets the WireGuard IP using 'ip addr show wg0' +func (pd *PeerDiscovery) getWireGuardIPFromInterface() (string, error) { + cmd := exec.Command("ip", "addr", "show", "wg0") + output, err := cmd.Output() + if err != nil { + return "", fmt.Errorf("failed to run 'ip addr show wg0': %w", err) + } + + // Parse output to find inet line + // Example: " inet 10.0.0.4/24 scope global wg0" + lines := strings.Split(string(output), "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "inet ") && !strings.Contains(line, "inet6") { + // Extract IP address (first field after "inet ") + fields := strings.Fields(line) + if len(fields) >= 2 { + // Remove CIDR notation (/24) + addrWithCIDR := fields[1] + ip := strings.Split(addrWithCIDR, "/")[0] + + // Verify it's a 10.0.0.x address + if strings.HasPrefix(ip, "10.0.0.") { + return ip, nil + } + } + } + } + + return "", fmt.Errorf("could not find WireGuard IP in 'ip addr show wg0' output") +}