diff --git a/cmd/gateway/config.go b/cmd/gateway/config.go index 017ff0d..3983f2c 100644 --- a/cmd/gateway/config.go +++ b/cmd/gateway/config.go @@ -14,10 +14,6 @@ import ( "go.uber.org/zap" ) -// For transition, alias main.GatewayConfig to pkg/gateway.Config -// server.go will be removed; this keeps compatibility until then. -type GatewayConfig = gateway.Config - func getEnvDefault(key, def string) string { if v := os.Getenv(key); strings.TrimSpace(v) != "" { return v diff --git a/pkg/cli/production/lifecycle/stop.go b/pkg/cli/production/lifecycle/stop.go index e179745..4a96835 100644 --- a/pkg/cli/production/lifecycle/stop.go +++ b/pkg/cli/production/lifecycle/stop.go @@ -53,13 +53,17 @@ func HandleStop() { // Reset failed state for any services that might be in failed state resetArgs := []string{"reset-failed"} resetArgs = append(resetArgs, services...) - exec.Command("systemctl", resetArgs...).Run() + if err := exec.Command("systemctl", resetArgs...).Run(); err != nil { + fmt.Printf(" ⚠️ Warning: Failed to reset-failed state: %v\n", err) + } // Wait again after reset-failed time.Sleep(1 * time.Second) // Stop again to ensure they're stopped - exec.Command("systemctl", stopArgs...).Run() + if err := exec.Command("systemctl", stopArgs...).Run(); err != nil { + fmt.Printf(" ⚠️ Warning: Second stop attempt had errors: %v\n", err) + } time.Sleep(1 * time.Second) hadError := false diff --git a/pkg/cli/production/upgrade/flags.go b/pkg/cli/production/upgrade/flags.go index cff6d1d..193aa63 100644 --- a/pkg/cli/production/upgrade/flags.go +++ b/pkg/cli/production/upgrade/flags.go @@ -60,10 +60,6 @@ func ParseFlags(args []string) (*Flags, error) { fs.IntVar(&flags.AnyoneBandwidth, "anyone-bandwidth", 30, "Limit relay to N% of VPS bandwidth (0=unlimited, runs speedtest)") fs.IntVar(&flags.AnyoneAccounting, "anyone-accounting", 0, "Monthly data cap for relay in GB (0=unlimited)") - // Support legacy flags for backwards compatibility - nightly := fs.Bool("nightly", false, "Use nightly branch (deprecated, use --branch nightly)") - main := fs.Bool("main", false, "Use main branch (deprecated, use --branch main)") - if err := fs.Parse(args); err != nil { if err == flag.ErrHelp { return nil, err @@ -71,14 +67,6 @@ func ParseFlags(args []string) (*Flags, error) { return nil, fmt.Errorf("failed to parse flags: %w", err) } - // Handle legacy flags - if *nightly { - flags.Branch = "nightly" - } - if *main { - flags.Branch = "main" - } - // Set nameserver if explicitly provided if *nameserver { flags.Nameserver = nameserver diff --git a/pkg/cli/utils/systemd.go b/pkg/cli/utils/systemd.go index 068825f..ef5f38d 100644 --- a/pkg/cli/utils/systemd.go +++ b/pkg/cli/utils/systemd.go @@ -10,6 +10,8 @@ import ( "strings" "syscall" "time" + + "github.com/DeBrosOfficial/network/pkg/constants" ) var ErrServiceNotFound = errors.New("service not found") @@ -22,15 +24,15 @@ type PortSpec struct { var ServicePorts = map[string][]PortSpec{ "debros-gateway": { - {Name: "Gateway API", Port: 6001}, + {Name: "Gateway API", Port: constants.GatewayAPIPort}, }, "debros-olric": { - {Name: "Olric HTTP", Port: 3320}, - {Name: "Olric Memberlist", Port: 3322}, + {Name: "Olric HTTP", Port: constants.OlricHTTPPort}, + {Name: "Olric Memberlist", Port: constants.OlricMemberlistPort}, }, "debros-node": { - {Name: "RQLite HTTP", Port: 5001}, - {Name: "RQLite Raft", Port: 7001}, + {Name: "RQLite HTTP", Port: constants.RQLiteHTTPPort}, + {Name: "RQLite Raft", Port: constants.RQLiteRaftPort}, }, "debros-ipfs": { {Name: "IPFS API", Port: 4501}, @@ -48,12 +50,12 @@ func DefaultPorts() []PortSpec { {Name: "IPFS Swarm", Port: 4001}, {Name: "IPFS API", Port: 4501}, {Name: "IPFS Gateway", Port: 8080}, - {Name: "Gateway API", Port: 6001}, - {Name: "RQLite HTTP", Port: 5001}, - {Name: "RQLite Raft", Port: 7001}, + {Name: "Gateway API", Port: constants.GatewayAPIPort}, + {Name: "RQLite HTTP", Port: constants.RQLiteHTTPPort}, + {Name: "RQLite Raft", Port: constants.RQLiteRaftPort}, {Name: "IPFS Cluster API", Port: 9094}, - {Name: "Olric HTTP", Port: 3320}, - {Name: "Olric Memberlist", Port: 3322}, + {Name: "Olric HTTP", Port: constants.OlricHTTPPort}, + {Name: "Olric Memberlist", Port: constants.OlricMemberlistPort}, } } diff --git a/pkg/constants/capacity.go b/pkg/constants/capacity.go new file mode 100644 index 0000000..39c1eed --- /dev/null +++ b/pkg/constants/capacity.go @@ -0,0 +1,9 @@ +package constants + +// Node capacity limits used by both deployment and namespace scheduling. +const ( + MaxDeploymentsPerNode = 100 + MaxMemoryMB = 8192 // 8GB + MaxCPUPercent = 400 // 400% = 4 cores + MaxPortsPerNode = 9900 // ~10k ports available +) diff --git a/pkg/constants/ports.go b/pkg/constants/ports.go new file mode 100644 index 0000000..3d36c69 --- /dev/null +++ b/pkg/constants/ports.go @@ -0,0 +1,11 @@ +package constants + +// Service ports used across the network. +const ( + WireGuardPort = 51820 + RQLiteHTTPPort = 5001 + RQLiteRaftPort = 7001 + OlricHTTPPort = 3320 + OlricMemberlistPort = 3322 + GatewayAPIPort = 6001 +) diff --git a/pkg/deployments/home_node.go b/pkg/deployments/home_node.go index d3d29a4..53becb2 100644 --- a/pkg/deployments/home_node.go +++ b/pkg/deployments/home_node.go @@ -6,6 +6,7 @@ import ( "time" "github.com/DeBrosOfficial/network/pkg/client" + "github.com/DeBrosOfficial/network/pkg/constants" "github.com/DeBrosOfficial/network/pkg/rqlite" "go.uber.org/zap" ) @@ -270,7 +271,7 @@ func (hnm *HomeNodeManager) getNodeCapacity(ctx context.Context, nodeID string) AllocatedPorts: allocatedPorts, AvailablePorts: availablePorts, UsedMemoryMB: totalMemoryMB, - AvailableMemoryMB: 8192 - totalMemoryMB, // Assume 8GB per node (make configurable later) + AvailableMemoryMB: constants.MaxMemoryMB - totalMemoryMB, UsedCPUPercent: totalCPUPercent, Score: score, } @@ -331,12 +332,10 @@ func (hnm *HomeNodeManager) getNodeResourceUsage(ctx context.Context, nodeID str // calculateCapacityScore calculates a 0.0-1.0 score (higher is better) func (hnm *HomeNodeManager) calculateCapacityScore(deploymentCount, allocatedPorts, availablePorts, usedMemoryMB, usedCPUPercent int) float64 { - const ( - maxDeployments = 100 // Max deployments per node - maxMemoryMB = 8192 // 8GB - maxCPUPercent = 400 // 400% = 4 cores - maxPorts = 9900 // ~10k ports available - ) + maxDeployments := constants.MaxDeploymentsPerNode + maxMemoryMB := constants.MaxMemoryMB + maxCPUPercent := constants.MaxCPUPercent + maxPorts := constants.MaxPortsPerNode // Calculate individual component scores (0.0 to 1.0) deploymentScore := 1.0 - (float64(deploymentCount) / float64(maxDeployments)) diff --git a/pkg/deployments/port_allocator.go b/pkg/deployments/port_allocator.go index a153de4..17fcbb0 100644 --- a/pkg/deployments/port_allocator.go +++ b/pkg/deployments/port_allocator.go @@ -3,6 +3,7 @@ package deployments import ( "context" "fmt" + "strings" "time" "github.com/DeBrosOfficial/network/pkg/client" @@ -216,21 +217,6 @@ func isConflictError(err error) bool { if err == nil { return false } - // RQLite returns constraint violation errors as strings containing "UNIQUE constraint failed" errStr := err.Error() - return contains(errStr, "UNIQUE") || contains(errStr, "constraint") || contains(errStr, "conflict") -} - -// contains checks if a string contains a substring (case-insensitive) -func contains(s, substr string) bool { - return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && findSubstring(s, substr)) -} - -func findSubstring(s, substr string) bool { - for i := 0; i <= len(s)-len(substr); i++ { - if s[i:i+len(substr)] == substr { - return true - } - } - return false + return strings.Contains(errStr, "UNIQUE") || strings.Contains(errStr, "constraint") || strings.Contains(errStr, "conflict") } diff --git a/pkg/deployments/port_allocator_test.go b/pkg/deployments/port_allocator_test.go index 5acfe2f..89d9f23 100644 --- a/pkg/deployments/port_allocator_test.go +++ b/pkg/deployments/port_allocator_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "reflect" + "strings" "testing" "github.com/DeBrosOfficial/network/pkg/rqlite" @@ -410,7 +411,7 @@ func TestContains(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := contains(tt.s, tt.substr) + result := strings.Contains(tt.s, tt.substr) if result != tt.expected { t.Errorf("contains(%q, %q) = %v, expected %v", tt.s, tt.substr, result, tt.expected) } diff --git a/pkg/environments/production/config.go b/pkg/environments/production/config.go index 435ed93..ab74e9e 100644 --- a/pkg/environments/production/config.go +++ b/pkg/environments/production/config.go @@ -429,7 +429,9 @@ func (sg *SecretGenerator) SaveConfig(filename string, content string) error { } // Fix ownership - exec.Command("chown", "debros:debros", configPath).Run() + if err := exec.Command("chown", "debros:debros", configPath).Run(); err != nil { + fmt.Printf("Warning: failed to chown %s to debros:debros: %v\n", configPath, err) + } return nil } diff --git a/pkg/environments/production/preferences.go b/pkg/environments/production/preferences.go index e3926be..ea34f05 100644 --- a/pkg/environments/production/preferences.go +++ b/pkg/environments/production/preferences.go @@ -3,7 +3,6 @@ package production import ( "os" "path/filepath" - "strings" "gopkg.in/yaml.v3" ) @@ -15,10 +14,7 @@ type NodePreferences struct { AnyoneClient bool `yaml:"anyone_client"` } -const ( - preferencesFile = "preferences.yaml" - legacyBranchFile = ".branch" -) +const preferencesFile = "preferences.yaml" // SavePreferences saves node preferences to disk func SavePreferences(oramaDir string, prefs *NodePreferences) error { @@ -38,10 +34,6 @@ func SavePreferences(oramaDir string, prefs *NodePreferences) error { return err } - // Also save branch to legacy .branch file for backward compatibility - legacyPath := filepath.Join(oramaDir, legacyBranchFile) - os.WriteFile(legacyPath, []byte(prefs.Branch), 0644) - return nil } @@ -53,7 +45,7 @@ func LoadPreferences(oramaDir string) *NodePreferences { Nameserver: false, } - // Try to load from preferences.yaml first + // Try to load from preferences.yaml path := filepath.Join(oramaDir, preferencesFile) if data, err := os.ReadFile(path); err == nil { if err := yaml.Unmarshal(data, prefs); err == nil { @@ -61,15 +53,6 @@ func LoadPreferences(oramaDir string) *NodePreferences { } } - // Fall back to legacy .branch file - legacyPath := filepath.Join(oramaDir, legacyBranchFile) - if data, err := os.ReadFile(legacyPath); err == nil { - branch := strings.TrimSpace(string(data)) - if branch != "" { - prefs.Branch = branch - } - } - return prefs } diff --git a/pkg/gateway/anon_proxy_handler.go b/pkg/gateway/anon_proxy_handler.go index 692434d..e6fe987 100644 --- a/pkg/gateway/anon_proxy_handler.go +++ b/pkg/gateway/anon_proxy_handler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" "net/url" "strings" @@ -234,31 +235,15 @@ func isPrivateOrLocalHost(host string) bool { } // Check for localhost variants - if host == "localhost" || host == "::1" { + if host == "localhost" { return true } - // Check common private ranges (basic check) - if strings.HasPrefix(host, "10.") || - strings.HasPrefix(host, "192.168.") || - strings.HasPrefix(host, "172.16.") || - strings.HasPrefix(host, "172.17.") || - strings.HasPrefix(host, "172.18.") || - strings.HasPrefix(host, "172.19.") || - strings.HasPrefix(host, "172.20.") || - strings.HasPrefix(host, "172.21.") || - strings.HasPrefix(host, "172.22.") || - strings.HasPrefix(host, "172.23.") || - strings.HasPrefix(host, "172.24.") || - strings.HasPrefix(host, "172.25.") || - strings.HasPrefix(host, "172.26.") || - strings.HasPrefix(host, "172.27.") || - strings.HasPrefix(host, "172.28.") || - strings.HasPrefix(host, "172.29.") || - strings.HasPrefix(host, "172.30.") || - strings.HasPrefix(host, "172.31.") { - return true + // Parse as IP and use standard library checks + ip := net.ParseIP(host) + if ip == nil { + return false } - return false + return ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() } diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index ac69ba3..6bf8a24 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -1111,35 +1111,6 @@ func (g *Gateway) getDeploymentByDomain(ctx context.Context, domain string) (*de } } - // Legacy format: {name}.node-{shortID}.{baseDomain} (backwards compatibility) - if len(parts) == 2 && strings.HasPrefix(parts[1], "node-") { - deploymentName := parts[0] - shortNodeID := parts[1] // e.g., "node-kv4la8" - - // Query by name and matching short node ID - query := ` - SELECT id, namespace, name, type, port, content_cid, status, home_node_id - FROM deployments - WHERE name = ? - AND ('node-' || substr(home_node_id, 9, 6) = ? OR home_node_id = ?) - AND status = 'active' - LIMIT 1 - ` - result, err := db.Query(internalCtx, query, deploymentName, shortNodeID, shortNodeID) - if err == nil && len(result.Rows) > 0 { - row := result.Rows[0] - return &deployments.Deployment{ - ID: getString(row[0]), - Namespace: getString(row[1]), - Name: getString(row[2]), - Type: deployments.DeploymentType(getString(row[3])), - Port: getInt(row[4]), - ContentCID: getString(row[5]), - Status: deployments.DeploymentStatus(getString(row[6])), - HomeNodeID: getString(row[7]), - }, nil - } - } } // Try custom domain from deployment_domains table diff --git a/pkg/gateway/peer_discovery.go b/pkg/gateway/peer_discovery.go index 404bb3b..43432f6 100644 --- a/pkg/gateway/peer_discovery.go +++ b/pkg/gateway/peer_discovery.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/DeBrosOfficial/network/pkg/wireguard" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" @@ -337,16 +338,21 @@ func (pd *PeerDiscovery) updateHeartbeat(ctx context.Context) error { } // GetWireGuardIP detects the local WireGuard IP address using the wg0 network -// interface or the WireGuard config file. It does not require a PeerDiscovery -// instance and can be called from anywhere in the gateway package. +// interface, the 'ip' command, or the WireGuard config file. +// It does not require a PeerDiscovery instance and can be called from anywhere +// in the gateway package. func GetWireGuardIP() (string, error) { - // Method 1: Use 'ip addr show wg0' command (works without root) - ip, err := getWireGuardIPFromCommand() - if err == nil { + // Method 1: Use net.InterfaceByName (shared implementation) + if ip, err := wireguard.GetIP(); err == nil { return ip, nil } - // Method 2: Try to read from WireGuard config file (requires root, may fail) + // Method 2: Use 'ip addr show wg0' command (works without root) + if ip, err := getWireGuardIPFromCommand(); err == nil { + return ip, nil + } + + // Method 3: Try to read from WireGuard config file (requires root, may fail) configPath := "/etc/wireguard/wg0.conf" data, err := os.ReadFile(configPath) if err == nil { @@ -359,7 +365,6 @@ func GetWireGuardIP() (string, error) { parts := strings.Split(line, "=") if len(parts) == 2 { addrWithCIDR := strings.TrimSpace(parts[1]) - // Remove /24 suffix ip := strings.Split(addrWithCIDR, "/")[0] ip = strings.TrimSpace(ip) return ip, nil diff --git a/pkg/ipfs/cluster_peer.go b/pkg/ipfs/cluster_peer.go index cdc0a7f..9f28ac1 100644 --- a/pkg/ipfs/cluster_peer.go +++ b/pkg/ipfs/cluster_peer.go @@ -61,7 +61,9 @@ func (cm *ClusterConfigManager) UpdateAllClusterPeers() error { func (cm *ClusterConfigManager) RepairPeerConfiguration() error { cm.logger.Info("Attempting to repair IPFS Cluster peer configuration") - _ = cm.FixIPFSConfigAddresses() + if err := cm.FixIPFSConfigAddresses(); err != nil { + cm.logger.Warn("Failed to fix IPFS config addresses during repair", zap.Error(err)) + } peers, err := cm.DiscoverClusterPeersFromGateway() if err != nil { @@ -72,7 +74,9 @@ func (cm *ClusterConfigManager) RepairPeerConfiguration() error { peerAddrs = append(peerAddrs, p.Multiaddress) } if len(peerAddrs) > 0 { - _ = cm.UpdatePeerAddresses(peerAddrs) + if err := cm.UpdatePeerAddresses(peerAddrs); err != nil { + cm.logger.Warn("Failed to update peer addresses during repair", zap.Error(err)) + } } } diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index 82914f5..e7edbe5 100644 --- a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -893,21 +893,35 @@ func (cm *ClusterManager) GetClusterStatus(ctx context.Context, clusterID string ClusterID: cluster.ID, } - // Check individual service status - // TODO: Actually check each service's health - if cluster.Status == ClusterStatusReady { - status.RQLiteReady = true - status.OlricReady = true - status.GatewayReady = true - status.DNSReady = true - } - - // Get node list + // Check individual service status by inspecting cluster nodes nodes, err := cm.getClusterNodes(ctx, clusterID) if err == nil { + runningCount := 0 + hasRQLite := false + hasOlric := false + hasGateway := false + for _, node := range nodes { status.Nodes = append(status.Nodes, node.NodeID) + if node.Status == NodeStatusRunning { + runningCount++ + } + if node.RQLiteHTTPPort > 0 { + hasRQLite = true + } + if node.OlricHTTPPort > 0 { + hasOlric = true + } + if node.GatewayHTTPPort > 0 { + hasGateway = true + } } + + allRunning := len(nodes) > 0 && runningCount == len(nodes) + status.RQLiteReady = allRunning && hasRQLite + status.OlricReady = allRunning && hasOlric + status.GatewayReady = allRunning && hasGateway + status.DNSReady = allRunning } if cluster.ErrorMessage != "" { diff --git a/pkg/namespace/node_selector.go b/pkg/namespace/node_selector.go index 2be31ad..929e645 100644 --- a/pkg/namespace/node_selector.go +++ b/pkg/namespace/node_selector.go @@ -6,6 +6,7 @@ import ( "time" "github.com/DeBrosOfficial/network/pkg/client" + "github.com/DeBrosOfficial/network/pkg/constants" "github.com/DeBrosOfficial/network/pkg/rqlite" "go.uber.org/zap" ) @@ -176,12 +177,10 @@ func (cns *ClusterNodeSelector) getNodeCapacity(ctx context.Context, nodeID, ipA } // Calculate available capacity - const ( - maxDeployments = 100 - maxPorts = 9900 // User deployment port range - maxMemoryMB = 8192 // 8GB - maxCPUPercent = 400 // 4 cores - ) + maxDeployments := constants.MaxDeploymentsPerNode + maxPorts := constants.MaxPortsPerNode + maxMemoryMB := constants.MaxMemoryMB + maxCPUPercent := constants.MaxCPUPercent availablePorts := maxPorts - allocatedPorts if availablePorts < 0 { diff --git a/pkg/namespace/port_allocator.go b/pkg/namespace/port_allocator.go index d237d26..d58ef01 100644 --- a/pkg/namespace/port_allocator.go +++ b/pkg/namespace/port_allocator.go @@ -3,6 +3,7 @@ package namespace import ( "context" "fmt" + "strings" "time" "github.com/DeBrosOfficial/network/pkg/client" @@ -369,19 +370,5 @@ func isConflictError(err error) bool { return false } errStr := err.Error() - return contains(errStr, "UNIQUE") || contains(errStr, "constraint") || contains(errStr, "conflict") -} - -// contains checks if a string contains a substring (case-insensitive) -func contains(s, substr string) bool { - return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && findSubstring(s, substr)) -} - -func findSubstring(s, substr string) bool { - for i := 0; i <= len(s)-len(substr); i++ { - if s[i:i+len(substr)] == substr { - return true - } - } - return false + return strings.Contains(errStr, "UNIQUE") || strings.Contains(errStr, "constraint") || strings.Contains(errStr, "conflict") } diff --git a/pkg/namespace/port_allocator_test.go b/pkg/namespace/port_allocator_test.go index 1a44148..1da7a7e 100644 --- a/pkg/namespace/port_allocator_test.go +++ b/pkg/namespace/port_allocator_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "strings" "testing" "time" @@ -269,7 +270,7 @@ func TestContains(t *testing.T) { for _, tt := range tests { t.Run(tt.s+"_"+tt.substr, func(t *testing.T) { - result := contains(tt.s, tt.substr) + result := strings.Contains(tt.s, tt.substr) if result != tt.expected { t.Errorf("contains(%q, %q) = %v, want %v", tt.s, tt.substr, result, tt.expected) } diff --git a/pkg/namespace/wireguard.go b/pkg/namespace/wireguard.go index 38add16..3c71753 100644 --- a/pkg/namespace/wireguard.go +++ b/pkg/namespace/wireguard.go @@ -1,25 +1,9 @@ package namespace -import ( - "fmt" - "net" -) +import "github.com/DeBrosOfficial/network/pkg/wireguard" // getWireGuardIP returns the IPv4 address of the wg0 interface. // Used as a fallback when Olric BindAddr is empty or 0.0.0.0. func getWireGuardIP() (string, error) { - iface, err := net.InterfaceByName("wg0") - if err != nil { - return "", fmt.Errorf("wg0 interface not found: %w", err) - } - addrs, err := iface.Addrs() - if err != nil { - return "", fmt.Errorf("failed to get wg0 addresses: %w", err) - } - for _, addr := range addrs { - if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil { - return ipnet.IP.String(), nil - } - } - return "", fmt.Errorf("no IPv4 address on wg0") + return wireguard.GetIP() } diff --git a/pkg/node/dns_registration.go b/pkg/node/dns_registration.go index ceba888..d4cb76c 100644 --- a/pkg/node/dns_registration.go +++ b/pkg/node/dns_registration.go @@ -11,6 +11,7 @@ import ( "time" "github.com/DeBrosOfficial/network/pkg/logging" + "github.com/DeBrosOfficial/network/pkg/wireguard" "go.uber.org/zap" ) @@ -414,20 +415,7 @@ func (n *Node) isNameserverNode(ctx context.Context) bool { // getWireGuardIP returns the IPv4 address assigned to the wg0 interface, if any func (n *Node) getWireGuardIP() (string, error) { - iface, err := net.InterfaceByName("wg0") - if err != nil { - return "", err - } - addrs, err := iface.Addrs() - if err != nil { - return "", err - } - for _, addr := range addrs { - if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil { - return ipnet.IP.String(), nil - } - } - return "", fmt.Errorf("no IPv4 address on wg0") + return wireguard.GetIP() } // getNodeIPAddress attempts to determine the node's external IP address diff --git a/pkg/rqlite/cluster.go b/pkg/rqlite/cluster.go index 61228fc..ab1758d 100644 --- a/pkg/rqlite/cluster.go +++ b/pkg/rqlite/cluster.go @@ -47,7 +47,9 @@ func (r *RQLiteManager) waitForMinClusterSizeBeforeStart(ctx context.Context, rq return nil } - _ = r.discoveryService.TriggerPeerExchange(ctx) + if err := r.discoveryService.TriggerPeerExchange(ctx); err != nil { + r.logger.Warn("Failed to trigger peer exchange before cluster wait", zap.Error(err)) + } checkInterval := 2 * time.Second for { @@ -92,7 +94,9 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql return fmt.Errorf("discovery service not available") } - _ = r.discoveryService.TriggerPeerExchange(ctx) + if err := r.discoveryService.TriggerPeerExchange(ctx); err != nil { + r.logger.Warn("Failed to trigger peer exchange during pre-start discovery", zap.Error(err)) + } time.Sleep(1 * time.Second) r.discoveryService.TriggerSync() time.Sleep(2 * time.Second) @@ -123,7 +127,9 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql zap.Int("discovered_peers", discoveredPeers), zap.Int("min_cluster_size", r.config.MinClusterSize)) // Still write peers.json with just ourselves - better than nothing - _ = r.discoveryService.ForceWritePeersJSON() + if err := r.discoveryService.ForceWritePeersJSON(); err != nil { + r.logger.Warn("Failed to write single-node peers.json fallback", zap.Error(err)) + } return nil } @@ -137,8 +143,12 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql } if ourLogIndex == 0 && maxPeerIndex > 0 { - _ = r.clearRaftState(rqliteDataDir) - _ = r.discoveryService.ForceWritePeersJSON() + if err := r.clearRaftState(rqliteDataDir); err != nil { + r.logger.Warn("Failed to clear raft state during pre-start discovery", zap.Error(err)) + } + if err := r.discoveryService.ForceWritePeersJSON(); err != nil { + r.logger.Warn("Failed to write peers.json after clearing raft state", zap.Error(err)) + } } } @@ -150,7 +160,9 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql // recoverCluster restarts RQLite using peers.json func (r *RQLiteManager) recoverCluster(ctx context.Context, peersJSONPath string) error { - _ = r.Stop() + if err := r.Stop(); err != nil { + r.logger.Warn("Failed to stop RQLite during cluster recovery", zap.Error(err)) + } time.Sleep(2 * time.Second) rqliteDataDir, err := r.rqliteDataDirPath() @@ -187,10 +199,14 @@ func (r *RQLiteManager) recoverFromSplitBrain(ctx context.Context) error { } if ourIndex == 0 && maxPeerIndex > 0 { - _ = r.clearRaftState(rqliteDataDir) + if err := r.clearRaftState(rqliteDataDir); err != nil { + r.logger.Warn("Failed to clear raft state during split-brain recovery", zap.Error(err)) + } r.discoveryService.TriggerPeerExchange(ctx) time.Sleep(1 * time.Second) - _ = r.discoveryService.ForceWritePeersJSON() + if err := r.discoveryService.ForceWritePeersJSON(); err != nil { + r.logger.Warn("Failed to write peers.json during split-brain recovery", zap.Error(err)) + } return r.recoverCluster(ctx, filepath.Join(rqliteDataDir, "raft", "peers.json")) } @@ -265,7 +281,9 @@ func (r *RQLiteManager) startHealthMonitoring(ctx context.Context) { return case <-ticker.C: if r.isInSplitBrainState() { - _ = r.recoverFromSplitBrain(ctx) + if err := r.recoverFromSplitBrain(ctx); err != nil { + r.logger.Warn("Split-brain recovery attempt failed", zap.Error(err)) + } } } } diff --git a/pkg/serverless/cache/module_cache.go b/pkg/serverless/cache/module_cache.go index 2144606..a9e0a62 100644 --- a/pkg/serverless/cache/module_cache.go +++ b/pkg/serverless/cache/module_cache.go @@ -3,14 +3,21 @@ package cache import ( "context" "sync" + "time" "github.com/tetratelabs/wazero" "go.uber.org/zap" ) +// cacheEntry wraps a compiled module with access tracking for LRU eviction. +type cacheEntry struct { + module wazero.CompiledModule + lastAccessed time.Time +} + // ModuleCache manages compiled WASM module caching. type ModuleCache struct { - modules map[string]wazero.CompiledModule + modules map[string]*cacheEntry mu sync.RWMutex capacity int logger *zap.Logger @@ -19,7 +26,7 @@ type ModuleCache struct { // NewModuleCache creates a new ModuleCache. func NewModuleCache(capacity int, logger *zap.Logger) *ModuleCache { return &ModuleCache{ - modules: make(map[string]wazero.CompiledModule), + modules: make(map[string]*cacheEntry), capacity: capacity, logger: logger, } @@ -27,15 +34,20 @@ func NewModuleCache(capacity int, logger *zap.Logger) *ModuleCache { // Get retrieves a compiled module from the cache. func (c *ModuleCache) Get(wasmCID string) (wazero.CompiledModule, bool) { - c.mu.RLock() - defer c.mu.RUnlock() + c.mu.Lock() + defer c.mu.Unlock() - module, exists := c.modules[wasmCID] - return module, exists + entry, exists := c.modules[wasmCID] + if !exists { + return nil, false + } + + entry.lastAccessed = time.Now() + return entry.module, true } // Set stores a compiled module in the cache. -// If the cache is full, it evicts the oldest module. +// If the cache is full, it evicts the least recently used module. func (c *ModuleCache) Set(wasmCID string, module wazero.CompiledModule) { c.mu.Lock() defer c.mu.Unlock() @@ -50,7 +62,10 @@ func (c *ModuleCache) Set(wasmCID string, module wazero.CompiledModule) { c.evictOldest() } - c.modules[wasmCID] = module + c.modules[wasmCID] = &cacheEntry{ + module: module, + lastAccessed: time.Now(), + } c.logger.Debug("Module cached", zap.String("wasm_cid", wasmCID), @@ -63,8 +78,8 @@ func (c *ModuleCache) Delete(ctx context.Context, wasmCID string) { c.mu.Lock() defer c.mu.Unlock() - if module, exists := c.modules[wasmCID]; exists { - _ = module.Close(ctx) + if entry, exists := c.modules[wasmCID]; exists { + _ = entry.module.Close(ctx) delete(c.modules, wasmCID) c.logger.Debug("Module removed from cache", zap.String("wasm_cid", wasmCID)) } @@ -97,8 +112,8 @@ func (c *ModuleCache) Clear(ctx context.Context) { c.mu.Lock() defer c.mu.Unlock() - for cid, module := range c.modules { - if err := module.Close(ctx); err != nil { + for cid, entry := range c.modules { + if err := entry.module.Close(ctx); err != nil { c.logger.Warn("Failed to close cached module during clear", zap.String("cid", cid), zap.Error(err), @@ -106,7 +121,7 @@ func (c *ModuleCache) Clear(ctx context.Context) { } } - c.modules = make(map[string]wazero.CompiledModule) + c.modules = make(map[string]*cacheEntry) c.logger.Debug("Module cache cleared") } @@ -118,16 +133,23 @@ func (c *ModuleCache) GetStats() (size int, capacity int) { return len(c.modules), c.capacity } -// evictOldest removes the oldest module from cache. +// evictOldest removes the least recently accessed module from cache. // Must be called with mu held. func (c *ModuleCache) evictOldest() { - // Simple LRU: just remove the first one we find - // In production, you'd want proper LRU tracking - for cid, module := range c.modules { - _ = module.Close(context.Background()) - delete(c.modules, cid) - c.logger.Debug("Evicted module from cache", zap.String("wasm_cid", cid)) - break + var oldestCID string + var oldestTime time.Time + + for cid, entry := range c.modules { + if oldestCID == "" || entry.lastAccessed.Before(oldestTime) { + oldestCID = cid + oldestTime = entry.lastAccessed + } + } + + if oldestCID != "" { + _ = c.modules[oldestCID].module.Close(context.Background()) + delete(c.modules, oldestCID) + c.logger.Debug("Evicted LRU module from cache", zap.String("wasm_cid", oldestCID)) } } @@ -135,12 +157,13 @@ func (c *ModuleCache) evictOldest() { // The compute function is called with the lock released to avoid blocking. func (c *ModuleCache) GetOrCompute(wasmCID string, compute func() (wazero.CompiledModule, error)) (wazero.CompiledModule, error) { // Try to get from cache first - c.mu.RLock() - if module, exists := c.modules[wasmCID]; exists { - c.mu.RUnlock() - return module, nil + c.mu.Lock() + if entry, exists := c.modules[wasmCID]; exists { + entry.lastAccessed = time.Now() + c.mu.Unlock() + return entry.module, nil } - c.mu.RUnlock() + c.mu.Unlock() // Compute the module (without holding the lock) module, err := compute() @@ -153,9 +176,10 @@ func (c *ModuleCache) GetOrCompute(wasmCID string, compute func() (wazero.Compil defer c.mu.Unlock() // Double-check (another goroutine might have added it) - if existingModule, exists := c.modules[wasmCID]; exists { + if entry, exists := c.modules[wasmCID]; exists { _ = module.Close(context.Background()) // Discard our compilation - return existingModule, nil + entry.lastAccessed = time.Now() + return entry.module, nil } // Evict if cache is full @@ -163,7 +187,10 @@ func (c *ModuleCache) GetOrCompute(wasmCID string, compute func() (wazero.Compil c.evictOldest() } - c.modules[wasmCID] = module + c.modules[wasmCID] = &cacheEntry{ + module: module, + lastAccessed: time.Now(), + } c.logger.Debug("Module compiled and cached", zap.String("wasm_cid", wasmCID), diff --git a/pkg/serverless/invoke.go b/pkg/serverless/invoke.go index 87ba126..0108769 100644 --- a/pkg/serverless/invoke.go +++ b/pkg/serverless/invoke.go @@ -3,6 +3,7 @@ package serverless import ( "context" "encoding/json" + "errors" "fmt" "time" @@ -249,7 +250,7 @@ func (i *Invoker) isRetryable(err error) bool { // Retry execution errors (could be transient) var execErr *ExecutionError - if ok := errorAs(err, &execErr); ok { + if errors.As(err, &execErr) { return true } @@ -347,22 +348,6 @@ type DLQMessage struct { CallerWallet string `json:"caller_wallet,omitempty"` } -// errorAs is a helper to avoid import of errors package. -func errorAs(err error, target interface{}) bool { - if err == nil { - return false - } - // Simple type assertion for our custom error types - switch t := target.(type) { - case **ExecutionError: - if e, ok := err.(*ExecutionError); ok { - *t = e - return true - } - } - return false -} - // ----------------------------------------------------------------------------- // Batch Invocation (for future use) // ----------------------------------------------------------------------------- diff --git a/pkg/tlsutil/client.go b/pkg/tlsutil/client.go index 28feadf..0d16ce1 100644 --- a/pkg/tlsutil/client.go +++ b/pkg/tlsutil/client.go @@ -82,12 +82,9 @@ func GetTLSConfig() *tls.Config { MinVersion: tls.VersionTLS12, } - // If we have a CA cert pool, use it + // If we have a CA cert pool, use it for verifying self-signed certs if caCertPool != nil { config.RootCAs = caCertPool - } else if len(trustedDomains) > 0 { - // Fallback: skip verification if trusted domains are configured but no CA pool - config.InsecureSkipVerify = true } return config @@ -103,11 +100,12 @@ func NewHTTPClient(timeout time.Duration) *http.Client { } } -// NewHTTPClientForDomain creates an HTTP client configured for a specific domain +// NewHTTPClientForDomain creates an HTTP client configured for a specific domain. +// Only skips TLS verification for explicitly trusted domains when no CA cert is available. func NewHTTPClientForDomain(timeout time.Duration, hostname string) *http.Client { tlsConfig := GetTLSConfig() - // If this domain is in trusted list and we don't have a CA pool, allow insecure + // Only skip TLS for explicitly trusted domains when no CA pool is configured if caCertPool == nil && ShouldSkipTLSVerify(hostname) { tlsConfig.InsecureSkipVerify = true } diff --git a/pkg/wireguard/ip.go b/pkg/wireguard/ip.go new file mode 100644 index 0000000..5bd14d7 --- /dev/null +++ b/pkg/wireguard/ip.go @@ -0,0 +1,24 @@ +package wireguard + +import ( + "fmt" + "net" +) + +// GetIP returns the IPv4 address of the wg0 interface. +func GetIP() (string, error) { + iface, err := net.InterfaceByName("wg0") + if err != nil { + return "", fmt.Errorf("wg0 interface not found: %w", err) + } + addrs, err := iface.Addrs() + if err != nil { + return "", fmt.Errorf("failed to get wg0 addresses: %w", err) + } + for _, addr := range addrs { + if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil { + return ipnet.IP.String(), nil + } + } + return "", fmt.Errorf("no IPv4 address on wg0") +}