From f972358e787d878ab247e0f9763425c45cea392f Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Wed, 4 Feb 2026 16:14:49 +0200 Subject: [PATCH] Bored of fixing bugs --- cmd/gateway/config.go | 5 ++ pkg/cli/utils/systemd.go | 30 ++++++-- pkg/gateway/config.go | 4 + pkg/gateway/gateway.go | 30 ++++++++ .../handlers/storage/download_handler.go | 14 ++++ pkg/gateway/handlers/storage/handlers.go | 17 +++++ pkg/gateway/instance_spawner.go | 8 ++ pkg/gateway/middleware.go | 8 +- pkg/namespace/cluster_manager.go | 76 +++++++++++++++---- pkg/node/gateway.go | 5 +- pkg/olric/instance_spawner.go | 9 ++- 11 files changed, 182 insertions(+), 24 deletions(-) diff --git a/cmd/gateway/config.go b/cmd/gateway/config.go index feee2f5..017ff0d 100644 --- a/cmd/gateway/config.go +++ b/cmd/gateway/config.go @@ -77,6 +77,7 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { ListenAddr string `yaml:"listen_addr"` ClientNamespace string `yaml:"client_namespace"` RQLiteDSN string `yaml:"rqlite_dsn"` + GlobalRQLiteDSN string `yaml:"global_rqlite_dsn"` Peers []string `yaml:"bootstrap_peers"` EnableHTTPS bool `yaml:"enable_https"` DomainName string `yaml:"domain_name"` @@ -113,6 +114,7 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { ClientNamespace: "default", BootstrapPeers: nil, RQLiteDSN: "", + GlobalRQLiteDSN: "", EnableHTTPS: false, DomainName: "", TLSCacheDir: "", @@ -133,6 +135,9 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { if v := strings.TrimSpace(y.RQLiteDSN); v != "" { cfg.RQLiteDSN = v } + if v := strings.TrimSpace(y.GlobalRQLiteDSN); v != "" { + cfg.GlobalRQLiteDSN = v + } if len(y.Peers) > 0 { var peers []string for _, p := range y.Peers { diff --git a/pkg/cli/utils/systemd.go b/pkg/cli/utils/systemd.go index 1aede4d..6d59962 100644 --- a/pkg/cli/utils/systemd.go +++ b/pkg/cli/utils/systemd.go @@ -150,10 +150,11 @@ func IsServiceMasked(service string) (bool, error) { return false, nil } -// GetProductionServices returns a list of all DeBros production service names that exist +// GetProductionServices returns a list of all DeBros production service names that exist, +// including both global services and namespace-specific services func GetProductionServices() []string { - // Unified service names (no bootstrap/node distinction) - allServices := []string{ + // Global/default service names + globalServices := []string{ "debros-gateway", "debros-node", "debros-olric", @@ -163,15 +164,34 @@ func GetProductionServices() []string { "debros-anyone-relay", } - // Filter to only existing services by checking if unit file exists var existing []string - for _, svc := range allServices { + + // Add existing global services + for _, svc := range globalServices { unitPath := filepath.Join("/etc/systemd/system", svc+".service") if _, err := os.Stat(unitPath); err == nil { existing = append(existing, svc) } } + // Also discover namespace-specific services (debros-*@.service) + // These are created when namespaces are provisioned and need to be restarted too + systemdDir := "/etc/systemd/system" + entries, err := os.ReadDir(systemdDir) + if err == nil { + for _, entry := range entries { + name := entry.Name() + // Look for debros-*@*.service pattern (namespace services) + if strings.HasPrefix(name, "debros-") && + strings.Contains(name, "@") && + strings.HasSuffix(name, ".service") { + // Extract service name without .service extension + serviceName := strings.TrimSuffix(name, ".service") + existing = append(existing, serviceName) + } + } + } + return existing } diff --git a/pkg/gateway/config.go b/pkg/gateway/config.go index ecdcd30..f07a1d0 100644 --- a/pkg/gateway/config.go +++ b/pkg/gateway/config.go @@ -13,6 +13,10 @@ type Config struct { // If empty, defaults to "http://localhost:4001". RQLiteDSN string + // Global RQLite DSN for API key validation (for namespace gateways) + // If empty, uses RQLiteDSN (for main/global gateways) + GlobalRQLiteDSN string + // HTTPS configuration EnableHTTPS bool // Enable HTTPS with ACME (Let's Encrypt) DomainName string // Domain name for HTTPS certificate diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index cdbcb42..20a19e9 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -52,6 +52,9 @@ type Gateway struct { ormClient rqlite.Client ormHTTP *rqlite.HTTPGateway + // Global RQLite client for API key validation (namespace gateways only) + authClient client.NetworkClient + // Olric cache client olricClient *olric.Client olricMu sync.RWMutex @@ -237,6 +240,33 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { presenceMembers: make(map[string][]PresenceMember), } + // Create separate auth client for global RQLite if GlobalRQLiteDSN is provided + // This allows namespace gateways to validate API keys against the global database + if cfg.GlobalRQLiteDSN != "" && cfg.GlobalRQLiteDSN != cfg.RQLiteDSN { + logger.ComponentInfo(logging.ComponentGeneral, "Creating global auth client...", + zap.String("global_dsn", cfg.GlobalRQLiteDSN), + ) + + // Create client config for global namespace + authCfg := client.DefaultClientConfig("default") // Use "default" namespace for global + authCfg.DatabaseEndpoints = []string{cfg.GlobalRQLiteDSN} + if len(cfg.BootstrapPeers) > 0 { + authCfg.BootstrapPeers = cfg.BootstrapPeers + } + + authClient, err := client.NewClient(authCfg) + if err != nil { + logger.ComponentWarn(logging.ComponentGeneral, "Failed to create global auth client", zap.Error(err)) + } else { + if err := authClient.Connect(); err != nil { + logger.ComponentWarn(logging.ComponentGeneral, "Failed to connect global auth client", zap.Error(err)) + } else { + gw.authClient = authClient + logger.ComponentInfo(logging.ComponentGeneral, "Global auth client connected") + } + } + } + // Initialize handler instances gw.pubsubHandlers = pubsubhandlers.NewPubSubHandlers(deps.Client, logger) diff --git a/pkg/gateway/handlers/storage/download_handler.go b/pkg/gateway/handlers/storage/download_handler.go index 3614a3f..23fa65f 100644 --- a/pkg/gateway/handlers/storage/download_handler.go +++ b/pkg/gateway/handlers/storage/download_handler.go @@ -40,7 +40,12 @@ func (h *Handlers) DownloadHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() + h.logger.ComponentDebug(logging.ComponentGeneral, "Starting CID retrieval", + zap.String("cid", path), + zap.String("namespace", namespace)) + // Check if namespace owns this CID (namespace isolation) + h.logger.ComponentDebug(logging.ComponentGeneral, "Checking CID ownership", zap.String("cid", path)) hasAccess, err := h.checkCIDOwnership(ctx, path, namespace) if err != nil { h.logger.ComponentError(logging.ComponentGeneral, "failed to check CID ownership", @@ -55,12 +60,18 @@ func (h *Handlers) DownloadHandler(w http.ResponseWriter, r *http.Request) { return } + h.logger.ComponentDebug(logging.ComponentGeneral, "CID ownership check passed", zap.String("cid", path)) + // Get IPFS API URL from config ipfsAPIURL := h.config.IPFSAPIURL if ipfsAPIURL == "" { ipfsAPIURL = "http://localhost:5001" } + h.logger.ComponentDebug(logging.ComponentGeneral, "Fetching content from IPFS", + zap.String("cid", path), + zap.String("ipfs_api_url", ipfsAPIURL)) + reader, err := h.ipfsClient.Get(ctx, path, ipfsAPIURL) if err != nil { h.logger.ComponentError(logging.ComponentGeneral, "failed to get content from IPFS", @@ -77,6 +88,9 @@ func (h *Handlers) DownloadHandler(w http.ResponseWriter, r *http.Request) { } defer reader.Close() + h.logger.ComponentDebug(logging.ComponentGeneral, "Successfully retrieved content from IPFS, starting stream", + zap.String("cid", path)) + // Set headers for file download w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", path)) diff --git a/pkg/gateway/handlers/storage/handlers.go b/pkg/gateway/handlers/storage/handlers.go index a05ec9f..7a080a6 100644 --- a/pkg/gateway/handlers/storage/handlers.go +++ b/pkg/gateway/handlers/storage/handlers.go @@ -3,11 +3,13 @@ package storage import ( "context" "io" + "time" "github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys" "github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/rqlite" + "go.uber.org/zap" ) // IPFSClient defines the interface for interacting with IPFS. @@ -82,13 +84,28 @@ func (h *Handlers) checkCIDOwnership(ctx context.Context, cid, namespace string) return true, nil // Allow access in test mode } + // Add 5-second timeout to prevent hanging on slow RQLite queries + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + h.logger.ComponentDebug(logging.ComponentGeneral, "Querying RQLite for CID ownership", + zap.String("cid", cid), + zap.String("namespace", namespace)) + query := `SELECT COUNT(*) as count FROM ipfs_content_ownership WHERE cid = ? AND namespace = ?` var result []map[string]interface{} if err := h.db.Query(ctx, &result, query, cid, namespace); err != nil { + h.logger.ComponentError(logging.ComponentGeneral, "RQLite ownership query failed", + zap.Error(err), + zap.String("cid", cid)) return false, err } + h.logger.ComponentDebug(logging.ComponentGeneral, "RQLite ownership query completed", + zap.String("cid", cid), + zap.Int("result_count", len(result))) + if len(result) == 0 { return false, nil } diff --git a/pkg/gateway/instance_spawner.go b/pkg/gateway/instance_spawner.go index f208978..6d0563e 100644 --- a/pkg/gateway/instance_spawner.go +++ b/pkg/gateway/instance_spawner.go @@ -77,7 +77,9 @@ type InstanceConfig struct { HTTPPort int // HTTP API port BaseDomain string // Base domain (e.g., "orama-devnet.network") RQLiteDSN string // RQLite connection DSN (e.g., "http://localhost:10000") + GlobalRQLiteDSN string // Global RQLite DSN for API key validation (empty = use RQLiteDSN) OlricServers []string // Olric server addresses + OlricTimeout time.Duration // Timeout for Olric operations NodePeerID string // Physical node's peer ID for home node management DataDir string // Data directory for deployments, SQLite, etc. // IPFS configuration for storage endpoints @@ -94,6 +96,7 @@ type GatewayYAMLConfig struct { ListenAddr string `yaml:"listen_addr"` ClientNamespace string `yaml:"client_namespace"` RQLiteDSN string `yaml:"rqlite_dsn"` + GlobalRQLiteDSN string `yaml:"global_rqlite_dsn,omitempty"` BootstrapPeers []string `yaml:"bootstrap_peers,omitempty"` EnableHTTPS bool `yaml:"enable_https,omitempty"` DomainName string `yaml:"domain_name,omitempty"` @@ -277,6 +280,7 @@ func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig, ListenAddr: fmt.Sprintf(":%d", cfg.HTTPPort), ClientNamespace: cfg.Namespace, RQLiteDSN: cfg.RQLiteDSN, + GlobalRQLiteDSN: cfg.GlobalRQLiteDSN, OlricServers: cfg.OlricServers, // Note: DomainName is used for HTTPS/TLS, not needed for namespace gateways in dev mode DomainName: cfg.BaseDomain, @@ -285,6 +289,10 @@ func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig, IPFSAPIURL: cfg.IPFSAPIURL, IPFSReplicationFactor: cfg.IPFSReplicationFactor, } + // Set Olric timeout if provided + if cfg.OlricTimeout > 0 { + gatewayCfg.OlricTimeout = cfg.OlricTimeout.String() + } // Set IPFS timeout if provided if cfg.IPFSTimeout > 0 { gatewayCfg.IPFSTimeout = cfg.IPFSTimeout.String() diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index 8da98d9..b81bc81 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -277,7 +277,13 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler { } // Look up API key in DB and derive namespace - db := g.client.Database() + // Use authClient for namespace gateways (validates against global RQLite) + // Otherwise use regular client for global gateways + authClient := g.client + if g.authClient != nil { + authClient = g.authClient + } + db := authClient.Database() // Use internal auth for DB validation (auth not established yet) internalCtx := client.WithInternalAuth(r.Context()) // Join to namespaces to resolve name in one query diff --git a/pkg/namespace/cluster_manager.go b/pkg/namespace/cluster_manager.go index 16207d3..51f5342 100644 --- a/pkg/namespace/cluster_manager.go +++ b/pkg/namespace/cluster_manager.go @@ -24,8 +24,9 @@ import ( // ClusterManagerConfig contains configuration for the cluster manager type ClusterManagerConfig struct { - BaseDomain string // Base domain for namespace gateways (e.g., "orama-devnet.network") - BaseDataDir string // Base directory for namespace data (e.g., "~/.orama/data/namespaces") + BaseDomain string // Base domain for namespace gateways (e.g., "orama-devnet.network") + BaseDataDir string // Base directory for namespace data (e.g., "~/.orama/data/namespaces") + GlobalRQLiteDSN string // Global RQLite DSN for API key validation (e.g., "http://localhost:4001") // IPFS configuration for namespace gateways (defaults used if not set) IPFSClusterAPIURL string // IPFS Cluster API URL (default: "http://localhost:9094") IPFSAPIURL string // IPFS API URL (default: "http://localhost:5001") @@ -41,9 +42,10 @@ type ClusterManager struct { rqliteSpawner *rqlite.InstanceSpawner olricSpawner *olric.InstanceSpawner gatewaySpawner *gateway.InstanceSpawner - logger *zap.Logger - baseDomain string - baseDataDir string + logger *zap.Logger + baseDomain string + baseDataDir string + globalRQLiteDSN string // Global RQLite DSN for namespace gateway auth // IPFS configuration for namespace gateways ipfsClusterAPIURL string @@ -99,6 +101,7 @@ func NewClusterManager( gatewaySpawner: gatewaySpawner, baseDomain: cfg.BaseDomain, baseDataDir: cfg.BaseDataDir, + globalRQLiteDSN: cfg.GlobalRQLiteDSN, ipfsClusterAPIURL: ipfsClusterAPIURL, ipfsAPIURL: ipfsAPIURL, ipfsTimeout: ipfsTimeout, @@ -146,6 +149,7 @@ func NewClusterManagerWithComponents( gatewaySpawner: gatewaySpawner, baseDomain: cfg.BaseDomain, baseDataDir: cfg.BaseDataDir, + globalRQLiteDSN: cfg.GlobalRQLiteDSN, ipfsClusterAPIURL: ipfsClusterAPIURL, ipfsAPIURL: ipfsAPIURL, ipfsTimeout: ipfsTimeout, @@ -479,7 +483,9 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name HTTPPort: portBlocks[i].GatewayHTTPPort, BaseDomain: cm.baseDomain, RQLiteDSN: rqliteDSN, + GlobalRQLiteDSN: cm.globalRQLiteDSN, OlricServers: olricServers, + OlricTimeout: 30 * time.Second, IPFSClusterAPIURL: cm.ipfsClusterAPIURL, IPFSAPIURL: cm.ipfsAPIURL, IPFSTimeout: cm.ipfsTimeout, @@ -683,35 +689,73 @@ func (cm *ClusterManager) sendStopRequest(ctx context.Context, nodeIP, action, n } // createDNSRecords creates DNS records for the namespace gateway. -// All namespace nodes get DNS A records since all nodes now run Caddy -// and can serve TLS for ns-{namespace}.{baseDomain} subdomains. +// Creates A records for ALL nameservers (not just cluster nodes) so that any nameserver +// can receive requests and proxy them to the namespace cluster via internal routing. func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error { fqdn := fmt.Sprintf("ns-%s.%s.", cluster.NamespaceName, cm.baseDomain) + // Query for ALL nameserver IPs (not just the selected cluster nodes) + // This ensures DNS round-robins across all nameservers, even those not hosting the cluster + type nameserverIP struct { + IPAddress string `db:"ip_address"` + } + var nameservers []nameserverIP + nameserverQuery := ` + SELECT DISTINCT ip_address + FROM dns_nameservers + WHERE domain = ? + ORDER BY hostname + ` + err := cm.db.Query(ctx, &nameservers, nameserverQuery, cm.baseDomain) + if err != nil { + cm.logger.Error("Failed to query nameservers for DNS records", + zap.String("domain", cm.baseDomain), + zap.Error(err), + ) + return fmt.Errorf("failed to query nameservers: %w", err) + } + + var nameserverIPs []string + for _, ns := range nameservers { + if ns.IPAddress != "" { + nameserverIPs = append(nameserverIPs, ns.IPAddress) + } + } + + // Fallback: if no nameservers found in dns_nameservers table, use cluster node IPs + // This maintains backwards compatibility with clusters created before nameserver tracking + if len(nameserverIPs) == 0 { + cm.logger.Warn("No nameservers found in dns_nameservers table, falling back to cluster node IPs", + zap.String("domain", cm.baseDomain), + ) + for _, node := range nodes { + nameserverIPs = append(nameserverIPs, node.IPAddress) + } + } + recordCount := 0 - for i, node := range nodes { + for _, ip := range nameserverIPs { query := ` INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by) VALUES (?, 'A', ?, 300, ?, 'system') ` - _, err := cm.db.Exec(ctx, query, fqdn, node.IPAddress, cluster.NamespaceName) + _, err := cm.db.Exec(ctx, query, fqdn, ip, cluster.NamespaceName) if err != nil { cm.logger.Warn("Failed to create DNS record", zap.String("fqdn", fqdn), - zap.String("ip", node.IPAddress), + zap.String("ip", ip), zap.Error(err), ) } else { - cm.logger.Info("Created DNS A record", + cm.logger.Info("Created DNS A record for nameserver", zap.String("fqdn", fqdn), - zap.String("ip", node.IPAddress), - zap.Int("gateway_port", portBlocks[i].GatewayHTTPPort), + zap.String("ip", ip), ) recordCount++ } } - cm.logEvent(ctx, cluster.ID, EventDNSCreated, "", fmt.Sprintf("DNS records created for %s (%d records)", fqdn, recordCount), nil) + cm.logEvent(ctx, cluster.ID, EventDNSCreated, "", fmt.Sprintf("DNS records created for %s (%d nameserver records)", fqdn, recordCount), nil) return nil } @@ -1393,7 +1437,9 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n HTTPPort: pb.GatewayHTTPPort, BaseDomain: cm.baseDomain, RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort), + GlobalRQLiteDSN: cm.globalRQLiteDSN, OlricServers: olricServers, + OlricTimeout: 30 * time.Second, IPFSClusterAPIURL: cm.ipfsClusterAPIURL, IPFSAPIURL: cm.ipfsAPIURL, IPFSTimeout: cm.ipfsTimeout, @@ -1648,7 +1694,9 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl HTTPPort: pb.GatewayHTTPPort, BaseDomain: state.BaseDomain, RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort), + GlobalRQLiteDSN: cm.globalRQLiteDSN, OlricServers: olricServers, + OlricTimeout: 30 * time.Second, IPFSClusterAPIURL: cm.ipfsClusterAPIURL, IPFSAPIURL: cm.ipfsAPIURL, IPFSTimeout: cm.ipfsTimeout, diff --git a/pkg/node/gateway.go b/pkg/node/gateway.go index 9276179..239a23f 100644 --- a/pkg/node/gateway.go +++ b/pkg/node/gateway.go @@ -72,8 +72,9 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { if ormClient := apiGateway.GetORMClient(); ormClient != nil { baseDataDir := filepath.Join(os.ExpandEnv(n.config.Node.DataDir), "..", "data", "namespaces") clusterCfg := namespace.ClusterManagerConfig{ - BaseDomain: n.config.HTTPGateway.BaseDomain, - BaseDataDir: baseDataDir, + BaseDomain: n.config.HTTPGateway.BaseDomain, + BaseDataDir: baseDataDir, + GlobalRQLiteDSN: gwCfg.RQLiteDSN, // Pass global RQLite DSN for namespace gateway auth } clusterManager := namespace.NewClusterManager(ormClient, clusterCfg, n.logger.Logger) clusterManager.SetLocalNodeID(gwCfg.NodePeerID) diff --git a/pkg/olric/instance_spawner.go b/pkg/olric/instance_spawner.go index 7e321d8..fa02eda 100644 --- a/pkg/olric/instance_spawner.go +++ b/pkg/olric/instance_spawner.go @@ -86,8 +86,9 @@ type InstanceConfig struct { // OlricConfig represents the Olric YAML configuration structure type OlricConfig struct { - Server OlricServerConfig `yaml:"server"` - Memberlist OlricMemberlistConfig `yaml:"memberlist"` + Server OlricServerConfig `yaml:"server"` + Memberlist OlricMemberlistConfig `yaml:"memberlist"` + PartitionCount uint64 `yaml:"partitionCount"` // Number of partitions (default: 256, we use 12 for namespace isolation) } // OlricServerConfig represents the server section of Olric config @@ -269,6 +270,10 @@ func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig) BindPort: cfg.MemberlistPort, Peers: cfg.PeerAddresses, }, + // Use 12 partitions for namespace Olric instances (vs 256 default) + // This gives perfect distribution for 2-6 nodes and 20x faster scans + // 12 partitions × 2 (primary+replica) = 24 network calls (~0.6s vs 12s) + PartitionCount: 12, } data, err := yaml.Marshal(olricCfg)