diff --git a/.codex/environments/environment.toml b/.codex/environments/environment.toml new file mode 100644 index 0000000..e88452c --- /dev/null +++ b/.codex/environments/environment.toml @@ -0,0 +1,6 @@ +# THIS IS AUTOGENERATED. DO NOT EDIT MANUALLY +version = 1 +name = "network" + +[setup] +script = "export MCP_BEARER_TOKEN=\"ra_9941ab97eb51668394a68963a2ab6fead0ca942afe437a6e2f4a520efcb24036\"" diff --git a/bin-linux/identity b/bin-linux/identity deleted file mode 100755 index dff0783..0000000 Binary files a/bin-linux/identity and /dev/null differ diff --git a/pkg/config/config.go b/pkg/config/config.go index e1881d3..6a1007c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -127,7 +127,7 @@ func DefaultConfig() *Config { // IPFS storage configuration IPFS: IPFSConfig{ ClusterAPIURL: "", // Empty = disabled - APIURL: "http://localhost:5001", + APIURL: "http://localhost:4501", Timeout: 60 * time.Second, ReplicationFactor: 3, EnableEncryption: true, @@ -158,7 +158,7 @@ func DefaultConfig() *Config { OlricServers: []string{"localhost:3320"}, OlricTimeout: 10 * time.Second, IPFSClusterAPIURL: "http://localhost:9094", - IPFSAPIURL: "http://localhost:5001", + IPFSAPIURL: "http://localhost:4501", IPFSTimeout: 60 * time.Second, }, } diff --git a/pkg/config/database_config.go b/pkg/config/database_config.go index 533f482..3898503 100644 --- a/pkg/config/database_config.go +++ b/pkg/config/database_config.go @@ -41,8 +41,8 @@ type IPFSConfig struct { // If empty, IPFS storage is disabled for this node ClusterAPIURL string `yaml:"cluster_api_url"` - // APIURL is the IPFS HTTP API URL for content retrieval (e.g., "http://localhost:5001") - // If empty, defaults to "http://localhost:5001" + // APIURL is the IPFS HTTP API URL for content retrieval (e.g., "http://localhost:4501") + // If empty, defaults to "http://localhost:4501" APIURL string `yaml:"api_url"` // Timeout for IPFS operations diff --git a/pkg/gateway/config.go b/pkg/gateway/config.go index f07a1d0..9384513 100644 --- a/pkg/gateway/config.go +++ b/pkg/gateway/config.go @@ -34,7 +34,7 @@ type Config struct { // IPFS Cluster configuration IPFSClusterAPIURL string // IPFS Cluster HTTP API URL (e.g., "http://localhost:9094"). If empty, gateway will discover from node configs - IPFSAPIURL string // IPFS HTTP API URL for content retrieval (e.g., "http://localhost:5001"). If empty, gateway will discover from node configs + IPFSAPIURL string // IPFS HTTP API URL for content retrieval (e.g., "http://localhost:4501"). If empty, gateway will discover from node configs IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s) IPFSReplicationFactor int // Replication factor for pins (default: 3) IPFSEnableEncryption bool // Enable client-side encryption before upload (default: true, discovered from node configs) diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index b81bc81..2e58317 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -3,9 +3,11 @@ package gateway import ( "context" "encoding/json" + "hash/fnv" "io" "net" "net/http" + "sort" "strconv" "strings" "time" @@ -781,14 +783,15 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R db := g.client.Database() internalCtx := client.WithInternalAuth(r.Context()) - // Single query: get internal IP + gateway port from cluster tables + // Query all ready namespace gateways and choose a stable target. + // Random selection causes WS subscribe and publish calls to hit different + // nodes, which makes pubsub delivery flaky for short-lived subscriptions. query := ` SELECT COALESCE(dn.internal_ip, dn.ip_address), npa.gateway_http_port FROM namespace_port_allocations npa JOIN namespace_clusters nc ON npa.namespace_cluster_id = nc.id JOIN dns_nodes dn ON npa.node_id = dn.id WHERE nc.namespace_name = ? AND nc.status = 'ready' - ORDER BY RANDOM() LIMIT 1 ` result, err := db.Query(internalCtx, query, namespaceName) if err != nil || result == nil || len(result.Rows) == 0 { @@ -799,16 +802,54 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R return } - gatewayIP := getString(result.Rows[0][0]) - if gatewayIP == "" { + type namespaceGatewayTarget struct { + ip string + port int + } + targets := make([]namespaceGatewayTarget, 0, len(result.Rows)) + for _, row := range result.Rows { + if len(row) == 0 { + continue + } + ip := getString(row[0]) + if ip == "" { + continue + } + port := 10004 + if len(row) > 1 { + if p := getInt(row[1]); p > 0 { + port = p + } + } + targets = append(targets, namespaceGatewayTarget{ip: ip, port: port}) + } + if len(targets) == 0 { http.Error(w, "Namespace gateway not available", http.StatusServiceUnavailable) return } - gatewayPort := 10004 - if p := getInt(result.Rows[0][1]); p > 0 { - gatewayPort = p - } + // Keep ordering deterministic before hashing, otherwise DB row order can vary. + sort.Slice(targets, func(i, j int) bool { + if targets[i].ip == targets[j].ip { + return targets[i].port < targets[j].port + } + return targets[i].ip < targets[j].ip + }) + + affinityKey := namespaceName + "|" + validatedNamespace + if apiKey := extractAPIKey(r); apiKey != "" { + affinityKey = namespaceName + "|" + apiKey + } else if authz := strings.TrimSpace(r.Header.Get("Authorization")); authz != "" { + affinityKey = namespaceName + "|" + authz + } else { + affinityKey = namespaceName + "|" + getClientIP(r) + } + hasher := fnv.New32a() + _, _ = hasher.Write([]byte(affinityKey)) + targetIdx := int(hasher.Sum32()) % len(targets) + selected := targets[targetIdx] + gatewayIP := selected.ip + gatewayPort := selected.port targetHost := gatewayIP + ":" + strconv.Itoa(gatewayPort) // Handle WebSocket upgrade requests specially (http.Client can't handle 101 Switching Protocols) diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index 4c9a0e6..d431ff8 100644 --- a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -30,7 +30,7 @@ type ClusterManagerConfig struct { GlobalRQLiteDSN string // Global RQLite DSN for API key validation (e.g., "http://localhost:4001") // IPFS configuration for namespace gateways (defaults used if not set) IPFSClusterAPIURL string // IPFS Cluster API URL (default: "http://localhost:9094") - IPFSAPIURL string // IPFS API URL (default: "http://localhost:5001") + IPFSAPIURL string // IPFS API URL (default: "http://localhost:4501") IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s) IPFSReplicationFactor int // IPFS replication factor (default: 3) } @@ -78,7 +78,7 @@ func NewClusterManager( } ipfsAPIURL := cfg.IPFSAPIURL if ipfsAPIURL == "" { - ipfsAPIURL = "http://localhost:5001" + ipfsAPIURL = "http://localhost:4501" } ipfsTimeout := cfg.IPFSTimeout if ipfsTimeout == 0 { @@ -122,7 +122,7 @@ func NewClusterManagerWithComponents( } ipfsAPIURL := cfg.IPFSAPIURL if ipfsAPIURL == "" { - ipfsAPIURL = "http://localhost:5001" + ipfsAPIURL = "http://localhost:4501" } ipfsTimeout := cfg.IPFSTimeout if ipfsTimeout == 0 { @@ -262,7 +262,7 @@ func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int, return nil, fmt.Errorf("failed to allocate ports on node %s: %w", node.NodeID, err) } portBlocks[i] = block - cm.logEvent(ctx, cluster.ID, EventPortsAllocated, node.NodeID, + cm.logEvent(ctx, cluster.ID, EventPortsAllocated, node.NodeID, fmt.Sprintf("Allocated ports %d-%d", block.PortStart, block.PortEnd), nil) } @@ -427,8 +427,8 @@ func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *Namesp NodeID: node.NodeID, HTTPPort: portBlocks[i].OlricHTTPPort, MemberlistPort: portBlocks[i].OlricMemberlistPort, - BindAddr: node.InternalIP, // Bind to WG IP directly (0.0.0.0 resolves to IPv6 on some hosts) - AdvertiseAddr: node.InternalIP, // Advertise WG IP to peers + BindAddr: node.InternalIP, // Bind to WG IP directly (0.0.0.0 resolves to IPv6 on some hosts) + AdvertiseAddr: node.InternalIP, // Advertise WG IP to peers PeerAddresses: peers, } } @@ -583,15 +583,15 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name // spawnRQLiteRemote sends a spawn-rqlite request to a remote node func (cm *ClusterManager) spawnRQLiteRemote(ctx context.Context, nodeIP string, cfg rqlite.InstanceConfig) (*rqlite.Instance, error) { resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ - "action": "spawn-rqlite", - "namespace": cfg.Namespace, - "node_id": cfg.NodeID, - "rqlite_http_port": cfg.HTTPPort, - "rqlite_raft_port": cfg.RaftPort, + "action": "spawn-rqlite", + "namespace": cfg.Namespace, + "node_id": cfg.NodeID, + "rqlite_http_port": cfg.HTTPPort, + "rqlite_raft_port": cfg.RaftPort, "rqlite_http_adv_addr": cfg.HTTPAdvAddress, "rqlite_raft_adv_addr": cfg.RaftAdvAddress, - "rqlite_join_addrs": cfg.JoinAddresses, - "rqlite_is_leader": cfg.IsLeader, + "rqlite_join_addrs": cfg.JoinAddresses, + "rqlite_is_leader": cfg.IsLeader, }) if err != nil { return nil, err @@ -602,14 +602,14 @@ func (cm *ClusterManager) spawnRQLiteRemote(ctx context.Context, nodeIP string, // spawnOlricRemote sends a spawn-olric request to a remote node func (cm *ClusterManager) spawnOlricRemote(ctx context.Context, nodeIP string, cfg olric.InstanceConfig) (*olric.OlricInstance, error) { resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ - "action": "spawn-olric", - "namespace": cfg.Namespace, - "node_id": cfg.NodeID, - "olric_http_port": cfg.HTTPPort, + "action": "spawn-olric", + "namespace": cfg.Namespace, + "node_id": cfg.NodeID, + "olric_http_port": cfg.HTTPPort, "olric_memberlist_port": cfg.MemberlistPort, - "olric_bind_addr": cfg.BindAddr, - "olric_advertise_addr": cfg.AdvertiseAddr, - "olric_peer_addresses": cfg.PeerAddresses, + "olric_bind_addr": cfg.BindAddr, + "olric_advertise_addr": cfg.AdvertiseAddr, + "olric_peer_addresses": cfg.PeerAddresses, }) if err != nil { return nil, err @@ -747,52 +747,33 @@ func (cm *ClusterManager) sendStopRequest(ctx context.Context, nodeIP, action, n } // createDNSRecords creates DNS records for the namespace gateway. -// Creates A records for ALL nameservers (not just cluster nodes) so that any nameserver -// can receive requests and proxy them to the namespace cluster via internal routing. +// Creates A records pointing to the public IPs of nodes running the namespace gateway cluster. func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error { fqdn := fmt.Sprintf("ns-%s.%s.", cluster.NamespaceName, cm.baseDomain) - // Query for ALL nameserver IPs (not just the selected cluster nodes) - // This ensures DNS round-robins across all nameservers, even those not hosting the cluster - type nameserverIP struct { - IPAddress string `db:"ip_address"` - } - var nameservers []nameserverIP - nameserverQuery := ` - SELECT DISTINCT ip_address - FROM dns_nameservers - WHERE domain = ? - ORDER BY hostname - ` - err := cm.db.Query(ctx, &nameservers, nameserverQuery, cm.baseDomain) - if err != nil { - cm.logger.Error("Failed to query nameservers for DNS records", - zap.String("domain", cm.baseDomain), - zap.Error(err), - ) - return fmt.Errorf("failed to query nameservers: %w", err) - } - - var nameserverIPs []string - for _, ns := range nameservers { - if ns.IPAddress != "" { - nameserverIPs = append(nameserverIPs, ns.IPAddress) + // Collect public IPs from the selected cluster nodes + var gatewayIPs []string + for _, node := range nodes { + if node.IPAddress != "" { + gatewayIPs = append(gatewayIPs, node.IPAddress) } } - // Fallback: if no nameservers found in dns_nameservers table, use cluster node IPs - // This maintains backwards compatibility with clusters created before nameserver tracking - if len(nameserverIPs) == 0 { - cm.logger.Warn("No nameservers found in dns_nameservers table, falling back to cluster node IPs", - zap.String("domain", cm.baseDomain), + if len(gatewayIPs) == 0 { + cm.logger.Error("No valid node IPs found for DNS records", + zap.String("namespace", cluster.NamespaceName), + zap.Int("node_count", len(nodes)), ) - for _, node := range nodes { - nameserverIPs = append(nameserverIPs, node.IPAddress) - } + return fmt.Errorf("no valid node IPs found for DNS records") } + cm.logger.Info("Creating DNS records for namespace gateway", + zap.String("namespace", cluster.NamespaceName), + zap.Strings("ips", gatewayIPs), + ) + recordCount := 0 - for _, ip := range nameserverIPs { + for _, ip := range gatewayIPs { query := ` INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by) VALUES (?, 'A', ?, 300, ?, 'system') @@ -805,7 +786,7 @@ func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *Namespa zap.Error(err), ) } else { - cm.logger.Info("Created DNS A record for nameserver", + cm.logger.Info("Created DNS A record for gateway node", zap.String("fqdn", fqdn), zap.String("ip", ip), ) @@ -813,7 +794,7 @@ func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *Namespa } } - cm.logEvent(ctx, cluster.ID, EventDNSCreated, "", fmt.Sprintf("DNS records created for %s (%d nameserver records)", fqdn, recordCount), nil) + cm.logEvent(ctx, cluster.ID, EventDNSCreated, "", fmt.Sprintf("DNS records created for %s (%d gateway node records)", fqdn, recordCount), nil) return nil } @@ -1329,12 +1310,12 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n // Get all nodes' IPs and port allocations type nodePortInfo struct { - NodeID string `db:"node_id"` - InternalIP string `db:"internal_ip"` - RQLiteHTTPPort int `db:"rqlite_http_port"` - RQLiteRaftPort int `db:"rqlite_raft_port"` - OlricHTTPPort int `db:"olric_http_port"` - OlricMemberlistPort int `db:"olric_memberlist_port"` + NodeID string `db:"node_id"` + InternalIP string `db:"internal_ip"` + RQLiteHTTPPort int `db:"rqlite_http_port"` + RQLiteRaftPort int `db:"rqlite_raft_port"` + OlricHTTPPort int `db:"olric_http_port"` + OlricMemberlistPort int `db:"olric_memberlist_port"` } var allNodePorts []nodePortInfo allPortsQuery := ` @@ -1557,15 +1538,15 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n // ClusterLocalState is persisted to disk so namespace processes can be restored // without querying the main RQLite cluster (which may not have a leader yet on cold start). type ClusterLocalState struct { - ClusterID string `json:"cluster_id"` - NamespaceName string `json:"namespace_name"` - LocalNodeID string `json:"local_node_id"` - LocalIP string `json:"local_ip"` - LocalPorts ClusterLocalStatePorts `json:"local_ports"` + ClusterID string `json:"cluster_id"` + NamespaceName string `json:"namespace_name"` + LocalNodeID string `json:"local_node_id"` + LocalIP string `json:"local_ip"` + LocalPorts ClusterLocalStatePorts `json:"local_ports"` AllNodes []ClusterLocalStateNode `json:"all_nodes"` - HasGateway bool `json:"has_gateway"` - BaseDomain string `json:"base_domain"` - SavedAt time.Time `json:"saved_at"` + HasGateway bool `json:"has_gateway"` + BaseDomain string `json:"base_domain"` + SavedAt time.Time `json:"saved_at"` } type ClusterLocalStatePorts struct { diff --git a/pkg/node/gateway.go b/pkg/node/gateway.go index d180fad..91b55fe 100644 --- a/pkg/node/gateway.go +++ b/pkg/node/gateway.go @@ -70,9 +70,13 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { if ormClient := apiGateway.GetORMClient(); ormClient != nil { baseDataDir := filepath.Join(os.ExpandEnv(n.config.Node.DataDir), "..", "data", "namespaces") clusterCfg := namespace.ClusterManagerConfig{ - BaseDomain: n.config.HTTPGateway.BaseDomain, - BaseDataDir: baseDataDir, - GlobalRQLiteDSN: gwCfg.RQLiteDSN, // Pass global RQLite DSN for namespace gateway auth + BaseDomain: n.config.HTTPGateway.BaseDomain, + BaseDataDir: baseDataDir, + GlobalRQLiteDSN: gwCfg.RQLiteDSN, // Pass global RQLite DSN for namespace gateway auth + IPFSClusterAPIURL: gwCfg.IPFSClusterAPIURL, + IPFSAPIURL: gwCfg.IPFSAPIURL, + IPFSTimeout: gwCfg.IPFSTimeout, + IPFSReplicationFactor: n.config.Database.IPFS.ReplicationFactor, } clusterManager := namespace.NewClusterManager(ormClient, clusterCfg, n.logger.Logger) clusterManager.SetLocalNodeID(gwCfg.NodePeerID)