From 0c41158ff9e1c47bc77abda5855fe7dda7bd4830 Mon Sep 17 00:00:00 2001 From: JohnySigma Date: Fri, 20 Feb 2026 18:38:23 +0200 Subject: [PATCH] fix: resolve merge conflicts and add missing TURN/SFU configs --- pkg/config/config.go | 307 ++++----------- pkg/config/gateway_config.go | 51 +++ pkg/gateway/config.go | 12 +- pkg/gateway/context.go | 1 + pkg/gateway/gateway.go | 704 ++++++----------------------------- 5 files changed, 250 insertions(+), 825 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index e678e51..85429aa 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -3,6 +3,7 @@ package config import ( "time" + "github.com/DeBrosOfficial/network/pkg/config/validate" "github.com/multiformats/go-multiaddr" ) @@ -14,248 +15,84 @@ type Config struct { Security SecurityConfig `yaml:"security"` Logging LoggingConfig `yaml:"logging"` HTTPGateway HTTPGatewayConfig `yaml:"http_gateway"` - TURNServer TURNServerConfig `yaml:"turn_server"` // Built-in TURN server -} - -// NodeConfig contains node-specific configuration -type NodeConfig struct { - ID string `yaml:"id"` // Auto-generated if empty - ListenAddresses []string `yaml:"listen_addresses"` // LibP2P listen addresses - DataDir string `yaml:"data_dir"` // Data directory - MaxConnections int `yaml:"max_connections"` // Maximum peer connections - Domain string `yaml:"domain"` // Domain for this node (e.g., node-1.orama.network) -} - -// DatabaseConfig contains database-related configuration -type DatabaseConfig struct { - DataDir string `yaml:"data_dir"` - ReplicationFactor int `yaml:"replication_factor"` - ShardCount int `yaml:"shard_count"` - MaxDatabaseSize int64 `yaml:"max_database_size"` // In bytes - BackupInterval time.Duration `yaml:"backup_interval"` - - // RQLite-specific configuration - RQLitePort int `yaml:"rqlite_port"` // RQLite HTTP API port - RQLiteRaftPort int `yaml:"rqlite_raft_port"` // RQLite Raft consensus port - RQLiteJoinAddress string `yaml:"rqlite_join_address"` // Address to join RQLite cluster - - // RQLite node-to-node TLS encryption (for inter-node Raft communication) - // See: https://rqlite.io/docs/guides/security/#encrypting-node-to-node-communication - NodeCert string `yaml:"node_cert"` // Path to X.509 certificate for node-to-node communication - NodeKey string `yaml:"node_key"` // Path to X.509 private key for node-to-node communication - NodeCACert string `yaml:"node_ca_cert"` // Path to CA certificate (optional, uses system CA if not set) - NodeNoVerify bool `yaml:"node_no_verify"` // Skip certificate verification (for testing/self-signed certs) - - // Dynamic discovery configuration (always enabled) - ClusterSyncInterval time.Duration `yaml:"cluster_sync_interval"` // default: 30s - PeerInactivityLimit time.Duration `yaml:"peer_inactivity_limit"` // default: 24h - MinClusterSize int `yaml:"min_cluster_size"` // default: 1 - - // Olric cache configuration - OlricHTTPPort int `yaml:"olric_http_port"` // Olric HTTP API port (default: 3320) - OlricMemberlistPort int `yaml:"olric_memberlist_port"` // Olric memberlist port (default: 3322) - - // IPFS storage configuration - IPFS IPFSConfig `yaml:"ipfs"` -} - -// IPFSConfig contains IPFS storage configuration -type IPFSConfig struct { - // ClusterAPIURL is the IPFS Cluster HTTP API URL (e.g., "http://localhost:9094") - // If empty, IPFS storage is disabled for this node - ClusterAPIURL string `yaml:"cluster_api_url"` - - // APIURL is the IPFS HTTP API URL for content retrieval (e.g., "http://localhost:5001") - // If empty, defaults to "http://localhost:5001" - APIURL string `yaml:"api_url"` - - // Timeout for IPFS operations - // If zero, defaults to 60 seconds - Timeout time.Duration `yaml:"timeout"` - - // ReplicationFactor is the replication factor for pinned content - // If zero, defaults to 3 - ReplicationFactor int `yaml:"replication_factor"` - - // EnableEncryption enables client-side encryption before upload - // Defaults to true - EnableEncryption bool `yaml:"enable_encryption"` -} - -// DiscoveryConfig contains peer discovery configuration -type DiscoveryConfig struct { - BootstrapPeers []string `yaml:"bootstrap_peers"` // Peer addresses to connect to - DiscoveryInterval time.Duration `yaml:"discovery_interval"` // Discovery announcement interval - BootstrapPort int `yaml:"bootstrap_port"` // Default port for peer discovery - HttpAdvAddress string `yaml:"http_adv_address"` // HTTP advertisement address - RaftAdvAddress string `yaml:"raft_adv_address"` // Raft advertisement - NodeNamespace string `yaml:"node_namespace"` // Namespace for node identifiers -} - -// SecurityConfig contains security-related configuration -type SecurityConfig struct { - EnableTLS bool `yaml:"enable_tls"` - PrivateKeyFile string `yaml:"private_key_file"` - CertificateFile string `yaml:"certificate_file"` -} - -// LoggingConfig contains logging configuration -type LoggingConfig struct { - Level string `yaml:"level"` // debug, info, warn, error - Format string `yaml:"format"` // json, console - OutputFile string `yaml:"output_file"` // Empty for stdout -} - -// HTTPGatewayConfig contains HTTP reverse proxy gateway configuration -type HTTPGatewayConfig struct { - Enabled bool `yaml:"enabled"` // Enable HTTP gateway - ListenAddr string `yaml:"listen_addr"` // Address to listen on (e.g., ":8080") - NodeName string `yaml:"node_name"` // Node name for routing - Routes map[string]RouteConfig `yaml:"routes"` // Service routes - HTTPS HTTPSConfig `yaml:"https"` // HTTPS/TLS configuration - SNI SNIConfig `yaml:"sni"` // SNI-based TCP routing configuration - - // Full gateway configuration (for API, auth, pubsub) - ClientNamespace string `yaml:"client_namespace"` // Namespace for network client - RQLiteDSN string `yaml:"rqlite_dsn"` // RQLite database DSN - OlricServers []string `yaml:"olric_servers"` // List of Olric server addresses - OlricTimeout time.Duration `yaml:"olric_timeout"` // Timeout for Olric operations - IPFSClusterAPIURL string `yaml:"ipfs_cluster_api_url"` // IPFS Cluster API URL - IPFSAPIURL string `yaml:"ipfs_api_url"` // IPFS API URL - IPFSTimeout time.Duration `yaml:"ipfs_timeout"` // Timeout for IPFS operations - - // WebRTC configuration for video/audio calls - TURN *TURNConfig `yaml:"turn"` // TURN/STUN server configuration - SFU *SFUConfig `yaml:"sfu"` // SFU (Selective Forwarding Unit) configuration -} - -// HTTPSConfig contains HTTPS/TLS configuration for the gateway -type HTTPSConfig struct { - Enabled bool `yaml:"enabled"` // Enable HTTPS (port 443) - Domain string `yaml:"domain"` // Primary domain (e.g., node-123.orama.network) - AutoCert bool `yaml:"auto_cert"` // Use Let's Encrypt for automatic certificate - UseSelfSigned bool `yaml:"use_self_signed"` // Use self-signed certificates (pre-generated) - CertFile string `yaml:"cert_file"` // Path to certificate file (if not using auto_cert) - KeyFile string `yaml:"key_file"` // Path to key file (if not using auto_cert) - CacheDir string `yaml:"cache_dir"` // Directory for Let's Encrypt certificate cache - HTTPPort int `yaml:"http_port"` // HTTP port for ACME challenge (default: 80) - HTTPSPort int `yaml:"https_port"` // HTTPS port (default: 443) - Email string `yaml:"email"` // Email for Let's Encrypt account -} - -// SNIConfig contains SNI-based TCP routing configuration for port 7001 -type SNIConfig struct { - Enabled bool `yaml:"enabled"` // Enable SNI-based TCP routing - ListenAddr string `yaml:"listen_addr"` // Address to listen on (e.g., ":7001") - Routes map[string]string `yaml:"routes"` // SNI hostname -> backend address mapping - CertFile string `yaml:"cert_file"` // Path to certificate file - KeyFile string `yaml:"key_file"` // Path to key file -} - -// RouteConfig defines a single reverse proxy route -type RouteConfig struct { - PathPrefix string `yaml:"path_prefix"` // URL path prefix (e.g., "/rqlite/http") - BackendURL string `yaml:"backend_url"` // Backend service URL - Timeout time.Duration `yaml:"timeout"` // Request timeout - WebSocket bool `yaml:"websocket"` // Support WebSocket upgrades -} - -// ClientConfig represents configuration for network clients -type ClientConfig struct { - AppName string `yaml:"app_name"` - DatabaseName string `yaml:"database_name"` - BootstrapPeers []string `yaml:"bootstrap_peers"` - ConnectTimeout time.Duration `yaml:"connect_timeout"` - RetryAttempts int `yaml:"retry_attempts"` -} - -// TURNConfig contains TURN/STUN server credential configuration -type TURNConfig struct { - // SharedSecret is the shared secret for TURN credential generation (HMAC-SHA1) - // Should be set via TURN_SHARED_SECRET environment variable - SharedSecret string `yaml:"shared_secret"` - - // TTL is the time-to-live for generated credentials - // Default: 24 hours - TTL time.Duration `yaml:"ttl"` - - // ExternalHost is the external hostname or IP address for STUN/TURN URLs - // - Production: Set to your public domain (e.g., "turn.example.com") - // - Development: Leave empty for auto-detection of LAN IP - // Can also be set via TURN_EXTERNAL_HOST environment variable - ExternalHost string `yaml:"external_host"` - - // STUNURLs are the STUN server URLs to return to clients - // Use "::" as placeholder for ExternalHost (e.g., "stun:::3478" -> "stun:turn.example.com:3478") - // e.g., ["stun:::3478"] or ["stun:gateway.orama.com:3478"] - STUNURLs []string `yaml:"stun_urls"` - - // TURNURLs are the TURN server URLs to return to clients - // Use "::" as placeholder for ExternalHost (e.g., "turn:::3478" -> "turn:turn.example.com:3478") - // e.g., ["turn:::3478?transport=udp"] or ["turn:gateway.orama.com:3478?transport=udp"] - TURNURLs []string `yaml:"turn_urls"` - - // TLSEnabled indicates whether TURNS (TURN over TLS) is available - // When true, turns:// URLs will be included in the response - TLSEnabled bool `yaml:"tls_enabled"` -} - -// SFUConfig contains WebRTC SFU (Selective Forwarding Unit) configuration -type SFUConfig struct { - // Enabled enables the SFU service - Enabled bool `yaml:"enabled"` - - // MaxParticipants is the maximum number of participants per room - // Default: 10 - MaxParticipants int `yaml:"max_participants"` - - // MediaTimeout is the timeout for media operations - // Default: 30 seconds - MediaTimeout time.Duration `yaml:"media_timeout"` - - // ICEServers are additional ICE servers for WebRTC connections - // These are used in addition to the TURN servers from TURNConfig - ICEServers []ICEServerConfig `yaml:"ice_servers"` -} - -// ICEServerConfig represents a single ICE server configuration -type ICEServerConfig struct { - URLs []string `yaml:"urls"` - Username string `yaml:"username,omitempty"` - Credential string `yaml:"credential,omitempty"` + TURNServer TURNServerConfig `yaml:"turn_server"` } // TURNServerConfig contains built-in TURN server configuration type TURNServerConfig struct { - // Enabled enables the built-in TURN server - Enabled bool `yaml:"enabled"` - - // ListenAddr is the UDP address to listen on (e.g., "0.0.0.0:3478") - ListenAddr string `yaml:"listen_addr"` - - // PublicIP is the public IP address to advertise for relay - // If empty, will try to auto-detect - PublicIP string `yaml:"public_ip"` - - // Realm is the TURN realm (e.g., "orama.network") - Realm string `yaml:"realm"` - - // MinPort and MaxPort define the relay port range - MinPort uint16 `yaml:"min_port"` - MaxPort uint16 `yaml:"max_port"` - - // TLS Configuration for TURNS (TURN over TLS) - // TLSEnabled enables TURNS listener - TLSEnabled bool `yaml:"tls_enabled"` - - // TLSListenAddr is the TCP/TLS address to listen on (e.g., "0.0.0.0:443") + Enabled bool `yaml:"enabled"` + ListenAddr string `yaml:"listen_addr"` + PublicIP string `yaml:"public_ip"` + Realm string `yaml:"realm"` + MinPort uint16 `yaml:"min_port"` + MaxPort uint16 `yaml:"max_port"` + TLSEnabled bool `yaml:"tls_enabled"` TLSListenAddr string `yaml:"tls_listen_addr"` + TLSCertFile string `yaml:"tls_cert_file"` + TLSKeyFile string `yaml:"tls_key_file"` +} - // TLSCertFile is the path to the TLS certificate file - TLSCertFile string `yaml:"tls_cert_file"` +// ValidationError represents a single validation error with context. +// This is exported from the validate subpackage for backward compatibility. +type ValidationError = validate.ValidationError - // TLSKeyFile is the path to the TLS private key file - TLSKeyFile string `yaml:"tls_key_file"` +// ValidateSwarmKey validates that a swarm key is 64 hex characters. +// This is exported from the validate subpackage for backward compatibility. +func ValidateSwarmKey(key string) error { + return validate.ValidateSwarmKey(key) +} + +// Validate performs comprehensive validation of the entire config. +// It aggregates all errors and returns them, allowing the caller to print all issues at once. +func (c *Config) Validate() []error { + var errs []error + + // Validate node config + errs = append(errs, validate.ValidateNode(validate.NodeConfig{ + ID: c.Node.ID, + ListenAddresses: c.Node.ListenAddresses, + DataDir: c.Node.DataDir, + MaxConnections: c.Node.MaxConnections, + })...) + + // Validate database config + errs = append(errs, validate.ValidateDatabase(validate.DatabaseConfig{ + DataDir: c.Database.DataDir, + ReplicationFactor: c.Database.ReplicationFactor, + ShardCount: c.Database.ShardCount, + MaxDatabaseSize: c.Database.MaxDatabaseSize, + RQLitePort: c.Database.RQLitePort, + RQLiteRaftPort: c.Database.RQLiteRaftPort, + RQLiteJoinAddress: c.Database.RQLiteJoinAddress, + ClusterSyncInterval: c.Database.ClusterSyncInterval, + PeerInactivityLimit: c.Database.PeerInactivityLimit, + MinClusterSize: c.Database.MinClusterSize, + })...) + + // Validate discovery config + errs = append(errs, validate.ValidateDiscovery(validate.DiscoveryConfig{ + BootstrapPeers: c.Discovery.BootstrapPeers, + DiscoveryInterval: c.Discovery.DiscoveryInterval, + BootstrapPort: c.Discovery.BootstrapPort, + HttpAdvAddress: c.Discovery.HttpAdvAddress, + RaftAdvAddress: c.Discovery.RaftAdvAddress, + })...) + + // Validate security config + errs = append(errs, validate.ValidateSecurity(validate.SecurityConfig{ + EnableTLS: c.Security.EnableTLS, + PrivateKeyFile: c.Security.PrivateKeyFile, + CertificateFile: c.Security.CertificateFile, + })...) + + // Validate logging config + errs = append(errs, validate.ValidateLogging(validate.LoggingConfig{ + Level: c.Logging.Level, + Format: c.Logging.Format, + OutputFile: c.Logging.OutputFile, + })...) + + return errs } // ParseMultiaddrs converts string addresses to multiaddr objects diff --git a/pkg/config/gateway_config.go b/pkg/config/gateway_config.go index 38b4614..af1efd1 100644 --- a/pkg/config/gateway_config.go +++ b/pkg/config/gateway_config.go @@ -19,6 +19,10 @@ type HTTPGatewayConfig struct { IPFSClusterAPIURL string `yaml:"ipfs_cluster_api_url"` // IPFS Cluster API URL IPFSAPIURL string `yaml:"ipfs_api_url"` // IPFS API URL IPFSTimeout time.Duration `yaml:"ipfs_timeout"` // Timeout for IPFS operations + + // WebRTC configuration + TURN *TURNConfig `yaml:"turn"` // TURN/STUN server configuration + SFU *SFUConfig `yaml:"sfu"` // SFU (Selective Forwarding Unit) configuration } // HTTPSConfig contains HTTPS/TLS configuration for the gateway @@ -60,3 +64,50 @@ type ClientConfig struct { ConnectTimeout time.Duration `yaml:"connect_timeout"` RetryAttempts int `yaml:"retry_attempts"` } + +// TURNConfig contains TURN/STUN server credential configuration +type TURNConfig struct { + // SharedSecret is the shared secret for TURN credential generation (HMAC-SHA1) + SharedSecret string `yaml:"shared_secret"` + + // TTL is the time-to-live for generated credentials (default: 24 hours) + TTL time.Duration `yaml:"ttl"` + + // ExternalHost is the external hostname or IP address for STUN/TURN URLs + // Production: Set to your public domain (e.g., "turn.example.com") + // Development: Leave empty for auto-detection of LAN IP + ExternalHost string `yaml:"external_host"` + + // STUNURLs are the STUN server URLs to return to clients + // Use ":::" as placeholder for ExternalHost (e.g., "stun:::3478" -> "stun:host:3478") + STUNURLs []string `yaml:"stun_urls"` + + // TURNURLs are the TURN server URLs to return to clients + // Use ":::" as placeholder for ExternalHost (e.g., "turn:::3478" -> "turn:host:3478") + TURNURLs []string `yaml:"turn_urls"` + + // TLSEnabled indicates whether TURNS (TURN over TLS) is available + TLSEnabled bool `yaml:"tls_enabled"` +} + +// SFUConfig contains WebRTC SFU (Selective Forwarding Unit) configuration +type SFUConfig struct { + // Enabled enables the SFU service + Enabled bool `yaml:"enabled"` + + // MaxParticipants is the maximum number of participants per room (default: 10) + MaxParticipants int `yaml:"max_participants"` + + // MediaTimeout is the timeout for media operations (default: 30 seconds) + MediaTimeout time.Duration `yaml:"media_timeout"` + + // ICEServers are additional ICE servers for WebRTC connections + ICEServers []ICEServerConfig `yaml:"ice_servers"` +} + +// ICEServerConfig represents a single ICE server configuration +type ICEServerConfig struct { + URLs []string `yaml:"urls"` + Username string `yaml:"username,omitempty"` + Credential string `yaml:"credential,omitempty"` +} diff --git a/pkg/gateway/config.go b/pkg/gateway/config.go index b983932..2092398 100644 --- a/pkg/gateway/config.go +++ b/pkg/gateway/config.go @@ -1,6 +1,10 @@ package gateway -import "time" +import ( + "time" + + "github.com/DeBrosOfficial/network/pkg/config" +) // Config holds configuration for the gateway server type Config struct { @@ -28,4 +32,10 @@ type Config struct { IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s) IPFSReplicationFactor int // Replication factor for pins (default: 3) IPFSEnableEncryption bool // Enable client-side encryption before upload (default: true, discovered from node configs) + + // TURN/STUN configuration for WebRTC + TURN *config.TURNConfig + + // SFU configuration for WebRTC group calls + SFU *config.SFUConfig } diff --git a/pkg/gateway/context.go b/pkg/gateway/context.go index be0461e..e05a859 100644 --- a/pkg/gateway/context.go +++ b/pkg/gateway/context.go @@ -12,6 +12,7 @@ const ( ctxKeyAPIKey = ctxkeys.APIKey ctxKeyJWT = ctxkeys.JWT CtxKeyNamespaceOverride = ctxkeys.NamespaceOverride + ctxKeyNamespaceOverride = ctxkeys.NamespaceOverride // alias for internal use ) // withInternalAuth creates a context for internal gateway operations that bypass authentication. diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 41a3e9c..fb57948 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -1,75 +1,32 @@ +// Package gateway provides the main API Gateway for the Orama Network. +// It orchestrates traffic between clients and various backend services including +// distributed caching (Olric), decentralized storage (IPFS), and serverless +// WebAssembly (WASM) execution. The gateway implements robust security through +// wallet-based cryptographic authentication and JWT lifecycle management. package gateway import ( "context" - "crypto/rand" - "crypto/rsa" - "crypto/x509" "database/sql" - "encoding/pem" - "fmt" - "net" - "os" - "path/filepath" - "strings" "sync" "time" "github.com/DeBrosOfficial/network/pkg/client" - "github.com/DeBrosOfficial/network/pkg/config" "github.com/DeBrosOfficial/network/pkg/gateway/auth" + authhandlers "github.com/DeBrosOfficial/network/pkg/gateway/handlers/auth" + "github.com/DeBrosOfficial/network/pkg/gateway/handlers/cache" + pubsubhandlers "github.com/DeBrosOfficial/network/pkg/gateway/handlers/pubsub" + serverlesshandlers "github.com/DeBrosOfficial/network/pkg/gateway/handlers/serverless" + "github.com/DeBrosOfficial/network/pkg/gateway/handlers/storage" + "github.com/DeBrosOfficial/network/pkg/gateway/sfu" "github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/olric" - "github.com/DeBrosOfficial/network/pkg/pubsub" "github.com/DeBrosOfficial/network/pkg/rqlite" "github.com/DeBrosOfficial/network/pkg/serverless" - "github.com/multiformats/go-multiaddr" - olriclib "github.com/olric-data/olric" "go.uber.org/zap" - - _ "github.com/rqlite/gorqlite/stdlib" ) -const ( - olricInitMaxAttempts = 5 - olricInitInitialBackoff = 500 * time.Millisecond - olricInitMaxBackoff = 5 * time.Second -) - -// Config holds configuration for the gateway server -type Config struct { - ListenAddr string - ClientNamespace string - BootstrapPeers []string - NodePeerID string // The node's actual peer ID from its identity file - - // Optional DSN for rqlite database/sql driver, e.g. "http://localhost:4001" - // If empty, defaults to "http://localhost:4001". - RQLiteDSN string - - // HTTPS configuration - EnableHTTPS bool // Enable HTTPS with ACME (Let's Encrypt) - DomainName string // Domain name for HTTPS certificate - TLSCacheDir string // Directory to cache TLS certificates (default: ~/.orama/tls-cache) - - // Olric cache configuration - OlricServers []string // List of Olric server addresses (e.g., ["localhost:3320"]). If empty, defaults to ["localhost:3320"] - OlricTimeout time.Duration // Timeout for Olric operations (default: 10s) - - // IPFS Cluster configuration - IPFSClusterAPIURL string // IPFS Cluster HTTP API URL (e.g., "http://localhost:9094"). If empty, gateway will discover from node configs - IPFSAPIURL string // IPFS HTTP API URL for content retrieval (e.g., "http://localhost:5001"). If empty, gateway will discover from node configs - IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s) - IPFSReplicationFactor int // Replication factor for pins (default: 3) - IPFSEnableEncryption bool // Enable client-side encryption before upload (default: true, discovered from node configs) - - // TURN/STUN configuration for WebRTC - TURN *config.TURNConfig - - // SFU configuration for WebRTC group calls - SFU *config.SFUConfig -} type Gateway struct { logger *logging.ColoredLogger @@ -86,28 +43,32 @@ type Gateway struct { // Olric cache client olricClient *olric.Client olricMu sync.RWMutex + cacheHandlers *cache.CacheHandlers // IPFS storage client - ipfsClient ipfs.IPFSClient + ipfsClient ipfs.IPFSClient + storageHandlers *storage.Handlers // Local pub/sub bypass for same-gateway subscribers localSubscribers map[string][]*localSubscriber // topic+namespace -> subscribers presenceMembers map[string][]PresenceMember // topicKey -> members mu sync.RWMutex presenceMu sync.RWMutex + pubsubHandlers *pubsubhandlers.PubSubHandlers // Serverless function engine serverlessEngine *serverless.Engine serverlessRegistry *serverless.Registry serverlessInvoker *serverless.Invoker serverlessWSMgr *serverless.WSManager - serverlessHandlers *ServerlessHandlers + serverlessHandlers *serverlesshandlers.ServerlessHandlers // Authentication service - authService *auth.Service + authService *auth.Service + authHandlers *authhandlers.Handlers // SFU manager for WebRTC group calls - sfuManager *SFUManager + sfuManager *sfu.RoomManager } // localSubscriber represents a WebSocket subscriber for local message delivery @@ -124,359 +85,113 @@ type PresenceMember struct { ConnID string `json:"-"` // Internal: for tracking which connection } -// New creates and initializes a new Gateway instance -func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { - logger.ComponentInfo(logging.ComponentGeneral, "Building client config...") +// authClientAdapter adapts client.NetworkClient to authhandlers.NetworkClient +type authClientAdapter struct { + client client.NetworkClient +} - // Build client config from gateway cfg - cliCfg := client.DefaultClientConfig(cfg.ClientNamespace) - if len(cfg.BootstrapPeers) > 0 { - cliCfg.BootstrapPeers = cfg.BootstrapPeers - } +func (a *authClientAdapter) Database() authhandlers.DatabaseClient { + return &authDatabaseAdapter{db: a.client.Database()} +} - logger.ComponentInfo(logging.ComponentGeneral, "Creating network client...") - c, err := client.NewClient(cliCfg) +// authDatabaseAdapter adapts client.DatabaseClient to authhandlers.DatabaseClient +type authDatabaseAdapter struct { + db client.DatabaseClient +} + +func (a *authDatabaseAdapter) Query(ctx context.Context, sql string, args ...interface{}) (*authhandlers.QueryResult, error) { + result, err := a.db.Query(ctx, sql, args...) if err != nil { - logger.ComponentError(logging.ComponentClient, "failed to create network client", zap.Error(err)) return nil, err } + // Convert client.QueryResult to authhandlers.QueryResult + // The auth handlers expect []interface{} but client returns [][]interface{} + convertedRows := make([]interface{}, len(result.Rows)) + for i, row := range result.Rows { + convertedRows[i] = row + } + return &authhandlers.QueryResult{ + Count: int(result.Count), + Rows: convertedRows, + }, nil +} - logger.ComponentInfo(logging.ComponentGeneral, "Connecting network client...") - if err := c.Connect(); err != nil { - logger.ComponentError(logging.ComponentClient, "failed to connect network client", zap.Error(err)) +// New creates and initializes a new Gateway instance. +// It establishes all necessary service connections and dependencies. +func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { + logger.ComponentInfo(logging.ComponentGeneral, "Creating gateway dependencies...") + + // Initialize all dependencies (network client, database, cache, storage, serverless) + deps, err := NewDependencies(logger, cfg) + if err != nil { + logger.ComponentError(logging.ComponentGeneral, "failed to create dependencies", zap.Error(err)) return nil, err } - logger.ComponentInfo(logging.ComponentClient, "Network client connected", - zap.String("namespace", cliCfg.AppName), - zap.Int("peer_count", len(cliCfg.BootstrapPeers)), - ) - logger.ComponentInfo(logging.ComponentGeneral, "Creating gateway instance...") gw := &Gateway{ - logger: logger, - cfg: cfg, - client: c, - nodePeerID: cfg.NodePeerID, - startedAt: time.Now(), - localSubscribers: make(map[string][]*localSubscriber), - presenceMembers: make(map[string][]PresenceMember), + logger: logger, + cfg: cfg, + client: deps.Client, + nodePeerID: cfg.NodePeerID, + startedAt: time.Now(), + sqlDB: deps.SQLDB, + ormClient: deps.ORMClient, + ormHTTP: deps.ORMHTTP, + olricClient: deps.OlricClient, + ipfsClient: deps.IPFSClient, + serverlessEngine: deps.ServerlessEngine, + serverlessRegistry: deps.ServerlessRegistry, + serverlessInvoker: deps.ServerlessInvoker, + serverlessWSMgr: deps.ServerlessWSMgr, + serverlessHandlers: deps.ServerlessHandlers, + authService: deps.AuthService, + localSubscribers: make(map[string][]*localSubscriber), + presenceMembers: make(map[string][]PresenceMember), } - logger.ComponentInfo(logging.ComponentGeneral, "Initializing RQLite ORM HTTP gateway...") - dsn := cfg.RQLiteDSN - if dsn == "" { - dsn = "http://localhost:5001" - } - db, dbErr := sql.Open("rqlite", dsn) - if dbErr != nil { - logger.ComponentWarn(logging.ComponentGeneral, "failed to open rqlite sql db; http orm gateway disabled", zap.Error(dbErr)) - } else { - // Configure connection pool with proper timeouts and limits - db.SetMaxOpenConns(25) // Maximum number of open connections - db.SetMaxIdleConns(5) // Maximum number of idle connections - db.SetConnMaxLifetime(5 * time.Minute) // Maximum lifetime of a connection - db.SetConnMaxIdleTime(2 * time.Minute) // Maximum idle time before closing + // Initialize handler instances + gw.pubsubHandlers = pubsubhandlers.NewPubSubHandlers(deps.Client, logger) - gw.sqlDB = db - orm := rqlite.NewClient(db) - gw.ormClient = orm - gw.ormHTTP = rqlite.NewHTTPGateway(orm, "/v1/db") - // Set a reasonable timeout for HTTP requests (30 seconds) - gw.ormHTTP.Timeout = 30 * time.Second - logger.ComponentInfo(logging.ComponentGeneral, "RQLite ORM HTTP gateway ready", - zap.String("dsn", dsn), - zap.String("base_path", "/v1/db"), - zap.Duration("timeout", gw.ormHTTP.Timeout), + if deps.OlricClient != nil { + gw.cacheHandlers = cache.NewCacheHandlers(logger, deps.OlricClient) + } + + if deps.IPFSClient != nil { + gw.storageHandlers = storage.New(deps.IPFSClient, logger, storage.Config{ + IPFSReplicationFactor: cfg.IPFSReplicationFactor, + IPFSAPIURL: cfg.IPFSAPIURL, + }) + } + + if deps.AuthService != nil { + // Create adapter for auth handlers to use the client + authClientAdapter := &authClientAdapter{client: deps.Client} + gw.authHandlers = authhandlers.NewHandlers( + logger, + deps.AuthService, + authClientAdapter, + cfg.ClientNamespace, + gw.withInternalAuth, ) } - logger.ComponentInfo(logging.ComponentGeneral, "Initializing Olric cache client...") - - // Discover Olric servers dynamically from LibP2P peers if not explicitly configured - olricServers := cfg.OlricServers - if len(olricServers) == 0 { - logger.ComponentInfo(logging.ComponentGeneral, "Olric servers not configured, discovering from LibP2P peers...") - discovered := discoverOlricServers(c, logger.Logger) - if len(discovered) > 0 { - olricServers = discovered - logger.ComponentInfo(logging.ComponentGeneral, "Discovered Olric servers from LibP2P peers", - zap.Strings("servers", olricServers)) - } else { - // Fallback to localhost for local development - olricServers = []string{"localhost:3320"} - logger.ComponentInfo(logging.ComponentGeneral, "No Olric servers discovered, using localhost fallback") + // Start background Olric reconnection if initial connection failed + if deps.OlricClient == nil { + olricCfg := olric.Config{ + Servers: cfg.OlricServers, + Timeout: cfg.OlricTimeout, + } + if len(olricCfg.Servers) == 0 { + olricCfg.Servers = []string{"localhost:3320"} } - } else { - logger.ComponentInfo(logging.ComponentGeneral, "Using explicitly configured Olric servers", - zap.Strings("servers", olricServers)) - } - - olricCfg := olric.Config{ - Servers: olricServers, - Timeout: cfg.OlricTimeout, - } - olricClient, olricErr := initializeOlricClientWithRetry(olricCfg, logger) - if olricErr != nil { - logger.ComponentWarn(logging.ComponentGeneral, "failed to initialize Olric cache client; cache endpoints disabled", zap.Error(olricErr)) gw.startOlricReconnectLoop(olricCfg) - } else { - gw.setOlricClient(olricClient) - logger.ComponentInfo(logging.ComponentGeneral, "Olric cache client ready", - zap.Strings("servers", olricCfg.Servers), - zap.Duration("timeout", olricCfg.Timeout), - ) } - logger.ComponentInfo(logging.ComponentGeneral, "Initializing IPFS Cluster client...") - - // Discover IPFS endpoints from node configs if not explicitly configured - ipfsClusterURL := cfg.IPFSClusterAPIURL - ipfsAPIURL := cfg.IPFSAPIURL - ipfsTimeout := cfg.IPFSTimeout - ipfsReplicationFactor := cfg.IPFSReplicationFactor - ipfsEnableEncryption := cfg.IPFSEnableEncryption - - if ipfsClusterURL == "" { - logger.ComponentInfo(logging.ComponentGeneral, "IPFS Cluster URL not configured, discovering from node configs...") - discovered := discoverIPFSFromNodeConfigs(logger.Logger) - if discovered.clusterURL != "" { - ipfsClusterURL = discovered.clusterURL - ipfsAPIURL = discovered.apiURL - if discovered.timeout > 0 { - ipfsTimeout = discovered.timeout - } - if discovered.replicationFactor > 0 { - ipfsReplicationFactor = discovered.replicationFactor - } - ipfsEnableEncryption = discovered.enableEncryption - logger.ComponentInfo(logging.ComponentGeneral, "Discovered IPFS endpoints from node configs", - zap.String("cluster_url", ipfsClusterURL), - zap.String("api_url", ipfsAPIURL), - zap.Bool("encryption_enabled", ipfsEnableEncryption)) - } else { - // Fallback to localhost defaults - ipfsClusterURL = "http://localhost:9094" - ipfsAPIURL = "http://localhost:5001" - ipfsEnableEncryption = true // Default to true - logger.ComponentInfo(logging.ComponentGeneral, "No IPFS config found in node configs, using localhost defaults") - } - } - - if ipfsAPIURL == "" { - ipfsAPIURL = "http://localhost:5001" - } - if ipfsTimeout == 0 { - ipfsTimeout = 60 * time.Second - } - if ipfsReplicationFactor == 0 { - ipfsReplicationFactor = 3 - } - if !cfg.IPFSEnableEncryption && !ipfsEnableEncryption { - // Only disable if explicitly set to false in both places - ipfsEnableEncryption = false - } else { - // Default to true if not explicitly disabled - ipfsEnableEncryption = true - } - - ipfsCfg := ipfs.Config{ - ClusterAPIURL: ipfsClusterURL, - Timeout: ipfsTimeout, - } - ipfsClient, ipfsErr := ipfs.NewClient(ipfsCfg, logger.Logger) - if ipfsErr != nil { - logger.ComponentWarn(logging.ComponentGeneral, "failed to initialize IPFS Cluster client; storage endpoints disabled", zap.Error(ipfsErr)) - } else { - gw.ipfsClient = ipfsClient - - // Check peer count and warn if insufficient (use background context to avoid blocking) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if peerCount, err := ipfsClient.GetPeerCount(ctx); err == nil { - if peerCount < ipfsReplicationFactor { - logger.ComponentWarn(logging.ComponentGeneral, "insufficient cluster peers for replication factor", - zap.Int("peer_count", peerCount), - zap.Int("replication_factor", ipfsReplicationFactor), - zap.String("message", "Some pin operations may fail until more peers join the cluster")) - } else { - logger.ComponentInfo(logging.ComponentGeneral, "IPFS Cluster peer count sufficient", - zap.Int("peer_count", peerCount), - zap.Int("replication_factor", ipfsReplicationFactor)) - } - } else { - logger.ComponentWarn(logging.ComponentGeneral, "failed to get cluster peer count", zap.Error(err)) - } - - logger.ComponentInfo(logging.ComponentGeneral, "IPFS Cluster client ready", - zap.String("cluster_api_url", ipfsCfg.ClusterAPIURL), - zap.String("ipfs_api_url", ipfsAPIURL), - zap.Duration("timeout", ipfsCfg.Timeout), - zap.Int("replication_factor", ipfsReplicationFactor), - zap.Bool("encryption_enabled", ipfsEnableEncryption), - ) - } - // Store IPFS settings in gateway for use by handlers - gw.cfg.IPFSAPIURL = ipfsAPIURL - gw.cfg.IPFSReplicationFactor = ipfsReplicationFactor - gw.cfg.IPFSEnableEncryption = ipfsEnableEncryption - - // Initialize serverless function engine - logger.ComponentInfo(logging.ComponentGeneral, "Initializing serverless function engine...") - if gw.ormClient != nil && gw.ipfsClient != nil { - // Create serverless registry (stores functions in RQLite + IPFS) - registryCfg := serverless.RegistryConfig{ - IPFSAPIURL: ipfsAPIURL, - } - registry := serverless.NewRegistry(gw.ormClient, gw.ipfsClient, registryCfg, logger.Logger) - gw.serverlessRegistry = registry - - // Create WebSocket manager for function streaming - gw.serverlessWSMgr = serverless.NewWSManager(logger.Logger) - - // Get underlying Olric client if available - var olricClient olriclib.Client - if oc := gw.getOlricClient(); oc != nil { - olricClient = oc.UnderlyingClient() - } - - // Create host functions provider (allows functions to call Orama services) - // Get pubsub adapter from client for serverless functions - var pubsubAdapter *pubsub.ClientAdapter - if gw.client != nil { - if concreteClient, ok := gw.client.(*client.Client); ok { - pubsubAdapter = concreteClient.PubSubAdapter() - if pubsubAdapter != nil { - logger.ComponentInfo(logging.ComponentGeneral, "pubsub adapter available for serverless functions") - } else { - logger.ComponentWarn(logging.ComponentGeneral, "pubsub adapter is nil - serverless pubsub will be unavailable") - } - } - } - - hostFuncsCfg := serverless.HostFunctionsConfig{ - IPFSAPIURL: ipfsAPIURL, - HTTPTimeout: 30 * time.Second, - } - hostFuncs := serverless.NewHostFunctions( - gw.ormClient, - olricClient, - gw.ipfsClient, - pubsubAdapter, // pubsub adapter for serverless functions - gw.serverlessWSMgr, - nil, // secrets manager - TODO: implement - hostFuncsCfg, - logger.Logger, - ) - - // Create WASM engine configuration - engineCfg := serverless.DefaultConfig() - engineCfg.DefaultMemoryLimitMB = 128 - engineCfg.MaxMemoryLimitMB = 256 - engineCfg.DefaultTimeoutSeconds = 30 - engineCfg.MaxTimeoutSeconds = 60 - engineCfg.ModuleCacheSize = 100 - - // Create WASM engine - engine, engineErr := serverless.NewEngine(engineCfg, registry, hostFuncs, logger.Logger, serverless.WithInvocationLogger(registry)) - if engineErr != nil { - logger.ComponentWarn(logging.ComponentGeneral, "failed to initialize serverless engine; functions disabled", zap.Error(engineErr)) - } else { - gw.serverlessEngine = engine - - // Create invoker - gw.serverlessInvoker = serverless.NewInvoker(engine, registry, hostFuncs, logger.Logger) - - // Create trigger manager - triggerManager := serverless.NewDBTriggerManager(gw.ormClient, logger.Logger) - - // Create HTTP handlers - gw.serverlessHandlers = NewServerlessHandlers( - gw.serverlessInvoker, - registry, - gw.serverlessWSMgr, - triggerManager, - logger.Logger, - ) - - // Initialize auth service - // For now using ephemeral key, can be loaded from config later - key, _ := rsa.GenerateKey(rand.Reader, 2048) - keyPEM := pem.EncodeToMemory(&pem.Block{ - Type: "RSA PRIVATE KEY", - Bytes: x509.MarshalPKCS1PrivateKey(key), - }) - authService, err := auth.NewService(logger, c, string(keyPEM), cfg.ClientNamespace) - if err != nil { - logger.ComponentError(logging.ComponentGeneral, "failed to initialize auth service", zap.Error(err)) - } else { - gw.authService = authService - } - - logger.ComponentInfo(logging.ComponentGeneral, "Serverless function engine ready", - zap.Int("default_memory_mb", engineCfg.DefaultMemoryLimitMB), - zap.Int("default_timeout_sec", engineCfg.DefaultTimeoutSeconds), - zap.Int("module_cache_size", engineCfg.ModuleCacheSize), - ) - } - } else { - logger.ComponentWarn(logging.ComponentGeneral, "serverless engine requires RQLite and IPFS; functions disabled") - } - - // Initialize SFU manager for WebRTC calls - if err := gw.initializeSFUManager(); err != nil { - logger.ComponentWarn(logging.ComponentGeneral, "failed to initialize SFU manager; WebRTC calls disabled", zap.Error(err)) - } - - logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed, returning...") + logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed") return gw, nil } -// withInternalAuth creates a context for internal gateway operations that bypass authentication -func (g *Gateway) withInternalAuth(ctx context.Context) context.Context { - return client.WithInternalAuth(ctx) -} - -// Close disconnects the gateway client -func (g *Gateway) Close() { - // Close SFU manager first - if g.sfuManager != nil { - if err := g.sfuManager.Close(); err != nil { - g.logger.ComponentWarn(logging.ComponentGeneral, "error during SFU manager close", zap.Error(err)) - } - } - // Close serverless engine - if g.serverlessEngine != nil { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - if err := g.serverlessEngine.Close(ctx); err != nil { - g.logger.ComponentWarn(logging.ComponentGeneral, "error during serverless engine close", zap.Error(err)) - } - cancel() - } - if g.client != nil { - if err := g.client.Disconnect(); err != nil { - g.logger.ComponentWarn(logging.ComponentClient, "error during client disconnect", zap.Error(err)) - } - } - if g.sqlDB != nil { - _ = g.sqlDB.Close() - } - if client := g.getOlricClient(); client != nil { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := client.Close(ctx); err != nil { - g.logger.ComponentWarn(logging.ComponentGeneral, "error during Olric client close", zap.Error(err)) - } - } - if g.ipfsClient != nil { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := g.ipfsClient.Close(ctx); err != nil { - g.logger.ComponentWarn(logging.ComponentGeneral, "error during IPFS client close", zap.Error(err)) - } - } -} - // getLocalSubscribers returns all local subscribers for a given topic and namespace func (g *Gateway) getLocalSubscribers(topic, namespace string) []*localSubscriber { topicKey := namespace + "." + topic @@ -486,23 +201,32 @@ func (g *Gateway) getLocalSubscribers(topic, namespace string) []*localSubscribe return nil } +// setOlricClient atomically sets the Olric client and reinitializes cache handlers. func (g *Gateway) setOlricClient(client *olric.Client) { g.olricMu.Lock() defer g.olricMu.Unlock() g.olricClient = client + if client != nil { + g.cacheHandlers = cache.NewCacheHandlers(g.logger, client) + } } +// getOlricClient atomically retrieves the current Olric client. func (g *Gateway) getOlricClient() *olric.Client { g.olricMu.RLock() defer g.olricMu.RUnlock() return g.olricClient } +// startOlricReconnectLoop starts a background goroutine that continuously attempts +// to reconnect to the Olric cluster with exponential backoff. func (g *Gateway) startOlricReconnectLoop(cfg olric.Config) { go func() { retryDelay := 5 * time.Second + maxBackoff := 30 * time.Second + for { - client, err := initializeOlricClientWithRetry(cfg, g.logger) + client, err := olric.NewClient(cfg, g.logger.Logger) if err == nil { g.setOlricClient(client) g.logger.ComponentInfo(logging.ComponentGeneral, "Olric cache client connected after background retries", @@ -516,211 +240,13 @@ func (g *Gateway) startOlricReconnectLoop(cfg olric.Config) { zap.Error(err)) time.Sleep(retryDelay) - if retryDelay < olricInitMaxBackoff { + if retryDelay < maxBackoff { retryDelay *= 2 - if retryDelay > olricInitMaxBackoff { - retryDelay = olricInitMaxBackoff + if retryDelay > maxBackoff { + retryDelay = maxBackoff } } } }() } -func initializeOlricClientWithRetry(cfg olric.Config, logger *logging.ColoredLogger) (*olric.Client, error) { - backoff := olricInitInitialBackoff - - for attempt := 1; attempt <= olricInitMaxAttempts; attempt++ { - client, err := olric.NewClient(cfg, logger.Logger) - if err == nil { - if attempt > 1 { - logger.ComponentInfo(logging.ComponentGeneral, "Olric cache client initialized after retries", - zap.Int("attempts", attempt)) - } - return client, nil - } - - logger.ComponentWarn(logging.ComponentGeneral, "Olric cache client init attempt failed", - zap.Int("attempt", attempt), - zap.Duration("retry_in", backoff), - zap.Error(err)) - - if attempt == olricInitMaxAttempts { - return nil, fmt.Errorf("failed to initialize Olric cache client after %d attempts: %w", attempt, err) - } - - time.Sleep(backoff) - backoff *= 2 - if backoff > olricInitMaxBackoff { - backoff = olricInitMaxBackoff - } - } - - return nil, fmt.Errorf("failed to initialize Olric cache client") -} - -// discoverOlricServers discovers Olric server addresses from LibP2P peers -// Returns a list of IP:port addresses where Olric servers are expected to run (port 3320) -func discoverOlricServers(networkClient client.NetworkClient, logger *zap.Logger) []string { - // Get network info to access peer information - networkInfo := networkClient.Network() - if networkInfo == nil { - logger.Debug("Network info not available for Olric discovery") - return nil - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - peers, err := networkInfo.GetPeers(ctx) - if err != nil { - logger.Debug("Failed to get peers for Olric discovery", zap.Error(err)) - return nil - } - - olricServers := make([]string, 0) - seen := make(map[string]bool) - - for _, peer := range peers { - for _, addrStr := range peer.Addresses { - // Parse multiaddr - ma, err := multiaddr.NewMultiaddr(addrStr) - if err != nil { - continue - } - - // Extract IP address - var ip string - if ipv4, err := ma.ValueForProtocol(multiaddr.P_IP4); err == nil && ipv4 != "" { - ip = ipv4 - } else if ipv6, err := ma.ValueForProtocol(multiaddr.P_IP6); err == nil && ipv6 != "" { - ip = ipv6 - } else { - continue - } - - // Skip localhost loopback addresses (we'll use localhost:3320 as fallback) - if ip == "localhost" || ip == "::1" { - continue - } - - // Build Olric server address (standard port 3320) - olricAddr := net.JoinHostPort(ip, "3320") - if !seen[olricAddr] { - olricServers = append(olricServers, olricAddr) - seen[olricAddr] = true - } - } - } - - // Also check peers from config - if cfg := networkClient.Config(); cfg != nil { - for _, peerAddr := range cfg.BootstrapPeers { - ma, err := multiaddr.NewMultiaddr(peerAddr) - if err != nil { - continue - } - - var ip string - if ipv4, err := ma.ValueForProtocol(multiaddr.P_IP4); err == nil && ipv4 != "" { - ip = ipv4 - } else if ipv6, err := ma.ValueForProtocol(multiaddr.P_IP6); err == nil && ipv6 != "" { - ip = ipv6 - } else { - continue - } - - // Skip localhost - if ip == "localhost" || ip == "::1" { - continue - } - - olricAddr := net.JoinHostPort(ip, "3320") - if !seen[olricAddr] { - olricServers = append(olricServers, olricAddr) - seen[olricAddr] = true - } - } - } - - // If we found servers, log them - if len(olricServers) > 0 { - logger.Info("Discovered Olric servers from LibP2P network", - zap.Strings("servers", olricServers)) - } - - return olricServers -} - -// ipfsDiscoveryResult holds discovered IPFS configuration -type ipfsDiscoveryResult struct { - clusterURL string - apiURL string - timeout time.Duration - replicationFactor int - enableEncryption bool -} - -// discoverIPFSFromNodeConfigs discovers IPFS configuration from node.yaml files -// Checks node-1.yaml through node-5.yaml for IPFS configuration -func discoverIPFSFromNodeConfigs(logger *zap.Logger) ipfsDiscoveryResult { - homeDir, err := os.UserHomeDir() - if err != nil { - logger.Debug("Failed to get home directory for IPFS discovery", zap.Error(err)) - return ipfsDiscoveryResult{} - } - - configDir := filepath.Join(homeDir, ".orama") - - // Try all node config files for IPFS settings - configFiles := []string{"node-1.yaml", "node-2.yaml", "node-3.yaml", "node-4.yaml", "node-5.yaml"} - - for _, filename := range configFiles { - configPath := filepath.Join(configDir, filename) - data, err := os.ReadFile(configPath) - if err != nil { - continue - } - - var nodeCfg config.Config - if err := config.DecodeStrict(strings.NewReader(string(data)), &nodeCfg); err != nil { - logger.Debug("Failed to parse node config for IPFS discovery", - zap.String("file", filename), zap.Error(err)) - continue - } - - // Check if IPFS is configured - if nodeCfg.Database.IPFS.ClusterAPIURL != "" { - result := ipfsDiscoveryResult{ - clusterURL: nodeCfg.Database.IPFS.ClusterAPIURL, - apiURL: nodeCfg.Database.IPFS.APIURL, - timeout: nodeCfg.Database.IPFS.Timeout, - replicationFactor: nodeCfg.Database.IPFS.ReplicationFactor, - enableEncryption: nodeCfg.Database.IPFS.EnableEncryption, - } - - if result.apiURL == "" { - result.apiURL = "http://localhost:5001" - } - if result.timeout == 0 { - result.timeout = 60 * time.Second - } - if result.replicationFactor == 0 { - result.replicationFactor = 3 - } - // Default encryption to true if not set - if !result.enableEncryption { - result.enableEncryption = true - } - - logger.Info("Discovered IPFS config from node config", - zap.String("file", filename), - zap.String("cluster_url", result.clusterURL), - zap.String("api_url", result.apiURL), - zap.Bool("encryption_enabled", result.enableEncryption)) - - return result - } - } - - return ipfsDiscoveryResult{} -}