diff --git a/README.md b/README.md index 907382f..fcdfa6b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Network - Distributed P2P Database System v0.12.5-beta +# Network - Distributed P2P Database System v0.19.0-beta A distributed peer-to-peer network built with Go and LibP2P, providing decentralized database capabilities with RQLite consensus and replication. @@ -147,7 +147,7 @@ The system uses these ports by default: - **5001**: RQLite HTTP API - **7001**: RQLite Raft consensus -Ensure these ports are available or configure firewall rules accordingly. +Ensure these ports are available or configure firewall rules accordingly. The system will also use +1 for each extra node started. For example 4002, 5002, 7002 ## Quick Start diff --git a/cli b/cli new file mode 100755 index 0000000..ab18183 Binary files /dev/null and b/cli differ diff --git a/cmd/node/configmap.go b/cmd/node/configmap.go deleted file mode 100644 index 0da90cd..0000000 --- a/cmd/node/configmap.go +++ /dev/null @@ -1,163 +0,0 @@ -package main - -import ( - "fmt" - "os" - "strings" - - "git.debros.io/DeBros/network/pkg/config" - "git.debros.io/DeBros/network/pkg/client" - "git.debros.io/DeBros/network/pkg/constants" - "git.debros.io/DeBros/network/pkg/logging" -) - -// NodeFlagValues holds parsed CLI flag values in a structured form. -type NodeFlagValues struct { - DataDir string - NodeID string - Bootstrap string - Role string - P2PPort int - RqlHTTP int - RqlRaft int - Advertise string -} - -// isTruthyEnv returns true if the environment variable is set to a common truthy value -func isTruthyEnv(key string) bool { - v := strings.ToLower(strings.TrimSpace(os.Getenv(key))) - switch v { - case "1", "true", "yes", "on": - return true - default: - return false - } -} - -// MapFlagsAndEnvToConfig applies environment overrides and CLI flags to cfg. -// Precedence: flags > env > defaults. Behavior mirrors previous inline logic in main.go. -// Returns the derived RQLite Raft join address for non-bootstrap nodes (empty for bootstrap nodes). -func MapFlagsAndEnvToConfig(cfg *config.Config, fv NodeFlagValues, isBootstrap bool, logger *logging.StandardLogger) string { - // Apply environment variable overrides first so that flags can override them after - config.ApplyEnvOverrides(cfg) - - // Detect dev-local mode (set via -dev-local -> NETWORK_DEV_LOCAL=1) - devLocal := isTruthyEnv("NETWORK_DEV_LOCAL") - - // Determine data directory if not provided - if fv.DataDir == "" { - if isBootstrap { - fv.DataDir = "./data/bootstrap" - } else { - if fv.NodeID != "" { - fv.DataDir = fmt.Sprintf("./data/node-%s", fv.NodeID) - } else { - fv.DataDir = "./data/node" - } - } - } - - // Node basics - cfg.Node.DataDir = fv.DataDir - cfg.Node.ListenAddresses = []string{ - fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", fv.P2PPort), - fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", fv.P2PPort), - } - - // Database port settings - cfg.Database.RQLitePort = fv.RqlHTTP - cfg.Database.RQLiteRaftPort = fv.RqlRaft - cfg.Database.AdvertiseMode = strings.ToLower(fv.Advertise) - logger.Printf("RQLite advertise mode: %s", cfg.Database.AdvertiseMode) - - // Bootstrap-specific vs regular-node logic - if isBootstrap { - if devLocal { - // In dev-local, run a primary bootstrap locally - cfg.Database.RQLiteJoinAddress = "" - // Do not set bootstrap peers to avoid including self; clients can still - // derive DB endpoints via DefaultDatabaseEndpoints in dev-local. - logger.Printf("Dev-local: Primary bootstrap node - localhost defaults enabled (no bootstrap peers set to avoid self)") - return "" - } - bootstrapPeers := constants.GetBootstrapPeers() - isSecondaryBootstrap := false - if len(bootstrapPeers) > 1 { - for i := 1; i < len(bootstrapPeers); i++ { - host := parseHostFromMultiaddr(bootstrapPeers[i]) - if host != "" && isLocalIP(host) { - isSecondaryBootstrap = true - break - } - } - } - - if isSecondaryBootstrap { - primaryBootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0]) - cfg.Database.RQLiteJoinAddress = fmt.Sprintf("%s:%d", primaryBootstrapHost, 7001) - logger.Printf("Secondary bootstrap node - joining primary bootstrap (raft) at: %s", cfg.Database.RQLiteJoinAddress) - } else { - cfg.Database.RQLiteJoinAddress = "" - logger.Printf("Primary bootstrap node - starting new RQLite cluster") - } - - return "" - } - - // Regular node: compute bootstrap peers and join address - var rqliteJoinAddr string - if fv.Bootstrap != "" { - cfg.Discovery.BootstrapPeers = []string{fv.Bootstrap} - bootstrapHost := parseHostFromMultiaddr(fv.Bootstrap) - if bootstrapHost != "" { - if (bootstrapHost == "127.0.0.1" || strings.EqualFold(bootstrapHost, "localhost")) && cfg.Database.AdvertiseMode != "localhost" { - if extIP, err := getPreferredLocalIP(); err == nil && extIP != "" { - logger.Printf("Translating localhost bootstrap to external IP %s for RQLite join", extIP) - bootstrapHost = extIP - } else { - logger.Printf("Warning: Failed to resolve external IP, keeping localhost for RQLite join") - } - } - rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001) - logger.Printf("Using extracted bootstrap host %s for RQLite Raft join (port 7001)", bootstrapHost) - } else { - logger.Printf("Warning: Could not extract host from bootstrap peer %s, using localhost fallback", fv.Bootstrap) - rqliteJoinAddr = fmt.Sprintf("localhost:%d", 7001) - } - logger.Printf("Using command line bootstrap peer: %s", fv.Bootstrap) - } else { - bootstrapPeers := cfg.Discovery.BootstrapPeers - if devLocal { - // Force localhost bootstrap for development - bootstrapPeers = client.DefaultBootstrapPeers() - logger.Printf("Dev-local: overriding bootstrap peers to %v", bootstrapPeers) - } - if len(bootstrapPeers) == 0 { - bootstrapPeers = constants.GetBootstrapPeers() - } - if len(bootstrapPeers) > 0 { - cfg.Discovery.BootstrapPeers = bootstrapPeers - bootstrapHost := parseHostFromMultiaddr(bootstrapPeers[0]) - if bootstrapHost != "" { - rqliteJoinAddr = fmt.Sprintf("%s:%d", bootstrapHost, 7001) - logger.Printf("Using extracted bootstrap host %s for RQLite Raft join", bootstrapHost) - } else { - logger.Printf("Warning: Could not extract host from bootstrap peer %s", bootstrapPeers[0]) - rqliteJoinAddr = "localhost:7001" - } - logger.Printf("Using environment bootstrap peers: %v", bootstrapPeers) - } else { - logger.Printf("Warning: No bootstrap peers configured") - rqliteJoinAddr = "localhost:7001" - logger.Printf("Using localhost fallback for RQLite Raft join") - } - - logger.Printf("=== NETWORK DIAGNOSTICS ===") - logger.Printf("Target RQLite Raft join address: %s", rqliteJoinAddr) - runNetworkDiagnostics(rqliteJoinAddr, logger) - } - - cfg.Database.RQLiteJoinAddress = rqliteJoinAddr - logger.Printf("Regular node - joining RQLite cluster (raft) at: %s", cfg.Database.RQLiteJoinAddress) - return rqliteJoinAddr -} diff --git a/cmd/node/main.go b/cmd/node/main.go index 6700e14..a10c8c7 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -5,132 +5,170 @@ import ( "flag" "fmt" "log" - "net" "os" - "os/exec" "os/signal" "path/filepath" - "strings" "syscall" "git.debros.io/DeBros/network/pkg/anyoneproxy" - "git.debros.io/DeBros/network/pkg/client" "git.debros.io/DeBros/network/pkg/config" - "git.debros.io/DeBros/network/pkg/constants" "git.debros.io/DeBros/network/pkg/logging" "git.debros.io/DeBros/network/pkg/node" + "go.uber.org/zap" ) -func main() { - var ( - dataDir = flag.String("data", "", "Data directory (auto-detected if not provided)") - nodeID = flag.String("id", "", "Node identifier (for running multiple local nodes)") - bootstrap = flag.String("bootstrap", "", "Bootstrap peer address (for manual override)") - role = flag.String("role", "auto", "Node role: auto|bootstrap|node (auto detects based on config)") - p2pPort = flag.Int("p2p-port", 4001, "LibP2P listen port") - rqlHTTP = flag.Int("rqlite-http-port", 5001, "RQLite HTTP API port") - rqlRaft = flag.Int("rqlite-raft-port", 7001, "RQLite Raft port") - advertise = flag.String("advertise", "auto", "Advertise mode: auto|localhost|ip") - devLocal = flag.Bool("dev-local", false, "Enable development localhost defaults for the client library (sets NETWORK_DEV_LOCAL=1)") - disableAnon = flag.Bool("disable-anonrc", false, "Disable Anyone proxy routing (defaults to enabled on 127.0.0.1:9050)") - help = flag.Bool("help", false, "Show help") - ) - flag.Parse() - - // Apply proxy disable flag early - anyoneproxy.SetDisabled(*disableAnon) - - if *help { - flag.Usage() - return - } - - // Enable development localhost defaults for the client library if requested - if *devLocal { - os.Setenv("NETWORK_DEV_LOCAL", "1") - log.Printf("Development local mode enabled (NETWORK_DEV_LOCAL=1)") - } - - // Determine node role - var isBootstrap bool - switch strings.ToLower(*role) { - case "bootstrap": - isBootstrap = true - case "node": - isBootstrap = false - default: - // Auto-detect if this is a bootstrap node based on configuration - isBootstrap = isBootstrapNode() - } - - // Set default data directory if not provided - if *dataDir == "" { - if isBootstrap { - *dataDir = "./data/bootstrap" - } else { - if *nodeID != "" { - *dataDir = fmt.Sprintf("./data/node-%s", *nodeID) - } else { - *dataDir = "./data/node" - } - } - } - - // LibP2P uses configurable port (default 4001); RQLite uses 5001 (HTTP) and 7001 (Raft) - port := *p2pPort - - // Create logger with appropriate component type - var logger *logging.StandardLogger +func setup_logger(component logging.Component) (logger *logging.ColoredLogger) { var err error - if isBootstrap { - logger, err = logging.NewStandardLogger(logging.ComponentBootstrap) - } else { - logger, err = logging.NewStandardLogger(logging.ComponentNode) - } + + logger, err = logging.NewColoredLogger(component, true) if err != nil { log.Fatalf("Failed to create logger: %v", err) } - // Load configuration based on node type - var cfg *config.Config - if isBootstrap { - cfg = config.BootstrapConfig() - logger.Printf("Starting bootstrap node...") + return logger +} + +func parse_and_return_network_flags() (dataDir, nodeID *string, p2pPort, rqlHTTP, rqlRaft *int, disableAnon *bool, rqlJoinAddr *string, help *bool) { + logger := setup_logger(logging.ComponentNode) + + dataDir = flag.String("data", "", "Data directory (auto-detected if not provided)") + nodeID = flag.String("id", "", "Node identifier (for running multiple local nodes)") + p2pPort = flag.Int("p2p-port", 4001, "LibP2P listen port") + rqlHTTP = flag.Int("rqlite-http-port", 5001, "RQLite HTTP API port") + rqlRaft = flag.Int("rqlite-raft-port", 7001, "RQLite Raft port") + disableAnon = flag.Bool("disable-anonrc", false, "Disable Anyone proxy routing (defaults to enabled on 127.0.0.1:9050)") + rqlJoinAddr = flag.String("rqlite-join-address", "", "RQLite address to join (e.g., /ip4/)") + help = flag.Bool("help", false, "Show help") + flag.Parse() + + logger.Info("Successfully parsed all flags and arguments.") + + return +} + +func disable_anon_proxy(disableAnon *bool) bool { + anyoneproxy.SetDisabled(*disableAnon) + logger := setup_logger(logging.ComponentAnyone) + + if *disableAnon { + logger.Info("Anyone proxy routing is disabled. This means the node will not use the default Tor proxy for anonymous routing.\n") + } + + return true +} + +func check_if_should_open_help(help *bool) { + if *help { + flag.Usage() + return + } +} + +func select_data_dir(dataDir *string, nodeID *string) { + logger := setup_logger(logging.ComponentNode) + + if *nodeID == "" { + *dataDir = "./data/node" + } + + logger.Info("Successfully selected Data Directory of: %s", zap.String("dataDir", *dataDir)) +} + +func startNode(ctx context.Context, cfg *config.Config, port int) error { + logger := setup_logger(logging.ComponentNode) + + // Create and start node using the unified node implementation + n, err := node.NewNode(cfg) + if err != nil { + logger.Error("failed to create node: %v", zap.Error(err)) + } + + if err := n.Start(ctx); err != nil { + logger.Error("failed to start node: %v", zap.Error(err)) + } + + // Save the peer ID to a file for CLI access (especially useful for bootstrap) + peerID := n.GetPeerID() + peerInfoFile := filepath.Join(cfg.Node.DataDir, "peer.info") + peerMultiaddr := fmt.Sprintf("/ip4/0.0.0.0/tcp/%d/p2p/%s", port, peerID) + + if err := os.WriteFile(peerInfoFile, []byte(peerMultiaddr), 0644); err != nil { + logger.Error("Failed to save peer info: %v", zap.Error(err)) } else { - cfg = config.DefaultConfig() - logger.Printf("Starting regular node...") + logger.Info("Peer info saved to: %s", zap.String("path", peerInfoFile)) + logger.Info("Bootstrap multiaddr: %s", zap.String("path", peerMultiaddr)) } - // Centralized mapping from flags/env to config (flags > env > defaults) - _ = MapFlagsAndEnvToConfig(cfg, NodeFlagValues{ - DataDir: *dataDir, - NodeID: *nodeID, - Bootstrap: *bootstrap, - Role: *role, - P2PPort: port, - RqlHTTP: *rqlHTTP, - RqlRaft: *rqlRaft, - Advertise: *advertise, - }, isBootstrap, logger) + logger.Info("Node started successfully") - logger.Printf("Data directory: %s", cfg.Node.DataDir) - logger.Printf("Listen addresses: %v", cfg.Node.ListenAddresses) - logger.Printf("RQLite HTTP port: %d", cfg.Database.RQLitePort) - logger.Printf("RQLite Raft port: %d", cfg.Database.RQLiteRaftPort) + // Wait for context cancellation + <-ctx.Done() - // For development visibility, print what the CLIENT library will return by default - clientBootstrap := client.DefaultBootstrapPeers() - clientDB := client.DefaultDatabaseEndpoints() - logger.Printf("[Client Defaults] Bootstrap peers: %v", clientBootstrap) - logger.Printf("[Client Defaults] Database endpoints: %v", clientDB) - // Also show node-configured values - logger.Printf("[Node Config] Bootstrap peers: %v", cfg.Discovery.BootstrapPeers) - if cfg.Database.RQLiteJoinAddress != "" { - logger.Printf("[Node Config] RQLite Raft join: %s", cfg.Database.RQLiteJoinAddress) - } else if isBootstrap { - logger.Printf("[Node Config] Bootstrap node: starting new RQLite cluster (no join)") + // Stop node + return n.Stop() +} + +// load_args_into_config applies command line argument overrides to the config +func load_args_into_config(cfg *config.Config, p2pPort, rqlHTTP, rqlRaft *int, rqlJoinAddr *string) { + logger := setup_logger(logging.ComponentNode) + + // Apply RQLite HTTP port override + if *rqlHTTP != 5001 { + cfg.Database.RQLitePort = *rqlHTTP + logger.ComponentInfo(logging.ComponentNode, "Overriding RQLite HTTP port", zap.Int("port", *rqlHTTP)) } + // Apply RQLite Raft port override + if *rqlRaft != 7001 { + cfg.Database.RQLiteRaftPort = *rqlRaft + logger.ComponentInfo(logging.ComponentNode, "Overriding RQLite Raft port", zap.Int("port", *rqlRaft)) + } + + // Apply P2P port override + if *p2pPort != 4001 { + cfg.Node.ListenAddresses = []string{ + fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", *p2pPort), + } + logger.ComponentInfo(logging.ComponentNode, "Overriding P2P port", zap.Int("port", *p2pPort)) + } + + // Apply RQLite join address + if *rqlJoinAddr != "" { + cfg.Database.RQLiteJoinAddress = *rqlJoinAddr + logger.ComponentInfo(logging.ComponentNode, "Setting RQLite join address", zap.String("address", *rqlJoinAddr)) + } +} + +func main() { + logger := setup_logger(logging.ComponentNode) + + dataDir, nodeID, p2pPort, rqlHTTP, rqlRaft, disableAnon, rqlJoinAddr, help := parse_and_return_network_flags() + + disable_anon_proxy(disableAnon) + check_if_should_open_help(help) + select_data_dir(dataDir, nodeID) + + // Load Node Configuration + var cfg *config.Config + cfg = config.DefaultConfig() + logger.ComponentInfo(logging.ComponentNode, "Default configuration loaded successfully") + + // Apply command line argument overrides + load_args_into_config(cfg, p2pPort, rqlHTTP, rqlRaft, rqlJoinAddr) + logger.ComponentInfo(logging.ComponentNode, "Command line arguments applied to configuration") + + // LibP2P uses configurable port (default 4001); RQLite uses 5001 (HTTP) and 7001 (Raft) + port := *p2pPort + + logger.ComponentInfo(logging.ComponentNode, "Node configuration summary", + zap.Strings("listen_addresses", cfg.Node.ListenAddresses), + zap.Int("rqlite_http_port", cfg.Database.RQLitePort), + zap.Int("rqlite_raft_port", cfg.Database.RQLiteRaftPort), + zap.Int("p2p_port", port), + zap.Strings("bootstrap_peers", cfg.Discovery.BootstrapPeers), + zap.String("rqlite_join_address", cfg.Database.RQLiteJoinAddress), + zap.String("data_directory", *dataDir)) + // Create context for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -139,7 +177,7 @@ func main() { errChan := make(chan error, 1) doneChan := make(chan struct{}) go func() { - if err := startNode(ctx, cfg, port, isBootstrap, logger); err != nil { + if err := startNode(ctx, cfg, port); err != nil { errChan <- err } close(doneChan) @@ -151,236 +189,13 @@ func main() { select { case err := <-errChan: - logger.Printf("Failed to start node: %v", err) + logger.ComponentError(logging.ComponentNode, "Failed to start node", zap.Error(err)) os.Exit(1) case <-c: - logger.Printf("Shutting down node...") + logger.ComponentInfo(logging.ComponentNode, "Shutting down node...") cancel() // Wait for node goroutine to finish cleanly <-doneChan - logger.Printf("Node shutdown complete") + logger.ComponentInfo(logging.ComponentNode, "Node shutdown complete") } } - -// isBootstrapNode determines if this should be a bootstrap node -// by checking the local machine's configuration and bootstrap peer list -func isBootstrapNode() bool { - // Get the bootstrap peer addresses to check if this machine should be a bootstrap - bootstrapPeers := constants.GetBootstrapPeers() - - // Check if any bootstrap peer is localhost/127.0.0.1 (development) - // or if we're running on a production bootstrap server - hostname, _ := os.Hostname() - - for _, peerAddr := range bootstrapPeers { - // Parse the multiaddr to extract the host - host := parseHostFromMultiaddr(peerAddr) - - // Check if this is a local bootstrap (development) - if host == "127.0.0.1" || host == "localhost" { - return true // In development, assume we're running the bootstrap - } - - // Check if this is a production bootstrap server by IP - if host != "" && isLocalIP(host) { - return true - } - - // Check if this is a production bootstrap server by hostname - if hostname != "" && strings.Contains(peerAddr, hostname) { - return true - } - } - - // Default: if no specific match, run as regular node - return false -} - -// getPreferredLocalIP returns a non-loopback IPv4 address of this machine -func getPreferredLocalIP() (string, error) { - ifaces, err := net.Interfaces() - if err != nil { - return "", err - } - for _, iface := range ifaces { - if (iface.Flags&net.FlagUp) == 0 || (iface.Flags&net.FlagLoopback) != 0 { - continue - } - addrs, err := iface.Addrs() - if err != nil { - continue - } - for _, addr := range addrs { - var ip net.IP - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP - } - if ip == nil || ip.IsLoopback() { - continue - } - ip = ip.To4() - if ip == nil { - continue - } - return ip.String(), nil - } - } - return "", fmt.Errorf("no non-loopback IPv4 found") -} - -// isLocalIP checks if the given IP address belongs to this machine -func isLocalIP(ip string) bool { - if ip == "127.0.0.1" || strings.EqualFold(ip, "localhost") { - return true - } - ifaces, err := net.Interfaces() - if err != nil { - return false - } - for _, iface := range ifaces { - if (iface.Flags & net.FlagUp) == 0 { - continue - } - addrs, err := iface.Addrs() - if err != nil { - continue - } - for _, addr := range addrs { - var a net.IP - switch v := addr.(type) { - case *net.IPNet: - a = v.IP - case *net.IPAddr: - a = v.IP - } - if a != nil && a.String() == ip { - return true - } - } - } - return false -} - -// parseHostFromMultiaddr extracts the host from a multiaddr -func parseHostFromMultiaddr(multiaddr string) string { - // Simple parsing for /ip4/host/tcp/port/p2p/peerid format - parts := strings.Split(multiaddr, "/") - - // Look for ip4/ip6/dns host in the multiaddr - for i, part := range parts { - if (part == "ip4" || part == "ip6" || part == "dns" || part == "dns4" || part == "dns6") && i+1 < len(parts) { - return parts[i+1] - } - } - return "" -} - -func startNode(ctx context.Context, cfg *config.Config, port int, isBootstrap bool, logger *logging.StandardLogger) error { - // Create and start node using the unified node implementation - n, err := node.NewNode(cfg) - if err != nil { - return fmt.Errorf("failed to create node: %w", err) - } - - if err := n.Start(ctx); err != nil { - return fmt.Errorf("failed to start node: %w", err) - } - - // Save the peer ID to a file for CLI access (especially useful for bootstrap) - if isBootstrap { - peerID := n.GetPeerID() - peerInfoFile := filepath.Join(cfg.Node.DataDir, "peer.info") - peerMultiaddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/p2p/%s", port, peerID) - - if err := os.WriteFile(peerInfoFile, []byte(peerMultiaddr), 0644); err != nil { - logger.Printf("Warning: Failed to save peer info: %v", err) - } else { - logger.Printf("Peer info saved to: %s", peerInfoFile) - logger.Printf("Bootstrap multiaddr: %s", peerMultiaddr) - } - } - - logger.Printf("Node started successfully") - - // Wait for context cancellation - <-ctx.Done() - - // Stop node - return n.Stop() -} - -// runNetworkDiagnostics performs network connectivity tests -func runNetworkDiagnostics(target string, logger *logging.StandardLogger) { - // If target has scheme, treat as HTTP URL. Otherwise treat as host:port raft. - var host, port string - if strings.HasPrefix(target, "http://") || strings.HasPrefix(target, "https://") { - url := strings.TrimPrefix(strings.TrimPrefix(target, "http://"), "https://") - parts := strings.Split(url, ":") - if len(parts) == 2 { - host, port = parts[0], parts[1] - } - } else { - parts := strings.Split(target, ":") - if len(parts) == 2 { - host, port = parts[0], parts[1] - } - } - if host == "" || port == "" { - logger.Printf("Cannot parse host:port from %s", target) - return - } - - logger.Printf("Testing TCP connectivity to %s:%s", host, port) - if output, err := exec.Command("timeout", "5", "nc", "-z", "-v", host, port).CombinedOutput(); err == nil { - logger.Printf("✅ Port %s:%s is reachable", host, port) - logger.Printf("netcat output: %s", strings.TrimSpace(string(output))) - } else { - logger.Printf("❌ Port %s:%s is NOT reachable", host, port) - logger.Printf("netcat error: %v", err) - logger.Printf("netcat output: %s", strings.TrimSpace(string(output))) - } - - // Also probe HTTP status on port 5001 of the same host, which is the default HTTP API - httpURL := fmt.Sprintf("http://%s:%d/status", host, 5001) - if output, err := exec.Command("timeout", "5", "curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", httpURL).Output(); err == nil { - httpCode := strings.TrimSpace(string(output)) - if httpCode == "200" { - logger.Printf("✅ HTTP service on %s is responding correctly (status: %s)", httpURL, httpCode) - } else { - logger.Printf("⚠️ HTTP service on %s responded with status: %s", httpURL, httpCode) - } - } else { - logger.Printf("❌ HTTP request to %s failed: %v", httpURL, err) - } - - // Ping test - if output, err := exec.Command("ping", "-c", "3", "-W", "2", host).Output(); err == nil { - lines := strings.Split(string(output), "\n") - for _, line := range lines { - if strings.Contains(line, "packet loss") { - logger.Printf("🏓 Ping result: %s", strings.TrimSpace(line)) - break - } - } - } else { - logger.Printf("❌ Ping test failed: %v", err) - } - - // DNS resolution - if output, err := exec.Command("nslookup", host).Output(); err == nil { - logger.Printf("🔍 DNS resolution successful") - lines := strings.Split(string(output), "\n") - for _, line := range lines { - if strings.Contains(line, "Address:") && !strings.Contains(line, "#53") { - logger.Printf("DNS result: %s", strings.TrimSpace(line)) - } - } - } else { - logger.Printf("❌ DNS resolution failed: %v", err) - } - - logger.Printf("=== END DIAGNOSTICS ===") -} diff --git a/go.mod b/go.mod index 0fc8415..5c1bbdd 100644 --- a/go.mod +++ b/go.mod @@ -59,6 +59,7 @@ require ( github.com/libp2p/go-netroute v0.2.2 // indirect github.com/libp2p/go-reuseport v0.4.0 // indirect github.com/libp2p/go-yamux/v5 v5.0.0 // indirect + github.com/libp2p/zeroconf/v2 v2.2.0 // indirect github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/miekg/dns v1.1.66 // indirect diff --git a/go.sum b/go.sum index 858917b..24332ef 100644 --- a/go.sum +++ b/go.sum @@ -184,6 +184,8 @@ github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQsc github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU= github.com/libp2p/go-yamux/v5 v5.0.0 h1:2djUh96d3Jiac/JpGkKs4TO49YhsfLopAoryfPmf+Po= github.com/libp2p/go-yamux/v5 v5.0.0/go.mod h1:en+3cdX51U0ZslwRdRLrvQsdayFt3TSUKvBGErzpWbU= +github.com/libp2p/zeroconf/v2 v2.2.0 h1:Cup06Jv6u81HLhIj1KasuNM/RHHrJ8T7wOTS4+Tv53Q= +github.com/libp2p/zeroconf/v2 v2.2.0/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0DmyYyNO1Xs= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk= @@ -192,6 +194,7 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= +github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4= github.com/miekg/dns v1.1.66 h1:FeZXOS3VCVsKnEAd+wBkjMC3D2K+ww66Cq3VnCINuJE= github.com/miekg/dns v1.1.66/go.mod h1:jGFzBsSNbJw6z1HYut1RKBKHA9PBdxeHrZG8J+gC2WE= github.com/mikioh/tcp v0.0.0-20190314235350-803a9b46060c h1:bzE/A84HN25pxAuk9Eej1Kz9OUelF97nAc82bDquQI8= @@ -445,6 +448,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= @@ -482,6 +486,9 @@ golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210426080607-c94f62235c83/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -503,6 +510,7 @@ golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= diff --git a/node b/node new file mode 100755 index 0000000..4519283 Binary files /dev/null and b/node differ diff --git a/pkg/config/config.go b/pkg/config/config.go index a7a3103..0683a8f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,9 +1,6 @@ package config import ( - "os" - "strconv" - "strings" "time" "github.com/multiformats/go-multiaddr" @@ -42,16 +39,15 @@ type DatabaseConfig struct { RQLitePort int `yaml:"rqlite_port"` // RQLite HTTP API port RQLiteRaftPort int `yaml:"rqlite_raft_port"` // RQLite Raft consensus port RQLiteJoinAddress string `yaml:"rqlite_join_address"` // Address to join RQLite cluster - AdvertiseMode string `yaml:"advertise_mode"` // Advertise mode: "auto" (default), "localhost", or "ip" } // DiscoveryConfig contains peer discovery configuration type DiscoveryConfig struct { BootstrapPeers []string `yaml:"bootstrap_peers"` // Bootstrap peer addresses - EnableMDNS bool `yaml:"enable_mdns"` // Enable mDNS discovery EnableDHT bool `yaml:"enable_dht"` // Enable DHT discovery DHTPrefix string `yaml:"dht_prefix"` // DHT protocol prefix DiscoveryInterval time.Duration `yaml:"discovery_interval"` // Discovery announcement interval + BootstrapPort int `yaml:"bootstrap_port"` // Default port for bootstrap nodes } // SecurityConfig contains security-related configuration @@ -91,27 +87,13 @@ func (c *Config) ParseMultiaddrs() ([]multiaddr.Multiaddr, error) { return addrs, nil } -// GetBootstrapMultiaddrs converts bootstrap peer strings to multiaddr objects -func (c *Config) GetBootstrapMultiaddrs() ([]multiaddr.Multiaddr, error) { - var addrs []multiaddr.Multiaddr - for _, addr := range c.Discovery.BootstrapPeers { - ma, err := multiaddr.NewMultiaddr(addr) - if err != nil { - return nil, err - } - addrs = append(addrs, ma) - } - return addrs, nil -} - // DefaultConfig returns a default configuration func DefaultConfig() *Config { return &Config{ Node: NodeConfig{ Type: "node", ListenAddresses: []string{ - "/ip4/0.0.0.0/tcp/0", - "/ip4/0.0.0.0/udp/0/quic", + "/ip4/0.0.0.0/tcp/0", // TCP only - compatible with Anyone proxy/SOCKS5 }, DataDir: "./data", MaxConnections: 50, @@ -128,14 +110,15 @@ func DefaultConfig() *Config { RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: "", // Empty for bootstrap node - AdvertiseMode: "auto", }, Discovery: DiscoveryConfig{ - BootstrapPeers: []string{}, - EnableMDNS: true, - EnableDHT: true, + BootstrapPeers: []string{ + "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWDL6LSjwwP5FwboV9JaTZzuxr8EhjbcZGFfnyFMDt1UDx", + }, + BootstrapPort: 4001, // Default LibP2P port + EnableDHT: false, // Disabled - conflicts with Anyone protocol anonymity DHTPrefix: "/network/kad/1.0.0", - DiscoveryInterval: time.Minute * 5, + DiscoveryInterval: time.Second * 15, // Back to 15 seconds for testing }, Security: SecurityConfig{ EnableTLS: false, @@ -147,196 +130,3 @@ func DefaultConfig() *Config { }, } } - -// BootstrapConfig returns a default configuration for bootstrap nodes -func BootstrapConfig() *Config { - config := DefaultConfig() - config.Node.Type = "bootstrap" - config.Node.IsBootstrap = true - config.Node.ListenAddresses = []string{ - "/ip4/0.0.0.0/tcp/4001", - "/ip4/0.0.0.0/udp/4001/quic", - } - return config -} - -// NewConfigFromEnv constructs a config (bootstrap or regular) and applies environment overrides. -// If isBootstrap is true, starts from BootstrapConfig; otherwise from DefaultConfig. -func NewConfigFromEnv(isBootstrap bool) *Config { - var cfg *Config - if isBootstrap { - cfg = BootstrapConfig() - } else { - cfg = DefaultConfig() - } - ApplyEnvOverrides(cfg) - return cfg -} - -// ApplyEnvOverrides mutates cfg based on environment variables. -// Precedence: CLI flags (outside this function) > ENV variables > defaults in code. -func ApplyEnvOverrides(cfg *Config) { - // Node - if v := os.Getenv("NODE_ID"); v != "" { - cfg.Node.ID = v - } - if v := os.Getenv("NODE_TYPE"); v != "" { // "bootstrap" or "node" - cfg.Node.Type = strings.ToLower(v) - cfg.Node.IsBootstrap = cfg.Node.Type == "bootstrap" - } - if v := os.Getenv("NODE_LISTEN_ADDRESSES"); v != "" { - parts := splitAndTrim(v) - if len(parts) > 0 { - cfg.Node.ListenAddresses = parts - } - } - if v := os.Getenv("DATA_DIR"); v != "" { - cfg.Node.DataDir = v - } - if v := os.Getenv("MAX_CONNECTIONS"); v != "" { - if n, err := strconv.Atoi(v); err == nil { - cfg.Node.MaxConnections = n - } - } - - // Database - if v := os.Getenv("DB_DATA_DIR"); v != "" { - cfg.Database.DataDir = v - } - if v := os.Getenv("REPLICATION_FACTOR"); v != "" { - if n, err := strconv.Atoi(v); err == nil { - cfg.Database.ReplicationFactor = n - } - } - if v := os.Getenv("SHARD_COUNT"); v != "" { - if n, err := strconv.Atoi(v); err == nil { - cfg.Database.ShardCount = n - } - } - if v := os.Getenv("MAX_DB_SIZE"); v != "" { // bytes - if n, err := parseInt64(v); err == nil { - cfg.Database.MaxDatabaseSize = n - } - } - if v := os.Getenv("BACKUP_INTERVAL"); v != "" { // duration, e.g. 24h - if d, err := time.ParseDuration(v); err == nil { - cfg.Database.BackupInterval = d - } - } - if v := os.Getenv("RQLITE_HTTP_PORT"); v != "" { - if n, err := strconv.Atoi(v); err == nil { - cfg.Database.RQLitePort = n - } - } - if v := os.Getenv("RQLITE_RAFT_PORT"); v != "" { - if n, err := strconv.Atoi(v); err == nil { - cfg.Database.RQLiteRaftPort = n - } - } - if v := os.Getenv("RQLITE_JOIN_ADDRESS"); v != "" { - cfg.Database.RQLiteJoinAddress = v - } - if v := os.Getenv("ADVERTISE_MODE"); v != "" { // auto | localhost | ip - cfg.Database.AdvertiseMode = strings.ToLower(v) - } - - // Discovery - if v := os.Getenv("BOOTSTRAP_PEERS"); v != "" { - parts := splitAndTrim(v) - if len(parts) > 0 { - cfg.Discovery.BootstrapPeers = parts - } - } - if v := os.Getenv("ENABLE_MDNS"); v != "" { - if b, err := parseBool(v); err == nil { - cfg.Discovery.EnableMDNS = b - } - } - if v := os.Getenv("ENABLE_DHT"); v != "" { - if b, err := parseBool(v); err == nil { - cfg.Discovery.EnableDHT = b - } - } - if v := os.Getenv("DHT_PREFIX"); v != "" { - cfg.Discovery.DHTPrefix = v - } - if v := os.Getenv("DISCOVERY_INTERVAL"); v != "" { // e.g. 5m - if d, err := time.ParseDuration(v); err == nil { - cfg.Discovery.DiscoveryInterval = d - } - } - - // Security - if v := os.Getenv("ENABLE_TLS"); v != "" { - if b, err := parseBool(v); err == nil { - cfg.Security.EnableTLS = b - } - } - if v := os.Getenv("PRIVATE_KEY_FILE"); v != "" { - cfg.Security.PrivateKeyFile = v - } - if v := os.Getenv("CERT_FILE"); v != "" { - cfg.Security.CertificateFile = v - } - if v := os.Getenv("AUTH_ENABLED"); v != "" { - if b, err := parseBool(v); err == nil { - cfg.Security.AuthEnabled = b - } - } - - // Logging - if v := os.Getenv("LOG_LEVEL"); v != "" { - cfg.Logging.Level = strings.ToLower(v) - } - if v := os.Getenv("LOG_FORMAT"); v != "" { - cfg.Logging.Format = strings.ToLower(v) - } - if v := os.Getenv("LOG_OUTPUT_FILE"); v != "" { - cfg.Logging.OutputFile = v - } -} - -// Helpers -func splitAndTrim(csv string) []string { - parts := strings.Split(csv, ",") - out := make([]string, 0, len(parts)) - for _, p := range parts { - s := strings.TrimSpace(p) - if s != "" { - out = append(out, s) - } - } - return out -} - -func parseBool(s string) (bool, error) { - switch strings.ToLower(strings.TrimSpace(s)) { - case "1", "true", "t", "yes", "y", "on": - return true, nil - case "0", "false", "f", "no", "n", "off": - return false, nil - default: - return strconv.ParseBool(s) - } -} - -func parseInt64(s string) (int64, error) { - // Allow plain int or with optional suffixes k, m, g (base-1024) - s = strings.TrimSpace(strings.ToLower(s)) - mul := int64(1) - if strings.HasSuffix(s, "k") { - mul = 1024 - s = strings.TrimSuffix(s, "k") - } else if strings.HasSuffix(s, "m") { - mul = 1024 * 1024 - s = strings.TrimSuffix(s, "m") - } else if strings.HasSuffix(s, "g") { - mul = 1024 * 1024 * 1024 - s = strings.TrimSuffix(s, "g") - } - n, err := strconv.ParseInt(strings.TrimSpace(s), 10, 64) - if err != nil { - return 0, err - } - return n * mul, nil -} diff --git a/pkg/constants/bootstrap.go b/pkg/constants/bootstrap.go index f9824ca..1aae153 100644 --- a/pkg/constants/bootstrap.go +++ b/pkg/constants/bootstrap.go @@ -2,24 +2,21 @@ package constants import ( "os" + + "git.debros.io/DeBros/network/pkg/config" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" ) // Bootstrap node configuration var ( - // BootstrapPeerIDs are the fixed peer IDs for bootstrap nodes - // Each corresponds to a specific Ed25519 private key - BootstrapPeerIDs []string - // BootstrapAddresses are the full multiaddrs for bootstrap nodes BootstrapAddresses []string // BootstrapPort is the default port for bootstrap nodes (LibP2P) BootstrapPort int = 4001 - // Primary bootstrap peer ID (first in the list) - BootstrapPeerID string - - // Primary bootstrap address (first in the list) + // Primary bootstrap address (first in the list) - for backward compatibility BootstrapAddress string ) @@ -29,28 +26,15 @@ func init() { updateBackwardCompatibilityConstants() } -// setDefaultBootstrapConfig sets default bootstrap configuration +// setDefaultBootstrapConfig sets default bootstrap configuration for local development func setDefaultBootstrapConfig() { - // Check if we're in production environment - BootstrapPeerIDs = []string{ - // "12D3KooWNxt9bNvqftdqXg98JcUHreGxedWSZRUbyqXJ6CW7GaD4", - // "12D3KooWGbdnA22bN24X2gyY1o9jozwTBq9wbfvwtJ7G4XQ9JgFm", - "12D3KooWDL6LSjwwP5FwboV9JaTZzuxr8EhjbcZGFfnyFMDt1UDx", - } - BootstrapAddresses = []string{ - // "/ip4/57.129.81.31/tcp/4001/p2p/12D3KooWNxt9bNvqftdqXg98JcUHreGxedWSZRUbyqXJ6CW7GaD4", - // "/ip4/38.242.250.186/tcp/4001/p2p/12D3KooWGbdnA22bN24X2gyY1o9jozwTBq9wbfvwtJ7G4XQ9JgFm", - "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWDL6LSjwwP5FwboV9JaTZzuxr8EhjbcZGFfnyFMDt1UDx", - } - - BootstrapPort = 4001 + var cfg *config.Config + BootstrapAddresses = cfg.Discovery.BootstrapPeers + BootstrapPort = cfg.Discovery.BootstrapPort } // updateBackwardCompatibilityConstants updates the single constants for backward compatibility func updateBackwardCompatibilityConstants() { - if len(BootstrapPeerIDs) > 0 { - BootstrapPeerID = BootstrapPeerIDs[0] - } if len(BootstrapAddresses) > 0 { BootstrapAddress = BootstrapAddresses[0] } @@ -67,20 +51,26 @@ func GetBootstrapPeers() []string { return peers } -// GetBootstrapPeerIDs returns a copy of all bootstrap peer IDs +// GetBootstrapPeerIDs extracts and returns peer IDs from bootstrap addresses func GetBootstrapPeerIDs() []string { - if len(BootstrapPeerIDs) == 0 { + if len(BootstrapAddresses) == 0 { setDefaultBootstrapConfig() updateBackwardCompatibilityConstants() } - ids := make([]string, len(BootstrapPeerIDs)) - copy(ids, BootstrapPeerIDs) + + var ids []string + for _, addr := range BootstrapAddresses { + if ma, err := multiaddr.NewMultiaddr(addr); err == nil { + if pi, err := peer.AddrInfoFromP2pAddr(ma); err == nil { + ids = append(ids, pi.ID.String()) + } + } + } return ids } -// AddBootstrapPeer adds a new bootstrap peer to the lists (runtime only) -func AddBootstrapPeer(peerID, address string) { - BootstrapPeerIDs = append(BootstrapPeerIDs, peerID) +// AddBootstrapPeer adds a new bootstrap peer address (runtime only) +func AddBootstrapPeer(address string) { BootstrapAddresses = append(BootstrapAddresses, address) updateBackwardCompatibilityConstants() } diff --git a/pkg/database/rqlite.go b/pkg/database/rqlite.go index fc10031..a1c38be 100644 --- a/pkg/database/rqlite.go +++ b/pkg/database/rqlite.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "net" "net/http" "os" "os/exec" @@ -30,30 +29,31 @@ type RQLiteManager struct { // waitForSQLAvailable waits until a simple query succeeds, indicating a leader is known and queries can be served. func (r *RQLiteManager) waitForSQLAvailable(ctx context.Context) error { - if r.connection == nil { - return fmt.Errorf("no rqlite connection") - } + if r.connection == nil { + r.logger.Error("No rqlite connection") + return errors.New("no rqlite connection") + } - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() - attempts := 0 - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-ticker.C: - attempts++ - _, err := r.connection.QueryOne("SELECT 1") - if err == nil { - r.logger.Info("RQLite SQL is available") - return nil - } - if attempts%5 == 0 { // log every ~5s to reduce noise - r.logger.Debug("Waiting for RQLite SQL availability", zap.Error(err)) - } - } - } + attempts := 0 + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + attempts++ + _, err := r.connection.QueryOne("SELECT 1") + if err == nil { + r.logger.Info("RQLite SQL is available") + return nil + } + if attempts%5 == 0 { // log every ~5s to reduce noise + r.logger.Debug("Waiting for RQLite SQL availability", zap.Error(err)) + } + } + } } // NewRQLiteManager creates a new RQLite manager @@ -73,40 +73,12 @@ func (r *RQLiteManager) Start(ctx context.Context) error { return fmt.Errorf("failed to create RQLite data directory: %w", err) } - // Determine advertise host based on configuration - advertiseHost := "127.0.0.1" // default - mode := strings.ToLower(r.config.AdvertiseMode) - switch mode { - case "localhost": - advertiseHost = "127.0.0.1" - r.logger.Info("Using localhost for RQLite advertising (dev mode)") - case "ip": - if ip, err := r.getExternalIP(); err == nil && ip != "" { - advertiseHost = ip - r.logger.Info("Using external IP for RQLite advertising (forced)", zap.String("ip", ip)) - } else { - r.logger.Warn("Failed to get external IP, falling back to localhost", zap.Error(err)) - } - default: // auto - if ip, err := r.getExternalIP(); err == nil && ip != "" { - advertiseHost = ip - r.logger.Info("Using external IP for RQLite advertising (auto)", zap.String("ip", ip)) - } else { - r.logger.Info("No external IP found, using localhost for RQLite advertising (auto)") - } - } - // Build RQLite command args := []string{ "-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort), "-raft-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLiteRaftPort), - // Auth disabled for testing } - // Always set advertised addresses explicitly to avoid 0.0.0.0 announcements - args = append(args, "-http-adv-addr", fmt.Sprintf("%s:%d", advertiseHost, r.config.RQLitePort)) - args = append(args, "-raft-adv-addr", fmt.Sprintf("%s:%d", advertiseHost, r.config.RQLiteRaftPort)) - // Add join address if specified (for non-bootstrap or secondary bootstrap nodes) if r.config.RQLiteJoinAddress != "" { r.logger.Info("Joining RQLite cluster", zap.String("join_address", r.config.RQLiteJoinAddress)) @@ -120,11 +92,11 @@ func (r *RQLiteManager) Start(ctx context.Context) error { } // Wait for join target to become reachable to avoid forming a separate cluster (wait indefinitely) - if err := r.waitForJoinTarget(ctx, joinArg, 0); err != nil { - r.logger.Warn("Join target did not become reachable within timeout; will still attempt to join", - zap.String("join_address", r.config.RQLiteJoinAddress), - zap.Error(err)) - } + if err := r.waitForJoinTarget(ctx, joinArg, 0); err != nil { + r.logger.Warn("Join target did not become reachable within timeout; will still attempt to join", + zap.String("join_address", r.config.RQLiteJoinAddress), + zap.Error(err)) + } // Always add the join parameter in host:port form - let rqlited handle the rest args = append(args, "-join", joinArg) @@ -140,7 +112,6 @@ func (r *RQLiteManager) Start(ctx context.Context) error { zap.Int("http_port", r.config.RQLitePort), zap.Int("raft_port", r.config.RQLiteRaftPort), zap.String("join_address", r.config.RQLiteJoinAddress), - zap.String("advertise_host", advertiseHost), zap.Strings("full_args", args), ) @@ -309,130 +280,37 @@ func (r *RQLiteManager) Stop() error { // waitForJoinTarget waits until the join target's HTTP status becomes reachable, or until timeout func (r *RQLiteManager) waitForJoinTarget(ctx context.Context, joinAddress string, timeout time.Duration) error { - var deadline time.Time - if timeout > 0 { - deadline = time.Now().Add(timeout) - } - var lastErr error + var deadline time.Time + if timeout > 0 { + deadline = time.Now().Add(timeout) + } + var lastErr error - for { - if err := r.testJoinAddress(joinAddress); err == nil { - r.logger.Info("Join target is reachable, proceeding with cluster join") - return nil - } else { - lastErr = err - r.logger.Debug("Join target not yet reachable; waiting...", zap.String("join_address", joinAddress), zap.Error(err)) - } + for { + if err := r.testJoinAddress(joinAddress); err == nil { + r.logger.Info("Join target is reachable, proceeding with cluster join") + return nil + } else { + lastErr = err + r.logger.Debug("Join target not yet reachable; waiting...", zap.String("join_address", joinAddress), zap.Error(err)) + } - // Check context - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(2 * time.Second): - } + // Check context + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(2 * time.Second): + } - if !deadline.IsZero() && time.Now().After(deadline) { - break - } - } - - if lastErr == nil { - lastErr = fmt.Errorf("join target not reachable within %s", timeout) - } - return lastErr -} - -// getExternalIP attempts to get the external IP address of this machine -func (r *RQLiteManager) getExternalIP() (string, error) { - // Method 1: Try using `ip route get` to find the IP used to reach the internet - if output, err := exec.Command("ip", "route", "get", "8.8.8.8").Output(); err == nil { - lines := strings.Split(string(output), "\n") - for _, line := range lines { - if strings.Contains(line, "src") { - parts := strings.Fields(line) - for i, part := range parts { - if part == "src" && i+1 < len(parts) { - ip := parts[i+1] - if net.ParseIP(ip) != nil { - r.logger.Debug("Found external IP via ip route", zap.String("ip", ip)) - return ip, nil - } - } - } - } + if !deadline.IsZero() && time.Now().After(deadline) { + break } } - // Method 2: Get all network interfaces and find non-localhost, non-private IPs - interfaces, err := net.Interfaces() - if err != nil { - return "", err + if lastErr == nil { + lastErr = fmt.Errorf("join target not reachable within %s", timeout) } - - for _, iface := range interfaces { - if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 { - continue - } - - addrs, err := iface.Addrs() - if err != nil { - continue - } - - for _, addr := range addrs { - var ip net.IP - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP - } - - if ip == nil || ip.IsLoopback() { - continue - } - - // Prefer public IPs over private IPs - if ip.To4() != nil && !ip.IsPrivate() { - r.logger.Debug("Found public IP", zap.String("ip", ip.String())) - return ip.String(), nil - } - } - } - - // Method 3: Fall back to private IPs if no public IP found - for _, iface := range interfaces { - if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 { - continue - } - - addrs, err := iface.Addrs() - if err != nil { - continue - } - - for _, addr := range addrs { - var ip net.IP - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP - } - - if ip == nil || ip.IsLoopback() { - continue - } - - // Use any IPv4 address - if ip.To4() != nil { - r.logger.Debug("Found private IP", zap.String("ip", ip.String())) - return ip.String(), nil - } - } - } - - return "", fmt.Errorf("no suitable IP address found") + return lastErr } // testJoinAddress tests if a join address is reachable @@ -467,4 +345,3 @@ func (r *RQLiteManager) testJoinAddress(joinAddress string) error { r.logger.Info("Leader HTTP reachable", zap.String("status_url", statusURL)) return nil } - diff --git a/pkg/logging/logger.go b/pkg/logging/logger.go index 01d9200..b8d6641 100644 --- a/pkg/logging/logger.go +++ b/pkg/logging/logger.go @@ -46,22 +46,20 @@ type ColoredLogger struct { type Component string const ( - ComponentBootstrap Component = "BOOTSTRAP" - ComponentNode Component = "NODE" - ComponentRQLite Component = "RQLITE" - ComponentLibP2P Component = "LIBP2P" - ComponentStorage Component = "STORAGE" - ComponentDatabase Component = "DATABASE" - ComponentClient Component = "CLIENT" - ComponentDHT Component = "DHT" - ComponentGeneral Component = "GENERAL" + ComponentNode Component = "NODE" + ComponentRQLite Component = "RQLITE" + ComponentLibP2P Component = "LIBP2P" + ComponentStorage Component = "STORAGE" + ComponentDatabase Component = "DATABASE" + ComponentClient Component = "CLIENT" + ComponentDHT Component = "DHT" + ComponentGeneral Component = "GENERAL" + ComponentAnyone Component = "ANYONE" ) // getComponentColor returns the color for a specific component func getComponentColor(component Component) string { switch component { - case ComponentBootstrap: - return BrightGreen case ComponentNode: return BrightBlue case ComponentRQLite: @@ -75,6 +73,10 @@ func getComponentColor(component Component) string { case ComponentClient: return Blue case ComponentDHT: + return Magenta + case ComponentGeneral: + return Yellow + case ComponentAnyone: return Cyan default: return White @@ -278,3 +280,9 @@ func (s *StandardLogger) Println(v ...interface{}) { msg = strings.TrimSuffix(msg, "\n") s.logger.ComponentInfo(s.component, msg) } + +func (s *StandardLogger) Errorf(format string, v ...interface{}) { + msg := fmt.Sprintf(format, v...) + msg = strings.TrimSuffix(msg, "\n") + s.logger.ComponentError(s.component, msg) +} diff --git a/pkg/node/node.go b/pkg/node/node.go index caae5c4..2d26cf8 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -10,30 +10,29 @@ import ( "time" "github.com/libp2p/go-libp2p" - dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" + noise "github.com/libp2p/go-libp2p/p2p/security/noise" - libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/tcp" "github.com/multiformats/go-multiaddr" "go.uber.org/zap" + "git.debros.io/DeBros/network/pkg/anyoneproxy" "git.debros.io/DeBros/network/pkg/config" "git.debros.io/DeBros/network/pkg/database" - "git.debros.io/DeBros/network/pkg/anyoneproxy" "git.debros.io/DeBros/network/pkg/logging" "git.debros.io/DeBros/network/pkg/storage" ) // Node represents a network node with RQLite database type Node struct { - config *config.Config - logger *logging.ColoredLogger - host host.Host - dht *dht.IpfsDHT + config *config.Config + logger *logging.ColoredLogger + host host.Host + rqliteManager *database.RQLiteManager rqliteAdapter *database.RQLiteAdapter storageService *storage.Service @@ -45,7 +44,7 @@ type Node struct { // NewNode creates a new network node func NewNode(cfg *config.Config) (*Node, error) { // Create colored logger - logger, err := logging.NewDefaultLogger(logging.ComponentNode) + logger, err := logging.NewColoredLogger(logging.ComponentNode, true) if err != nil { return nil, fmt.Errorf("failed to create logger: %w", err) } @@ -56,49 +55,9 @@ func NewNode(cfg *config.Config) (*Node, error) { }, nil } -// Start starts the network node -func (n *Node) Start(ctx context.Context) error { - n.logger.ComponentInfo(logging.ComponentNode, "Starting network node", - zap.String("data_dir", n.config.Node.DataDir), - ) - - // Create data directory - if err := os.MkdirAll(n.config.Node.DataDir, 0755); err != nil { - return fmt.Errorf("failed to create data directory: %w", err) - } - - // Start RQLite - if err := n.startRQLite(ctx); err != nil { - return fmt.Errorf("failed to start RQLite: %w", err) - } - - // Start LibP2P host - if err := n.startLibP2P(); err != nil { - return fmt.Errorf("failed to start LibP2P: %w", err) - } - - // Start storage service - if err := n.startStorageService(); err != nil { - return fmt.Errorf("failed to start storage service: %w", err) - } - - // Get listen addresses for logging - var listenAddrs []string - for _, addr := range n.host.Addrs() { - listenAddrs = append(listenAddrs, addr.String()) - } - - n.logger.ComponentInfo(logging.ComponentNode, "Network node started successfully", - zap.String("peer_id", n.host.ID().String()), - zap.Strings("listen_addrs", listenAddrs), - ) - - return nil -} - // startRQLite initializes and starts the RQLite database func (n *Node) startRQLite(ctx context.Context) error { - n.logger.ComponentInfo(logging.ComponentDatabase, "Starting RQLite database") + n.logger.Info("Starting RQLite database") // Create RQLite manager n.rqliteManager = database.NewRQLiteManager(&n.config.Database, n.config.Node.DataDir, n.logger.Logger) @@ -118,6 +77,61 @@ func (n *Node) startRQLite(ctx context.Context) error { return nil } +// connectToBootstrapPeer connects to a single bootstrap peer +func (n *Node) connectToBootstrapPeer(ctx context.Context, addr string) error { + ma, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return fmt.Errorf("invalid multiaddr: %w", err) + } + + // Extract peer info from multiaddr + peerInfo, err := peer.AddrInfoFromP2pAddr(ma) + if err != nil { + return fmt.Errorf("failed to extract peer info: %w", err) + } + + // Log resolved peer info prior to connect + n.logger.ComponentDebug(logging.ComponentNode, "Resolved bootstrap peer", + zap.String("peer_id", peerInfo.ID.String()), + zap.String("addr", addr), + zap.Int("addr_count", len(peerInfo.Addrs)), + ) + + // Connect to the peer + if err := n.host.Connect(ctx, *peerInfo); err != nil { + return fmt.Errorf("failed to connect to peer: %w", err) + } + + n.logger.Info("Connected to bootstrap peer", + zap.String("peer", peerInfo.ID.String()), + zap.String("addr", addr)) + + return nil +} + +// connectToBootstrapPeers connects to configured LibP2P bootstrap peers +func (n *Node) connectToBootstrapPeers(ctx context.Context) error { + if len(n.config.Discovery.BootstrapPeers) == 0 { + n.logger.ComponentDebug(logging.ComponentNode, "No bootstrap peers configured") + return nil + } + + // Use passed context with a reasonable timeout for bootstrap connections + connectCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + for _, bootstrapAddr := range n.config.Discovery.BootstrapPeers { + if err := n.connectToBootstrapPeer(connectCtx, bootstrapAddr); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to connect to bootstrap peer", + zap.String("addr", bootstrapAddr), + zap.Error(err)) + continue + } + } + + return nil +} + // startLibP2P initializes the LibP2P host func (n *Node) startLibP2P() error { n.logger.ComponentInfo(logging.ComponentLibP2P, "Starting LibP2P host") @@ -147,7 +161,7 @@ func (n *Node) startLibP2P() error { } // Create LibP2P host with persistent identity - // Build options allowing conditional proxying via Anyone SOCKS5 and optional QUIC disable + // Build options allowing conditional proxying via Anyone SOCKS5 var opts []libp2p.Option opts = append(opts, libp2p.Identity(identity), @@ -163,14 +177,6 @@ func (n *Node) startLibP2P() error { opts = append(opts, libp2p.Transport(tcp.NewTCPTransport)) } - // QUIC transport: disabled when proxy is enabled (default), - // enabled only when not proxying. - if !anyoneproxy.Enabled() { - opts = append(opts, libp2p.Transport(libp2pquic.NewTransport)) - } else { - n.logger.ComponentDebug(logging.ComponentLibP2P, "QUIC disabled due to proxy being enabled") - } - h, err := libp2p.New(opts...) if err != nil { return err @@ -178,35 +184,19 @@ func (n *Node) startLibP2P() error { n.host = h - // Create DHT for peer discovery - Use server mode for better peer discovery - // Use configured protocol prefix to ensure we discover peers on the correct DHT namespace - dhtPrefix := n.config.Discovery.DHTPrefix - if strings.TrimSpace(dhtPrefix) == "" { - dhtPrefix = "/network/kad/1.0.0" - } - n.logger.ComponentInfo(logging.ComponentDHT, "Using DHT protocol prefix", zap.String("prefix", dhtPrefix)) - kademliaDHT, err := dht.New( - context.Background(), - h, - dht.Mode(dht.ModeServer), - dht.ProtocolPrefix(protocol.ID(dhtPrefix)), - ) - if err != nil { - return fmt.Errorf("failed to create DHT: %w", err) - } - n.dht = kademliaDHT + // DHT removed - incompatible with Anyone proxy anonymity architecture // Log configured bootstrap peers if len(n.config.Discovery.BootstrapPeers) > 0 { - n.logger.ComponentInfo(logging.ComponentDHT, "Configured bootstrap peers", + n.logger.ComponentInfo(logging.ComponentNode, "Configured bootstrap peers", zap.Strings("peers", n.config.Discovery.BootstrapPeers)) } else { - n.logger.ComponentDebug(logging.ComponentDHT, "No bootstrap peers configured") + n.logger.ComponentDebug(logging.ComponentNode, "No bootstrap peers configured") } // Connect to LibP2P bootstrap peers if configured - if err := n.connectToBootstrapPeers(); err != nil { - n.logger.Warn("Failed to connect to bootstrap peers", zap.Error(err)) + if err := n.connectToBootstrapPeers(context.Background()); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to connect to bootstrap peers", zap.Error(err)) // Don't fail - continue without bootstrap connections } @@ -215,103 +205,38 @@ func (n *Node) startLibP2P() error { if len(n.config.Discovery.BootstrapPeers) > 0 { go func() { for i := 0; i < 12; i++ { // ~60s total - if n.host == nil || n.dht == nil { + if n.host == nil { return } - // If we already have peers or DHT table entries, stop retrying - if len(n.host.Network().Peers()) > 0 || len(n.dht.RoutingTable().ListPeers()) > 0 { + // If we already have peers, stop retrying + if len(n.host.Network().Peers()) > 0 { return } - if err := n.connectToBootstrapPeers(); err == nil { - n.logger.Debug("Bootstrap reconnect attempt completed") + if err := n.connectToBootstrapPeers(context.Background()); err == nil { + n.logger.ComponentDebug(logging.ComponentNode, "Bootstrap reconnect attempt completed") } time.Sleep(5 * time.Second) } }() } - // Add bootstrap peers to DHT routing table BEFORE bootstrapping + // Add bootstrap peers to peerstore for peer exchange if len(n.config.Discovery.BootstrapPeers) > 0 { - n.logger.Info("Adding bootstrap peers to DHT routing table") + n.logger.ComponentInfo(logging.ComponentNode, "Adding bootstrap peers to peerstore") for _, bootstrapAddr := range n.config.Discovery.BootstrapPeers { if ma, err := multiaddr.NewMultiaddr(bootstrapAddr); err == nil { if peerInfo, err := peer.AddrInfoFromP2pAddr(ma); err == nil { - // Add to peerstore with longer TTL + // Add to peerstore with longer TTL for peer exchange n.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour*24) - - // Force add to DHT routing table - added, err := n.dht.RoutingTable().TryAddPeer(peerInfo.ID, true, true) - if err != nil { - n.logger.Debug("Failed to add bootstrap peer to DHT routing table", - zap.String("peer", peerInfo.ID.String()), - zap.Error(err)) - } else if added { - n.logger.Info("Successfully added bootstrap peer to DHT routing table", - zap.String("peer", peerInfo.ID.String())) - } + n.logger.ComponentDebug(logging.ComponentNode, "Added bootstrap peer to peerstore", + zap.String("peer", peerInfo.ID.String())) } } } } - // Bootstrap the DHT AFTER connecting to bootstrap peers and adding them to routing table - if err = kademliaDHT.Bootstrap(context.Background()); err != nil { - n.logger.Warn("Failed to bootstrap DHT", zap.Error(err)) - // Don't fail - continue without DHT - } else { - n.logger.ComponentInfo(logging.ComponentDHT, "DHT bootstrap initiated successfully") - } - - // Give DHT a moment to initialize, then add connected peers to routing table - go func() { - time.Sleep(2 * time.Second) - connectedPeers := n.host.Network().Peers() - for _, peerID := range connectedPeers { - if peerID != n.host.ID() { - addrs := n.host.Peerstore().Addrs(peerID) - if len(addrs) > 0 { - n.host.Peerstore().AddAddrs(peerID, addrs, time.Hour*24) - n.logger.Info("Added connected peer to DHT peerstore", - zap.String("peer", peerID.String())) - - // Try to add this peer to DHT routing table explicitly - if n.dht != nil { - added, err := n.dht.RoutingTable().TryAddPeer(peerID, true, true) - if err != nil { - n.logger.Debug("Failed to add peer to DHT routing table", - zap.String("peer", peerID.String()), - zap.Error(err)) - } else if added { - n.logger.Info("Successfully added peer to DHT routing table", - zap.String("peer", peerID.String())) - } else { - n.logger.Debug("Peer already in DHT routing table or rejected", - zap.String("peer", peerID.String())) - } - } - } - } - } - - // Force multiple DHT refresh attempts to populate routing table - if n.dht != nil { - n.logger.Info("Forcing DHT refresh to discover peers") - for i := 0; i < 3; i++ { - time.Sleep(1 * time.Second) - n.dht.RefreshRoutingTable() - - // Check if routing table is populated - routingPeers := n.dht.RoutingTable().ListPeers() - n.logger.Info("DHT routing table status after refresh", - zap.Int("attempt", i+1), - zap.Int("peers_in_table", len(routingPeers))) - - if len(routingPeers) > 0 { - break // Success! - } - } - } - }() + // DHT and routing table logic removed - using simplified peer exchange instead + n.logger.ComponentInfo(logging.ComponentNode, "LibP2P host started successfully - using bootstrap + peer exchange discovery") // Start peer discovery and monitoring n.startPeerDiscovery() @@ -323,56 +248,20 @@ func (n *Node) startLibP2P() error { return nil } -// connectToBootstrapPeers connects to configured LibP2P bootstrap peers -func (n *Node) connectToBootstrapPeers() error { - if len(n.config.Discovery.BootstrapPeers) == 0 { - n.logger.ComponentDebug(logging.ComponentDHT, "No bootstrap peers configured") - return nil - } +// startStorageService initializes the storage service +func (n *Node) startStorageService() error { + n.logger.ComponentInfo(logging.ComponentStorage, "Starting storage service") - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - for _, bootstrapAddr := range n.config.Discovery.BootstrapPeers { - if err := n.connectToBootstrapPeer(ctx, bootstrapAddr); err != nil { - n.logger.Warn("Failed to connect to bootstrap peer", - zap.String("addr", bootstrapAddr), - zap.Error(err)) - continue - } - } - - return nil -} - -// connectToBootstrapPeer connects to a single bootstrap peer -func (n *Node) connectToBootstrapPeer(ctx context.Context, addr string) error { - ma, err := multiaddr.NewMultiaddr(addr) + // Create storage service using the RQLite SQL adapter + service, err := storage.NewService(n.rqliteAdapter.GetSQLDB(), n.logger.Logger) if err != nil { - return fmt.Errorf("invalid multiaddr: %w", err) + return err } - // Extract peer info from multiaddr - peerInfo, err := peer.AddrInfoFromP2pAddr(ma) - if err != nil { - return fmt.Errorf("failed to extract peer info: %w", err) - } + n.storageService = service - // Log resolved peer info prior to connect - n.logger.ComponentDebug(logging.ComponentDHT, "Resolved bootstrap peer", - zap.String("peer_id", peerInfo.ID.String()), - zap.String("addr", addr), - zap.Int("addr_count", len(peerInfo.Addrs)), - ) - - // Connect to the peer - if err := n.host.Connect(ctx, *peerInfo); err != nil { - return fmt.Errorf("failed to connect to peer: %w", err) - } - - n.logger.Info("Connected to bootstrap peer", - zap.String("peer", peerInfo.ID.String()), - zap.String("addr", addr)) + // Set up stream handler for storage protocol + n.host.SetStreamHandler("/network/storage/1.0.0", n.storageService.HandleStorageStream) return nil } @@ -418,59 +307,6 @@ func (n *Node) loadOrCreateIdentity() (crypto.PrivKey, error) { return priv, nil } -// startStorageService initializes the storage service -func (n *Node) startStorageService() error { - n.logger.ComponentInfo(logging.ComponentStorage, "Starting storage service") - - // Create storage service using the RQLite SQL adapter - service, err := storage.NewService(n.rqliteAdapter.GetSQLDB(), n.logger.Logger) - if err != nil { - return err - } - - n.storageService = service - - // Set up stream handler for storage protocol - n.host.SetStreamHandler("/network/storage/1.0.0", n.storageService.HandleStorageStream) - - return nil -} - -// getListenAddresses returns the current listen addresses as strings -// Stop stops the node and all its services -func (n *Node) Stop() error { - n.logger.ComponentInfo(logging.ComponentNode, "Stopping network node") - - // Stop peer discovery - n.stopPeerDiscovery() - - // Stop storage service - if n.storageService != nil { - n.storageService.Close() - } - - // Stop DHT - if n.dht != nil { - n.dht.Close() - } - - // Stop LibP2P host - if n.host != nil { - n.host.Close() - } - - // Stop RQLite - if n.rqliteAdapter != nil { - n.rqliteAdapter.Close() - } - if n.rqliteManager != nil { - _ = n.rqliteManager.Stop() - } - - n.logger.ComponentInfo(logging.ComponentNode, "Network node stopped") - return nil -} - // GetPeerID returns the peer ID of this node func (n *Node) GetPeerID() string { if n.host == nil { @@ -485,119 +321,113 @@ func (n *Node) startPeerDiscovery() { ctx, cancel := context.WithCancel(context.Background()) n.discoveryCancel = cancel - // Start discovery in a goroutine + // Start bootstrap peer connections immediately go func() { - // Do initial discovery immediately (no delay for faster discovery) - n.discoverPeers(ctx) + n.connectToBootstrapPeers(ctx) - // Start with frequent discovery for the first minute - rapidTicker := time.NewTicker(10 * time.Second) - rapidAttempts := 0 - maxRapidAttempts := 6 // 6 attempts * 10 seconds = 1 minute + // Periodic peer discovery using interval from config + ticker := time.NewTicker(n.config.Discovery.DiscoveryInterval) + defer ticker.Stop() for { select { case <-ctx.Done(): - rapidTicker.Stop() return - case <-rapidTicker.C: + case <-ticker.C: n.discoverPeers(ctx) - rapidAttempts++ - - // After rapid attempts, switch to slower periodic discovery - if rapidAttempts >= maxRapidAttempts { - rapidTicker.Stop() - - // Continue with slower periodic discovery every 15 seconds - slowTicker := time.NewTicker(15 * time.Second) - defer slowTicker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-slowTicker.C: - n.discoverPeers(ctx) - } - } - } } } }() } -// discoverPeers discovers and connects to new peers +// discoverPeers discovers and connects to new peers using peer exchange func (n *Node) discoverPeers(ctx context.Context) { - if n.host == nil || n.dht == nil { + if n.host == nil { return } connectedPeers := n.host.Network().Peers() initialCount := len(connectedPeers) - n.logger.Debug("Node peer discovery", + if initialCount == 0 { + // No peers connected, try bootstrap peers again + n.logger.ComponentInfo(logging.ComponentNode, "No peers connected, retrying bootstrap peers") + n.connectToBootstrapPeers(ctx) + return + } + + n.logger.ComponentDebug(logging.ComponentNode, "Discovering peers via peer exchange", zap.Int("current_peers", initialCount)) - // Strategy 1: Use DHT to find new peers - newConnections := n.discoverViaDHT(ctx) + // Strategy: Use peer exchange through libp2p's identify protocol + // LibP2P automatically exchanges peer information when peers connect + // We just need to try connecting to peers in our peerstore - // Strategy 2: Search for random peers using DHT FindPeer + newConnections := n.discoverViaPeerExchange(ctx) finalPeerCount := len(n.host.Network().Peers()) - if newConnections > 0 || finalPeerCount != initialCount { - n.logger.Debug("Node peer discovery completed", + if newConnections > 0 { + n.logger.ComponentInfo(logging.ComponentNode, "Peer discovery completed", zap.Int("new_connections", newConnections), zap.Int("initial_peers", initialCount), zap.Int("final_peers", finalPeerCount)) } } -// discoverViaDHT uses the DHT to find and connect to new peers -func (n *Node) discoverViaDHT(ctx context.Context) int { - if n.dht == nil { - return 0 - } - +// discoverViaPeerExchange discovers new peers using peer exchange (identify protocol) +func (n *Node) discoverViaPeerExchange(ctx context.Context) int { connected := 0 - maxConnections := 5 + maxConnections := 3 // Conservative limit to avoid overwhelming proxy - // Get peers from routing table - routingTablePeers := n.dht.RoutingTable().ListPeers() - n.logger.ComponentDebug(logging.ComponentDHT, "Node DHT routing table has peers", zap.Int("count", len(routingTablePeers))) + // Get all peers from peerstore (includes peers discovered through identify protocol) + allKnownPeers := n.host.Peerstore().Peers() - // Strategy 1: Connect to peers in DHT routing table - for _, peerID := range routingTablePeers { - if peerID == n.host.ID() { + for _, knownPeer := range allKnownPeers { + if knownPeer == n.host.ID() { continue } - // Check if already connected - if n.host.Network().Connectedness(peerID) == 1 { + // Skip if already connected + if n.host.Network().Connectedness(knownPeer) == network.Connected { continue } // Get addresses for this peer - addrs := n.host.Peerstore().Addrs(peerID) + addrs := n.host.Peerstore().Addrs(knownPeer) if len(addrs) == 0 { continue } - // Try to connect - connectCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - peerInfo := peer.AddrInfo{ID: peerID, Addrs: addrs} + // Filter to only standard P2P ports (avoid ephemeral client ports) + var validAddrs []multiaddr.Multiaddr + for _, addr := range addrs { + addrStr := addr.String() + // Keep addresses with standard P2P ports (4000-4999 range) + if strings.Contains(addrStr, ":400") { + validAddrs = append(validAddrs, addr) + } + } + + if len(validAddrs) == 0 { + continue + } + + // Try to connect with shorter timeout (proxy connections are slower) + connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) + peerInfo := peer.AddrInfo{ID: knownPeer, Addrs: validAddrs} if err := n.host.Connect(connectCtx, peerInfo); err != nil { cancel() - n.logger.Debug("Failed to connect to DHT peer", - zap.String("peer", peerID.String()), + n.logger.ComponentDebug(logging.ComponentNode, "Failed to connect to peer via exchange", + zap.String("peer", knownPeer.String()), zap.Error(err)) continue } cancel() - n.logger.Debug("Node connected to new peer via DHT", - zap.String("peer", peerID.String())) + n.logger.ComponentInfo(logging.ComponentNode, "Connected to new peer via peer exchange", + zap.String("peer", knownPeer.String())) connected++ if connected >= maxConnections { @@ -605,80 +435,6 @@ func (n *Node) discoverViaDHT(ctx context.Context) int { } } - // Strategy 2: Use peer exchange - check what peers our connected peers know about - connectedPeers := n.host.Network().Peers() - for _, connectedPeer := range connectedPeers { - if connectedPeer == n.host.ID() { - continue - } - - // Get all peers from peerstore (this includes peers that connected peers might know about) - allKnownPeers := n.host.Peerstore().Peers() - - for _, knownPeer := range allKnownPeers { - if knownPeer == n.host.ID() || knownPeer == connectedPeer { - continue - } - - // Skip if already connected - if n.host.Network().Connectedness(knownPeer) == 1 { - continue - } - - // Get addresses for this peer - addrs := n.host.Peerstore().Addrs(knownPeer) - if len(addrs) == 0 { - continue - } - - // Filter addresses to only include listening ports (not ephemeral client ports) - var validAddrs []multiaddr.Multiaddr - for _, addr := range addrs { - addrStr := addr.String() - // Skip ephemeral ports (typically above 49152) and keep standard ports - if !strings.Contains(addrStr, ":53") && // Skip ephemeral ports starting with 53 - !strings.Contains(addrStr, ":54") && // Skip ephemeral ports starting with 54 - !strings.Contains(addrStr, ":55") && // Skip ephemeral ports starting with 55 - !strings.Contains(addrStr, ":56") && // Skip ephemeral ports starting with 56 - !strings.Contains(addrStr, ":57") && // Skip ephemeral ports starting with 57 - !strings.Contains(addrStr, ":58") && // Skip ephemeral ports starting with 58 - !strings.Contains(addrStr, ":59") && // Skip ephemeral ports starting with 59 - !strings.Contains(addrStr, ":6") && // Skip ephemeral ports starting with 6 - (strings.Contains(addrStr, ":400") || // Include 4000-4999 range - strings.Contains(addrStr, ":401") || - strings.Contains(addrStr, ":402") || - strings.Contains(addrStr, ":403")) { - validAddrs = append(validAddrs, addr) - } - } - - if len(validAddrs) == 0 { - continue - } - - // Try to connect using only valid addresses - connectCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - peerInfo := peer.AddrInfo{ID: knownPeer, Addrs: validAddrs} - - if err := n.host.Connect(connectCtx, peerInfo); err != nil { - cancel() - n.logger.Debug("Failed to connect to peerstore peer", - zap.String("peer", knownPeer.String()), - zap.Error(err)) - continue - } - cancel() - - n.logger.Debug("Node connected to new peer via peerstore", - zap.String("peer", knownPeer.String())) - connected++ - - if connected >= maxConnections { - return connected - } - } - } - return connected } @@ -711,4 +467,76 @@ func (n *Node) stopPeerDiscovery() { n.discoveryCancel() n.discoveryCancel = nil } + n.logger.ComponentInfo(logging.ComponentNode, "Peer discovery stopped") +} + +// getListenAddresses returns the current listen addresses as strings +// Stop stops the node and all its services +func (n *Node) Stop() error { + n.logger.ComponentInfo(logging.ComponentNode, "Stopping network node") + + // Stop peer discovery + n.stopPeerDiscovery() + + // Stop storage service + if n.storageService != nil { + n.storageService.Close() + } + + // Stop DHT + // DHT removed - using simplified bootstrap + peer exchange discovery + + // Stop LibP2P host + if n.host != nil { + n.host.Close() + } + + // Stop RQLite + if n.rqliteAdapter != nil { + n.rqliteAdapter.Close() + } + if n.rqliteManager != nil { + _ = n.rqliteManager.Stop() + } + + n.logger.ComponentInfo(logging.ComponentNode, "Network node stopped") + return nil +} + +// Starts the network node +func (n *Node) Start(ctx context.Context) error { + n.logger.Info("Starting network node", zap.String("data_dir", n.config.Node.DataDir)) + + // Create data directory + if err := os.MkdirAll(n.config.Node.DataDir, 0755); err != nil { + return fmt.Errorf("failed to create data directory: %w", err) + } + + // Start RQLite + if err := n.startRQLite(ctx); err != nil { + return fmt.Errorf("failed to start RQLite: %w", err) + } + + // Start LibP2P host + if err := n.startLibP2P(); err != nil { + return fmt.Errorf("failed to start LibP2P: %w", err) + } + + // Start storage service + if err := n.startStorageService(); err != nil { + return fmt.Errorf("failed to start storage service: %w", err) + } + + // Get listen addresses for logging + var listenAddrs []string + for _, addr := range n.host.Addrs() { + listenAddrs = append(listenAddrs, addr.String()) + } + + n.logger.ComponentInfo(logging.ComponentNode, "Network node started successfully", + zap.String("peer_id", n.host.ID().String()), + zap.Strings("listen_addrs", listenAddrs), + ) + + return nil }