Fixed bugs in pubsub and IPFS

This commit is contained in:
anonpenguin23 2026-02-06 07:21:26 +02:00
parent 02b5c095d0
commit 854523c3a9
8 changed files with 120 additions and 88 deletions

View File

@ -0,0 +1,6 @@
# THIS IS AUTOGENERATED. DO NOT EDIT MANUALLY
version = 1
name = "network"
[setup]
script = "export MCP_BEARER_TOKEN=\"ra_9941ab97eb51668394a68963a2ab6fead0ca942afe437a6e2f4a520efcb24036\""

Binary file not shown.

View File

@ -127,7 +127,7 @@ func DefaultConfig() *Config {
// IPFS storage configuration // IPFS storage configuration
IPFS: IPFSConfig{ IPFS: IPFSConfig{
ClusterAPIURL: "", // Empty = disabled ClusterAPIURL: "", // Empty = disabled
APIURL: "http://localhost:5001", APIURL: "http://localhost:4501",
Timeout: 60 * time.Second, Timeout: 60 * time.Second,
ReplicationFactor: 3, ReplicationFactor: 3,
EnableEncryption: true, EnableEncryption: true,
@ -158,7 +158,7 @@ func DefaultConfig() *Config {
OlricServers: []string{"localhost:3320"}, OlricServers: []string{"localhost:3320"},
OlricTimeout: 10 * time.Second, OlricTimeout: 10 * time.Second,
IPFSClusterAPIURL: "http://localhost:9094", IPFSClusterAPIURL: "http://localhost:9094",
IPFSAPIURL: "http://localhost:5001", IPFSAPIURL: "http://localhost:4501",
IPFSTimeout: 60 * time.Second, IPFSTimeout: 60 * time.Second,
}, },
} }

View File

@ -41,8 +41,8 @@ type IPFSConfig struct {
// If empty, IPFS storage is disabled for this node // If empty, IPFS storage is disabled for this node
ClusterAPIURL string `yaml:"cluster_api_url"` ClusterAPIURL string `yaml:"cluster_api_url"`
// APIURL is the IPFS HTTP API URL for content retrieval (e.g., "http://localhost:5001") // APIURL is the IPFS HTTP API URL for content retrieval (e.g., "http://localhost:4501")
// If empty, defaults to "http://localhost:5001" // If empty, defaults to "http://localhost:4501"
APIURL string `yaml:"api_url"` APIURL string `yaml:"api_url"`
// Timeout for IPFS operations // Timeout for IPFS operations

View File

@ -34,7 +34,7 @@ type Config struct {
// IPFS Cluster configuration // IPFS Cluster configuration
IPFSClusterAPIURL string // IPFS Cluster HTTP API URL (e.g., "http://localhost:9094"). If empty, gateway will discover from node configs IPFSClusterAPIURL string // IPFS Cluster HTTP API URL (e.g., "http://localhost:9094"). If empty, gateway will discover from node configs
IPFSAPIURL string // IPFS HTTP API URL for content retrieval (e.g., "http://localhost:5001"). If empty, gateway will discover from node configs IPFSAPIURL string // IPFS HTTP API URL for content retrieval (e.g., "http://localhost:4501"). If empty, gateway will discover from node configs
IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s) IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s)
IPFSReplicationFactor int // Replication factor for pins (default: 3) IPFSReplicationFactor int // Replication factor for pins (default: 3)
IPFSEnableEncryption bool // Enable client-side encryption before upload (default: true, discovered from node configs) IPFSEnableEncryption bool // Enable client-side encryption before upload (default: true, discovered from node configs)

View File

@ -3,9 +3,11 @@ package gateway
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"hash/fnv"
"io" "io"
"net" "net"
"net/http" "net/http"
"sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -781,14 +783,15 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R
db := g.client.Database() db := g.client.Database()
internalCtx := client.WithInternalAuth(r.Context()) internalCtx := client.WithInternalAuth(r.Context())
// Single query: get internal IP + gateway port from cluster tables // Query all ready namespace gateways and choose a stable target.
// Random selection causes WS subscribe and publish calls to hit different
// nodes, which makes pubsub delivery flaky for short-lived subscriptions.
query := ` query := `
SELECT COALESCE(dn.internal_ip, dn.ip_address), npa.gateway_http_port SELECT COALESCE(dn.internal_ip, dn.ip_address), npa.gateway_http_port
FROM namespace_port_allocations npa FROM namespace_port_allocations npa
JOIN namespace_clusters nc ON npa.namespace_cluster_id = nc.id JOIN namespace_clusters nc ON npa.namespace_cluster_id = nc.id
JOIN dns_nodes dn ON npa.node_id = dn.id JOIN dns_nodes dn ON npa.node_id = dn.id
WHERE nc.namespace_name = ? AND nc.status = 'ready' WHERE nc.namespace_name = ? AND nc.status = 'ready'
ORDER BY RANDOM() LIMIT 1
` `
result, err := db.Query(internalCtx, query, namespaceName) result, err := db.Query(internalCtx, query, namespaceName)
if err != nil || result == nil || len(result.Rows) == 0 { if err != nil || result == nil || len(result.Rows) == 0 {
@ -799,16 +802,54 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R
return return
} }
gatewayIP := getString(result.Rows[0][0]) type namespaceGatewayTarget struct {
if gatewayIP == "" { ip string
port int
}
targets := make([]namespaceGatewayTarget, 0, len(result.Rows))
for _, row := range result.Rows {
if len(row) == 0 {
continue
}
ip := getString(row[0])
if ip == "" {
continue
}
port := 10004
if len(row) > 1 {
if p := getInt(row[1]); p > 0 {
port = p
}
}
targets = append(targets, namespaceGatewayTarget{ip: ip, port: port})
}
if len(targets) == 0 {
http.Error(w, "Namespace gateway not available", http.StatusServiceUnavailable) http.Error(w, "Namespace gateway not available", http.StatusServiceUnavailable)
return return
} }
gatewayPort := 10004
if p := getInt(result.Rows[0][1]); p > 0 {
gatewayPort = p
}
// Keep ordering deterministic before hashing, otherwise DB row order can vary.
sort.Slice(targets, func(i, j int) bool {
if targets[i].ip == targets[j].ip {
return targets[i].port < targets[j].port
}
return targets[i].ip < targets[j].ip
})
affinityKey := namespaceName + "|" + validatedNamespace
if apiKey := extractAPIKey(r); apiKey != "" {
affinityKey = namespaceName + "|" + apiKey
} else if authz := strings.TrimSpace(r.Header.Get("Authorization")); authz != "" {
affinityKey = namespaceName + "|" + authz
} else {
affinityKey = namespaceName + "|" + getClientIP(r)
}
hasher := fnv.New32a()
_, _ = hasher.Write([]byte(affinityKey))
targetIdx := int(hasher.Sum32()) % len(targets)
selected := targets[targetIdx]
gatewayIP := selected.ip
gatewayPort := selected.port
targetHost := gatewayIP + ":" + strconv.Itoa(gatewayPort) targetHost := gatewayIP + ":" + strconv.Itoa(gatewayPort)
// Handle WebSocket upgrade requests specially (http.Client can't handle 101 Switching Protocols) // Handle WebSocket upgrade requests specially (http.Client can't handle 101 Switching Protocols)

View File

@ -30,7 +30,7 @@ type ClusterManagerConfig struct {
GlobalRQLiteDSN string // Global RQLite DSN for API key validation (e.g., "http://localhost:4001") GlobalRQLiteDSN string // Global RQLite DSN for API key validation (e.g., "http://localhost:4001")
// IPFS configuration for namespace gateways (defaults used if not set) // IPFS configuration for namespace gateways (defaults used if not set)
IPFSClusterAPIURL string // IPFS Cluster API URL (default: "http://localhost:9094") IPFSClusterAPIURL string // IPFS Cluster API URL (default: "http://localhost:9094")
IPFSAPIURL string // IPFS API URL (default: "http://localhost:5001") IPFSAPIURL string // IPFS API URL (default: "http://localhost:4501")
IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s) IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s)
IPFSReplicationFactor int // IPFS replication factor (default: 3) IPFSReplicationFactor int // IPFS replication factor (default: 3)
} }
@ -78,7 +78,7 @@ func NewClusterManager(
} }
ipfsAPIURL := cfg.IPFSAPIURL ipfsAPIURL := cfg.IPFSAPIURL
if ipfsAPIURL == "" { if ipfsAPIURL == "" {
ipfsAPIURL = "http://localhost:5001" ipfsAPIURL = "http://localhost:4501"
} }
ipfsTimeout := cfg.IPFSTimeout ipfsTimeout := cfg.IPFSTimeout
if ipfsTimeout == 0 { if ipfsTimeout == 0 {
@ -122,7 +122,7 @@ func NewClusterManagerWithComponents(
} }
ipfsAPIURL := cfg.IPFSAPIURL ipfsAPIURL := cfg.IPFSAPIURL
if ipfsAPIURL == "" { if ipfsAPIURL == "" {
ipfsAPIURL = "http://localhost:5001" ipfsAPIURL = "http://localhost:4501"
} }
ipfsTimeout := cfg.IPFSTimeout ipfsTimeout := cfg.IPFSTimeout
if ipfsTimeout == 0 { if ipfsTimeout == 0 {
@ -262,7 +262,7 @@ func (cm *ClusterManager) ProvisionCluster(ctx context.Context, namespaceID int,
return nil, fmt.Errorf("failed to allocate ports on node %s: %w", node.NodeID, err) return nil, fmt.Errorf("failed to allocate ports on node %s: %w", node.NodeID, err)
} }
portBlocks[i] = block portBlocks[i] = block
cm.logEvent(ctx, cluster.ID, EventPortsAllocated, node.NodeID, cm.logEvent(ctx, cluster.ID, EventPortsAllocated, node.NodeID,
fmt.Sprintf("Allocated ports %d-%d", block.PortStart, block.PortEnd), nil) fmt.Sprintf("Allocated ports %d-%d", block.PortStart, block.PortEnd), nil)
} }
@ -427,8 +427,8 @@ func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *Namesp
NodeID: node.NodeID, NodeID: node.NodeID,
HTTPPort: portBlocks[i].OlricHTTPPort, HTTPPort: portBlocks[i].OlricHTTPPort,
MemberlistPort: portBlocks[i].OlricMemberlistPort, MemberlistPort: portBlocks[i].OlricMemberlistPort,
BindAddr: node.InternalIP, // Bind to WG IP directly (0.0.0.0 resolves to IPv6 on some hosts) BindAddr: node.InternalIP, // Bind to WG IP directly (0.0.0.0 resolves to IPv6 on some hosts)
AdvertiseAddr: node.InternalIP, // Advertise WG IP to peers AdvertiseAddr: node.InternalIP, // Advertise WG IP to peers
PeerAddresses: peers, PeerAddresses: peers,
} }
} }
@ -583,15 +583,15 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name
// spawnRQLiteRemote sends a spawn-rqlite request to a remote node // spawnRQLiteRemote sends a spawn-rqlite request to a remote node
func (cm *ClusterManager) spawnRQLiteRemote(ctx context.Context, nodeIP string, cfg rqlite.InstanceConfig) (*rqlite.Instance, error) { func (cm *ClusterManager) spawnRQLiteRemote(ctx context.Context, nodeIP string, cfg rqlite.InstanceConfig) (*rqlite.Instance, error) {
resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{
"action": "spawn-rqlite", "action": "spawn-rqlite",
"namespace": cfg.Namespace, "namespace": cfg.Namespace,
"node_id": cfg.NodeID, "node_id": cfg.NodeID,
"rqlite_http_port": cfg.HTTPPort, "rqlite_http_port": cfg.HTTPPort,
"rqlite_raft_port": cfg.RaftPort, "rqlite_raft_port": cfg.RaftPort,
"rqlite_http_adv_addr": cfg.HTTPAdvAddress, "rqlite_http_adv_addr": cfg.HTTPAdvAddress,
"rqlite_raft_adv_addr": cfg.RaftAdvAddress, "rqlite_raft_adv_addr": cfg.RaftAdvAddress,
"rqlite_join_addrs": cfg.JoinAddresses, "rqlite_join_addrs": cfg.JoinAddresses,
"rqlite_is_leader": cfg.IsLeader, "rqlite_is_leader": cfg.IsLeader,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -602,14 +602,14 @@ func (cm *ClusterManager) spawnRQLiteRemote(ctx context.Context, nodeIP string,
// spawnOlricRemote sends a spawn-olric request to a remote node // spawnOlricRemote sends a spawn-olric request to a remote node
func (cm *ClusterManager) spawnOlricRemote(ctx context.Context, nodeIP string, cfg olric.InstanceConfig) (*olric.OlricInstance, error) { func (cm *ClusterManager) spawnOlricRemote(ctx context.Context, nodeIP string, cfg olric.InstanceConfig) (*olric.OlricInstance, error) {
resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{ resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{
"action": "spawn-olric", "action": "spawn-olric",
"namespace": cfg.Namespace, "namespace": cfg.Namespace,
"node_id": cfg.NodeID, "node_id": cfg.NodeID,
"olric_http_port": cfg.HTTPPort, "olric_http_port": cfg.HTTPPort,
"olric_memberlist_port": cfg.MemberlistPort, "olric_memberlist_port": cfg.MemberlistPort,
"olric_bind_addr": cfg.BindAddr, "olric_bind_addr": cfg.BindAddr,
"olric_advertise_addr": cfg.AdvertiseAddr, "olric_advertise_addr": cfg.AdvertiseAddr,
"olric_peer_addresses": cfg.PeerAddresses, "olric_peer_addresses": cfg.PeerAddresses,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -747,52 +747,33 @@ func (cm *ClusterManager) sendStopRequest(ctx context.Context, nodeIP, action, n
} }
// createDNSRecords creates DNS records for the namespace gateway. // createDNSRecords creates DNS records for the namespace gateway.
// Creates A records for ALL nameservers (not just cluster nodes) so that any nameserver // Creates A records pointing to the public IPs of nodes running the namespace gateway cluster.
// can receive requests and proxy them to the namespace cluster via internal routing.
func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error { func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error {
fqdn := fmt.Sprintf("ns-%s.%s.", cluster.NamespaceName, cm.baseDomain) fqdn := fmt.Sprintf("ns-%s.%s.", cluster.NamespaceName, cm.baseDomain)
// Query for ALL nameserver IPs (not just the selected cluster nodes) // Collect public IPs from the selected cluster nodes
// This ensures DNS round-robins across all nameservers, even those not hosting the cluster var gatewayIPs []string
type nameserverIP struct { for _, node := range nodes {
IPAddress string `db:"ip_address"` if node.IPAddress != "" {
} gatewayIPs = append(gatewayIPs, node.IPAddress)
var nameservers []nameserverIP
nameserverQuery := `
SELECT DISTINCT ip_address
FROM dns_nameservers
WHERE domain = ?
ORDER BY hostname
`
err := cm.db.Query(ctx, &nameservers, nameserverQuery, cm.baseDomain)
if err != nil {
cm.logger.Error("Failed to query nameservers for DNS records",
zap.String("domain", cm.baseDomain),
zap.Error(err),
)
return fmt.Errorf("failed to query nameservers: %w", err)
}
var nameserverIPs []string
for _, ns := range nameservers {
if ns.IPAddress != "" {
nameserverIPs = append(nameserverIPs, ns.IPAddress)
} }
} }
// Fallback: if no nameservers found in dns_nameservers table, use cluster node IPs if len(gatewayIPs) == 0 {
// This maintains backwards compatibility with clusters created before nameserver tracking cm.logger.Error("No valid node IPs found for DNS records",
if len(nameserverIPs) == 0 { zap.String("namespace", cluster.NamespaceName),
cm.logger.Warn("No nameservers found in dns_nameservers table, falling back to cluster node IPs", zap.Int("node_count", len(nodes)),
zap.String("domain", cm.baseDomain),
) )
for _, node := range nodes { return fmt.Errorf("no valid node IPs found for DNS records")
nameserverIPs = append(nameserverIPs, node.IPAddress)
}
} }
cm.logger.Info("Creating DNS records for namespace gateway",
zap.String("namespace", cluster.NamespaceName),
zap.Strings("ips", gatewayIPs),
)
recordCount := 0 recordCount := 0
for _, ip := range nameserverIPs { for _, ip := range gatewayIPs {
query := ` query := `
INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by) INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by)
VALUES (?, 'A', ?, 300, ?, 'system') VALUES (?, 'A', ?, 300, ?, 'system')
@ -805,7 +786,7 @@ func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *Namespa
zap.Error(err), zap.Error(err),
) )
} else { } else {
cm.logger.Info("Created DNS A record for nameserver", cm.logger.Info("Created DNS A record for gateway node",
zap.String("fqdn", fqdn), zap.String("fqdn", fqdn),
zap.String("ip", ip), zap.String("ip", ip),
) )
@ -813,7 +794,7 @@ func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *Namespa
} }
} }
cm.logEvent(ctx, cluster.ID, EventDNSCreated, "", fmt.Sprintf("DNS records created for %s (%d nameserver records)", fqdn, recordCount), nil) cm.logEvent(ctx, cluster.ID, EventDNSCreated, "", fmt.Sprintf("DNS records created for %s (%d gateway node records)", fqdn, recordCount), nil)
return nil return nil
} }
@ -1329,12 +1310,12 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n
// Get all nodes' IPs and port allocations // Get all nodes' IPs and port allocations
type nodePortInfo struct { type nodePortInfo struct {
NodeID string `db:"node_id"` NodeID string `db:"node_id"`
InternalIP string `db:"internal_ip"` InternalIP string `db:"internal_ip"`
RQLiteHTTPPort int `db:"rqlite_http_port"` RQLiteHTTPPort int `db:"rqlite_http_port"`
RQLiteRaftPort int `db:"rqlite_raft_port"` RQLiteRaftPort int `db:"rqlite_raft_port"`
OlricHTTPPort int `db:"olric_http_port"` OlricHTTPPort int `db:"olric_http_port"`
OlricMemberlistPort int `db:"olric_memberlist_port"` OlricMemberlistPort int `db:"olric_memberlist_port"`
} }
var allNodePorts []nodePortInfo var allNodePorts []nodePortInfo
allPortsQuery := ` allPortsQuery := `
@ -1557,15 +1538,15 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n
// ClusterLocalState is persisted to disk so namespace processes can be restored // ClusterLocalState is persisted to disk so namespace processes can be restored
// without querying the main RQLite cluster (which may not have a leader yet on cold start). // without querying the main RQLite cluster (which may not have a leader yet on cold start).
type ClusterLocalState struct { type ClusterLocalState struct {
ClusterID string `json:"cluster_id"` ClusterID string `json:"cluster_id"`
NamespaceName string `json:"namespace_name"` NamespaceName string `json:"namespace_name"`
LocalNodeID string `json:"local_node_id"` LocalNodeID string `json:"local_node_id"`
LocalIP string `json:"local_ip"` LocalIP string `json:"local_ip"`
LocalPorts ClusterLocalStatePorts `json:"local_ports"` LocalPorts ClusterLocalStatePorts `json:"local_ports"`
AllNodes []ClusterLocalStateNode `json:"all_nodes"` AllNodes []ClusterLocalStateNode `json:"all_nodes"`
HasGateway bool `json:"has_gateway"` HasGateway bool `json:"has_gateway"`
BaseDomain string `json:"base_domain"` BaseDomain string `json:"base_domain"`
SavedAt time.Time `json:"saved_at"` SavedAt time.Time `json:"saved_at"`
} }
type ClusterLocalStatePorts struct { type ClusterLocalStatePorts struct {

View File

@ -70,9 +70,13 @@ func (n *Node) startHTTPGateway(ctx context.Context) error {
if ormClient := apiGateway.GetORMClient(); ormClient != nil { if ormClient := apiGateway.GetORMClient(); ormClient != nil {
baseDataDir := filepath.Join(os.ExpandEnv(n.config.Node.DataDir), "..", "data", "namespaces") baseDataDir := filepath.Join(os.ExpandEnv(n.config.Node.DataDir), "..", "data", "namespaces")
clusterCfg := namespace.ClusterManagerConfig{ clusterCfg := namespace.ClusterManagerConfig{
BaseDomain: n.config.HTTPGateway.BaseDomain, BaseDomain: n.config.HTTPGateway.BaseDomain,
BaseDataDir: baseDataDir, BaseDataDir: baseDataDir,
GlobalRQLiteDSN: gwCfg.RQLiteDSN, // Pass global RQLite DSN for namespace gateway auth GlobalRQLiteDSN: gwCfg.RQLiteDSN, // Pass global RQLite DSN for namespace gateway auth
IPFSClusterAPIURL: gwCfg.IPFSClusterAPIURL,
IPFSAPIURL: gwCfg.IPFSAPIURL,
IPFSTimeout: gwCfg.IPFSTimeout,
IPFSReplicationFactor: n.config.Database.IPFS.ReplicationFactor,
} }
clusterManager := namespace.NewClusterManager(ormClient, clusterCfg, n.logger.Logger) clusterManager := namespace.NewClusterManager(ormClient, clusterCfg, n.logger.Logger)
clusterManager.SetLocalNodeID(gwCfg.NodePeerID) clusterManager.SetLocalNodeID(gwCfg.NodePeerID)