diff --git a/.gitignore b/.gitignore index 4f504f3..83f7e41 100644 --- a/.gitignore +++ b/.gitignore @@ -88,4 +88,6 @@ configs/ # Remote node credentials scripts/remote-nodes.conf -orama-cli-linux \ No newline at end of file +orama-cli-linux + +rnd/ \ No newline at end of file diff --git a/e2e/deployments/nextjs_ssr_test.go b/e2e/deployments/nextjs_ssr_test.go index 4cb09e1..da3ae8d 100644 --- a/e2e/deployments/nextjs_ssr_test.go +++ b/e2e/deployments/nextjs_ssr_test.go @@ -213,6 +213,11 @@ func createNextJSDeployment(t *testing.T, env *e2e.E2ETestEnv, name, tarballPath body.WriteString("Content-Disposition: form-data; name=\"name\"\r\n\r\n") body.WriteString(name + "\r\n") + // Write ssr field (enable SSR mode) + body.WriteString("--" + boundary + "\r\n") + body.WriteString("Content-Disposition: form-data; name=\"ssr\"\r\n\r\n") + body.WriteString("true\r\n") + // Write tarball file body.WriteString("--" + boundary + "\r\n") body.WriteString("Content-Disposition: form-data; name=\"tarball\"; filename=\"app.tar.gz\"\r\n") @@ -230,7 +235,9 @@ func createNextJSDeployment(t *testing.T, env *e2e.E2ETestEnv, name, tarballPath req.Header.Set("Content-Type", "multipart/form-data; boundary="+boundary) req.Header.Set("Authorization", "Bearer "+env.APIKey) - resp, err := env.HTTPClient.Do(req) + // Use a longer timeout for large Next.js uploads (can be 50MB+) + uploadClient := e2e.NewHTTPClient(5 * time.Minute) + resp, err := uploadClient.Do(req) if err != nil { t.Fatalf("failed to execute request: %v", err) } diff --git a/e2e/env.go b/e2e/env.go index 2902852..1b3492b 100644 --- a/e2e/env.go +++ b/e2e/env.go @@ -89,7 +89,13 @@ func GetGatewayURL() string { } cacheMutex.RUnlock() - // Check environment variable first + // Check environment variables first (ORAMA_GATEWAY_URL takes precedence) + if envURL := os.Getenv("ORAMA_GATEWAY_URL"); envURL != "" { + cacheMutex.Lock() + gatewayURLCache = envURL + cacheMutex.Unlock() + return envURL + } if envURL := os.Getenv("GATEWAY_URL"); envURL != "" { cacheMutex.Lock() gatewayURLCache = envURL @@ -153,7 +159,16 @@ func queryAPIKeyFromRQLite() (string, error) { return envKey, nil } - // 2. Build database path from bootstrap/node config + // 2. If ORAMA_GATEWAY_URL is set (production mode), query the remote RQLite HTTP API + if gatewayURL := os.Getenv("ORAMA_GATEWAY_URL"); gatewayURL != "" { + apiKey, err := queryAPIKeyFromRemoteRQLite(gatewayURL) + if err == nil && apiKey != "" { + return apiKey, nil + } + // Fall through to local database check if remote fails + } + + // 3. Build database path from bootstrap/node config (for local development) homeDir, err := os.UserHomeDir() if err != nil { return "", fmt.Errorf("failed to get home directory: %w", err) @@ -210,6 +225,60 @@ func queryAPIKeyFromRQLite() (string, error) { return "", fmt.Errorf("failed to retrieve API key from any SQLite database") } +// queryAPIKeyFromRemoteRQLite queries the remote RQLite HTTP API for an API key +func queryAPIKeyFromRemoteRQLite(gatewayURL string) (string, error) { + // Parse the gateway URL to extract the host + parsed, err := url.Parse(gatewayURL) + if err != nil { + return "", fmt.Errorf("failed to parse gateway URL: %w", err) + } + + // RQLite HTTP API runs on port 5001 (not the gateway port 6001) + rqliteURL := fmt.Sprintf("http://%s:5001/db/query", parsed.Hostname()) + + // Create request body + reqBody := `["SELECT key FROM api_keys LIMIT 1"]` + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, rqliteURL, strings.NewReader(reqBody)) + if err != nil { + return "", fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", fmt.Errorf("failed to query rqlite: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("rqlite returned status %d", resp.StatusCode) + } + + // Parse response + var result struct { + Results []struct { + Columns []string `json:"columns"` + Values [][]interface{} `json:"values"` + } `json:"results"` + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return "", fmt.Errorf("failed to decode response: %w", err) + } + + if len(result.Results) > 0 && len(result.Results[0].Values) > 0 && len(result.Results[0].Values[0]) > 0 { + if apiKey, ok := result.Results[0].Values[0][0].(string); ok && apiKey != "" { + return apiKey, nil + } + } + + return "", fmt.Errorf("no API key found in rqlite") +} + // GetAPIKey returns the gateway API key from rqlite or cache func GetAPIKey() string { cacheMutex.RLock() diff --git a/e2e/ipfs_cluster_test.go b/e2e/ipfs_cluster_test.go index 5d8dff1..a0d4812 100644 --- a/e2e/ipfs_cluster_test.go +++ b/e2e/ipfs_cluster_test.go @@ -13,7 +13,12 @@ import ( "github.com/DeBrosOfficial/network/pkg/ipfs" ) +// Note: These tests connect directly to IPFS Cluster API (localhost:9094) +// and IPFS API (localhost:4501). They are for local development only. +// For production testing, use storage_http_test.go which uses gateway endpoints. + func TestIPFSCluster_Health(t *testing.T) { + SkipIfProduction(t) // Direct IPFS connection not available in production ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -35,6 +40,7 @@ func TestIPFSCluster_Health(t *testing.T) { } func TestIPFSCluster_GetPeerCount(t *testing.T) { + SkipIfProduction(t) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -62,6 +68,7 @@ func TestIPFSCluster_GetPeerCount(t *testing.T) { } func TestIPFSCluster_AddFile(t *testing.T) { + SkipIfProduction(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -94,6 +101,7 @@ func TestIPFSCluster_AddFile(t *testing.T) { } func TestIPFSCluster_PinFile(t *testing.T) { + SkipIfProduction(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -131,6 +139,7 @@ func TestIPFSCluster_PinFile(t *testing.T) { } func TestIPFSCluster_PinStatus(t *testing.T) { + SkipIfProduction(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -188,6 +197,7 @@ func TestIPFSCluster_PinStatus(t *testing.T) { } func TestIPFSCluster_UnpinFile(t *testing.T) { + SkipIfProduction(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -226,6 +236,7 @@ func TestIPFSCluster_UnpinFile(t *testing.T) { } func TestIPFSCluster_GetFile(t *testing.T) { + SkipIfProduction(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -272,6 +283,7 @@ func TestIPFSCluster_GetFile(t *testing.T) { } func TestIPFSCluster_LargeFile(t *testing.T) { + SkipIfProduction(t) ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() @@ -305,6 +317,7 @@ func TestIPFSCluster_LargeFile(t *testing.T) { } func TestIPFSCluster_ReplicationFactor(t *testing.T) { + SkipIfProduction(t) // Direct IPFS connection not available in production ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -352,6 +365,7 @@ func TestIPFSCluster_ReplicationFactor(t *testing.T) { } func TestIPFSCluster_MultipleFiles(t *testing.T) { + SkipIfProduction(t) // Direct IPFS connection not available in production ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() diff --git a/pkg/cli/production/install/orchestrator.go b/pkg/cli/production/install/orchestrator.go index 6a433fe..a32ef8d 100644 --- a/pkg/cli/production/install/orchestrator.go +++ b/pkg/cli/production/install/orchestrator.go @@ -107,7 +107,10 @@ func (o *Orchestrator) Execute() error { // Phase 4: Generate configs (BEFORE service initialization) fmt.Printf("\n⚙️ Phase 4: Generating configurations...\n") - enableHTTPS := o.flags.Domain != "" + // Internal gateway always runs HTTP on port 6001 + // When using Caddy (nameserver mode), Caddy handles external HTTPS and proxies to internal gateway + // When not using Caddy, the gateway runs HTTP-only (use a reverse proxy for HTTPS) + enableHTTPS := false if err := o.setup.Phase4GenerateConfigs(o.peers, o.flags.VpsIP, enableHTTPS, o.flags.Domain, o.flags.BaseDomain, o.flags.JoinAddress); err != nil { return fmt.Errorf("configuration generation failed: %w", err) } diff --git a/pkg/environments/production/config.go b/pkg/environments/production/config.go index 714ee1c..b224111 100644 --- a/pkg/environments/production/config.go +++ b/pkg/environments/production/config.go @@ -106,18 +106,11 @@ func (cg *ConfigGenerator) GenerateNodeConfig(peerAddresses []string, vpsIP stri } // Determine advertise addresses - use vpsIP if provided - // When HTTPS is enabled, RQLite uses native TLS on port 7002 (not SNI gateway) - // This avoids conflicts between SNI gateway TLS termination and RQLite's native TLS + // Always use port 7001 for RQLite Raft (no TLS) var httpAdvAddr, raftAdvAddr string if vpsIP != "" { httpAdvAddr = net.JoinHostPort(vpsIP, "5001") - if enableHTTPS { - // Use direct IP:7002 for Raft - RQLite handles TLS natively via -node-cert - // This bypasses the SNI gateway which would cause TLS termination conflicts - raftAdvAddr = net.JoinHostPort(vpsIP, "7002") - } else { - raftAdvAddr = net.JoinHostPort(vpsIP, "7001") - } + raftAdvAddr = net.JoinHostPort(vpsIP, "7001") } else { // Fallback to localhost if no vpsIP httpAdvAddr = "localhost:5001" @@ -125,21 +118,14 @@ func (cg *ConfigGenerator) GenerateNodeConfig(peerAddresses []string, vpsIP stri } // Determine RQLite join address - // When HTTPS is enabled, use port 7002 (direct RQLite TLS) instead of 7001 (SNI gateway) + // Always use port 7001 for RQLite Raft communication (no TLS) joinPort := "7001" - if enableHTTPS { - joinPort = "7002" - } var rqliteJoinAddr string if joinAddress != "" { // Use explicitly provided join address - // Adjust port based on HTTPS mode: - // - HTTPS enabled: use port 7002 (direct RQLite TLS, bypassing SNI gateway) - // - HTTPS disabled: use port 7001 (standard RQLite Raft port) - if enableHTTPS && strings.Contains(joinAddress, ":7001") { - rqliteJoinAddr = strings.Replace(joinAddress, ":7001", ":7002", 1) - } else if !enableHTTPS && strings.Contains(joinAddress, ":7002") { + // Normalize to port 7001 (non-TLS) regardless of what was provided + if strings.Contains(joinAddress, ":7002") { rqliteJoinAddr = strings.Replace(joinAddress, ":7002", ":7001", 1) } else { rqliteJoinAddr = joinAddress @@ -166,11 +152,9 @@ func (cg *ConfigGenerator) GenerateNodeConfig(peerAddresses []string, vpsIP stri } // Unified data directory (all nodes equal) - // When HTTPS/SNI is enabled, use internal port 7002 for RQLite Raft (SNI gateway listens on 7001) + // Always use port 7001 for RQLite Raft - TLS is optional and managed separately + // The SNI gateway approach was removed to simplify certificate management raftInternalPort := 7001 - if enableHTTPS { - raftInternalPort = 7002 // Internal port when SNI is enabled - } data := templates.NodeConfigData{ NodeID: nodeID, @@ -194,15 +178,10 @@ func (cg *ConfigGenerator) GenerateNodeConfig(peerAddresses []string, vpsIP stri HTTPSPort: httpsPort, } - // When HTTPS is enabled, configure RQLite node-to-node TLS encryption - // RQLite handles TLS natively on port 7002, bypassing the SNI gateway - // This avoids TLS termination conflicts between SNI gateway and RQLite - if enableHTTPS && domain != "" { - data.NodeCert = filepath.Join(tlsCacheDir, domain+".crt") - data.NodeKey = filepath.Join(tlsCacheDir, domain+".key") - // Skip verification since nodes may have different domain certificates - data.NodeNoVerify = true - } + // RQLite node-to-node TLS encryption is disabled by default + // This simplifies certificate management - RQLite uses plain TCP for internal Raft + // HTTPS is still used for client-facing gateway traffic via autocert + // TLS can be enabled manually later if needed for inter-node encryption return templates.RenderNodeConfig(data) } diff --git a/pkg/environments/production/installers/coredns.go b/pkg/environments/production/installers/coredns.go index 85577df..10116d2 100644 --- a/pkg/environments/production/installers/coredns.go +++ b/pkg/environments/production/installers/coredns.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "net/url" "os" "os/exec" "path/filepath" @@ -314,12 +315,42 @@ func (ci *CoreDNSInstaller) generateCorefile(domain, rqliteDSN string) string { // seedStaticRecords inserts static zone records into RQLite func (ci *CoreDNSInstaller) seedStaticRecords(domain, rqliteDSN, ns1IP, ns2IP, ns3IP string) error { + // First, check if nameserver A records already exist with different IPs + // If so, we should preserve them instead of overwriting with potentially wrong IPs + existingNSIPs, err := ci.getExistingNameserverIPs(domain, rqliteDSN) + if err == nil && len(existingNSIPs) == 3 { + // Check if they have at least 2 different IPs (properly configured cluster) + uniqueIPs := make(map[string]bool) + for _, ip := range existingNSIPs { + uniqueIPs[ip] = true + } + if len(uniqueIPs) >= 2 { + // Nameserver records are already properly configured, use existing IPs + fmt.Fprintf(ci.logWriter, " Using existing nameserver IPs from database\n") + ns1IP = existingNSIPs[0] + ns2IP = existingNSIPs[1] + ns3IP = existingNSIPs[2] + } + } + // Generate serial based on current date serial := fmt.Sprintf("%d", time.Now().Unix()) // SOA record format: "mname rname serial refresh retry expire minimum" soaValue := fmt.Sprintf("ns1.%s. admin.%s. %s 3600 1800 604800 300", domain, domain, serial) + // First, delete existing system records to avoid duplicates + // We only delete system records, not deployment-created records + deleteStatements := []string{ + fmt.Sprintf(`DELETE FROM dns_records WHERE namespace = 'system' AND fqdn = '%s.' AND record_type IN ('SOA', 'NS', 'A')`, domain), + fmt.Sprintf(`DELETE FROM dns_records WHERE namespace = 'system' AND fqdn = '*.%s.' AND record_type = 'A'`, domain), + fmt.Sprintf(`DELETE FROM dns_records WHERE namespace = 'system' AND fqdn LIKE 'ns%%.%s.' AND record_type = 'A'`, domain), + } + + if err := ci.executeRQLiteStatements(rqliteDSN, deleteStatements); err != nil { + return fmt.Errorf("failed to clean up old records: %w", err) + } + // Define all static records records := []struct { fqdn string @@ -354,10 +385,9 @@ func (ci *CoreDNSInstaller) seedStaticRecords(domain, rqliteDSN, ns1IP, ns2IP, n // Build SQL statements var statements []string for _, r := range records { - // Use INSERT OR REPLACE to handle updates // IMPORTANT: Must set is_active = TRUE for CoreDNS to find the records stmt := fmt.Sprintf( - `INSERT OR REPLACE INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by, is_active, created_at, updated_at) VALUES ('%s', '%s', '%s', %d, 'system', 'system', TRUE, datetime('now'), datetime('now'))`, + `INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by, is_active, created_at, updated_at) VALUES ('%s', '%s', '%s', %d, 'system', 'system', TRUE, datetime('now'), datetime('now'))`, r.fqdn, r.recordType, r.value, r.ttl, ) statements = append(statements, stmt) @@ -367,6 +397,63 @@ func (ci *CoreDNSInstaller) seedStaticRecords(domain, rqliteDSN, ns1IP, ns2IP, n return ci.executeRQLiteStatements(rqliteDSN, statements) } +// getExistingNameserverIPs queries RQLite for existing ns1, ns2, ns3 A record IPs +func (ci *CoreDNSInstaller) getExistingNameserverIPs(domain, rqliteDSN string) ([]string, error) { + // Build query - use url.QueryEscape to properly encode the SQL + query := fmt.Sprintf("SELECT fqdn, value FROM dns_records WHERE fqdn LIKE 'ns_.%s.' AND record_type = 'A' AND is_active = TRUE ORDER BY fqdn", domain) + queryURL := fmt.Sprintf("%s/db/query?q=%s", rqliteDSN, url.QueryEscape(query)) + + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Get(queryURL) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("query failed with status %d", resp.StatusCode) + } + + var result struct { + Results []struct { + Values [][]interface{} `json:"values"` + } `json:"results"` + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, err + } + + if len(result.Results) == 0 || result.Results[0].Values == nil || len(result.Results[0].Values) < 3 { + return nil, fmt.Errorf("not enough nameserver records found") + } + + // Extract IPs for ns1, ns2, ns3 (ordered by fqdn) + ips := make([]string, 0, 3) + for _, row := range result.Results[0].Values { + if len(row) >= 2 { + if ip, ok := row[1].(string); ok { + ips = append(ips, ip) + } + } + } + + if len(ips) != 3 { + return nil, fmt.Errorf("expected 3 nameserver IPs, got %d", len(ips)) + } + + return ips, nil +} + +// rqliteResult represents the response from RQLite execute endpoint +type rqliteResult struct { + Results []struct { + Error string `json:"error,omitempty"` + RowsAffected int `json:"rows_affected,omitempty"` + LastInsertID int `json:"last_insert_id,omitempty"` + } `json:"results"` +} + // executeRQLiteStatements executes SQL statements via RQLite HTTP API func (ci *CoreDNSInstaller) executeRQLiteStatements(rqliteDSN string, statements []string) error { // RQLite execute endpoint @@ -378,6 +465,9 @@ func (ci *CoreDNSInstaller) executeRQLiteStatements(rqliteDSN string, statements return fmt.Errorf("failed to marshal statements: %w", err) } + // Log what we're sending for debugging + fmt.Fprintf(ci.logWriter, " Executing %d SQL statements...\n", len(statements)) + // Create request req, err := http.NewRequest("POST", executeURL, bytes.NewReader(body)) if err != nil { @@ -393,11 +483,39 @@ func (ci *CoreDNSInstaller) executeRQLiteStatements(rqliteDSN string, statements } defer resp.Body.Close() + // Read response body + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response: %w", err) + } + if resp.StatusCode != http.StatusOK { - respBody, _ := io.ReadAll(resp.Body) return fmt.Errorf("RQLite returned status %d: %s", resp.StatusCode, string(respBody)) } + // Parse response to check for SQL errors + var result rqliteResult + if err := json.Unmarshal(respBody, &result); err != nil { + return fmt.Errorf("failed to parse RQLite response: %w (body: %s)", err, string(respBody)) + } + + // Check each result for errors + var errors []string + successCount := 0 + for i, r := range result.Results { + if r.Error != "" { + errors = append(errors, fmt.Sprintf("statement %d: %s", i+1, r.Error)) + } else { + successCount++ + } + } + + if len(errors) > 0 { + fmt.Fprintf(ci.logWriter, " ⚠️ %d/%d statements succeeded, %d failed\n", successCount, len(statements), len(errors)) + return fmt.Errorf("SQL errors: %v", errors) + } + + fmt.Fprintf(ci.logWriter, " ✓ All %d statements executed successfully\n", successCount) return nil } diff --git a/pkg/environments/production/orchestrator.go b/pkg/environments/production/orchestrator.go index 01ec663..576e9ca 100644 --- a/pkg/environments/production/orchestrator.go +++ b/pkg/environments/production/orchestrator.go @@ -668,18 +668,32 @@ func (ps *ProductionSetup) SeedDNSRecords(baseDomain, vpsIP string, peerAddresse ps.logf("Seeding DNS records...") - // Get node IPs from peer addresses or use the VPS IP for all + // Get node IPs from peer addresses (multiaddrs) or use the VPS IP for all + // Peer addresses are multiaddrs like /ip4/1.2.3.4/tcp/4001/p2p/12D3KooW... + // We need to extract just the IP from them ns1IP := vpsIP ns2IP := vpsIP ns3IP := vpsIP - if len(peerAddresses) >= 1 && peerAddresses[0] != "" { - ns1IP = peerAddresses[0] + + // Extract IPs from multiaddrs + var extractedIPs []string + for _, peer := range peerAddresses { + if peer != "" { + if ip := extractIPFromMultiaddr(peer); ip != "" { + extractedIPs = append(extractedIPs, ip) + } + } } - if len(peerAddresses) >= 2 && peerAddresses[1] != "" { - ns2IP = peerAddresses[1] + + // Assign extracted IPs to nameservers + if len(extractedIPs) >= 1 { + ns1IP = extractedIPs[0] } - if len(peerAddresses) >= 3 && peerAddresses[2] != "" { - ns3IP = peerAddresses[2] + if len(extractedIPs) >= 2 { + ns2IP = extractedIPs[1] + } + if len(extractedIPs) >= 3 { + ns3IP = extractedIPs[2] } rqliteDSN := "http://localhost:5001" diff --git a/pkg/environments/templates/node.yaml b/pkg/environments/templates/node.yaml index 3d72faf..7a894d9 100644 --- a/pkg/environments/templates/node.yaml +++ b/pkg/environments/templates/node.yaml @@ -49,7 +49,7 @@ logging: http_gateway: enabled: true - listen_addr: "{{if .EnableHTTPS}}:{{.HTTPSPort}}{{else}}:{{.UnifiedGatewayPort}}{{end}}" + listen_addr: ":{{.UnifiedGatewayPort}}" node_name: "{{.NodeID}}" base_domain: "{{.BaseDomain}}" @@ -63,17 +63,8 @@ http_gateway: email: "admin@{{.Domain}}" {{end}} - {{if .EnableHTTPS}}sni: - enabled: true - listen_addr: ":{{.RQLiteRaftPort}}" - cert_file: "{{.TLSCacheDir}}/{{.Domain}}.crt" - key_file: "{{.TLSCacheDir}}/{{.Domain}}.key" - routes: - # Note: Raft traffic bypasses SNI gateway - RQLite uses native TLS on port 7002 - ipfs.{{.Domain}}: "localhost:4101" - ipfs-cluster.{{.Domain}}: "localhost:9098" - olric.{{.Domain}}: "localhost:3322" - {{end}} + # SNI gateway disabled - Caddy handles TLS termination for external traffic + # Internal service-to-service communication uses plain TCP # Full gateway configuration (for API, auth, pubsub, and internal service routing) client_namespace: "default" diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index e5bf5db..28e8182 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -258,6 +258,10 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { if gw.cfg.BaseDomain != "" { gw.deploymentService.SetBaseDomain(gw.cfg.BaseDomain) } + // Set node peer ID so deployments run on the node that receives the request + if gw.cfg.NodePeerID != "" { + gw.deploymentService.SetNodePeerID(gw.cfg.NodePeerID) + } // Create deployment handlers gw.staticHandler = deploymentshandlers.NewStaticDeploymentHandler( diff --git a/pkg/gateway/handlers/deployments/go_handler.go b/pkg/gateway/handlers/deployments/go_handler.go index 6f8173d..48aa772 100644 --- a/pkg/gateway/handlers/deployments/go_handler.go +++ b/pkg/gateway/handlers/deployments/go_handler.go @@ -73,6 +73,15 @@ func (h *GoHandler) HandleUpload(w http.ResponseWriter, r *http.Request) { healthCheckPath = "/health" } + // Parse environment variables (form fields starting with "env_") + envVars := make(map[string]string) + for key, values := range r.MultipartForm.Value { + if strings.HasPrefix(key, "env_") && len(values) > 0 { + envName := strings.TrimPrefix(key, "env_") + envVars[envName] = values[0] + } + } + // Get tarball file file, header, err := r.FormFile("tarball") if err != nil { @@ -99,7 +108,7 @@ func (h *GoHandler) HandleUpload(w http.ResponseWriter, r *http.Request) { cid := addResp.Cid // Deploy the Go backend - deployment, err := h.deploy(ctx, namespace, name, subdomain, cid, healthCheckPath) + deployment, err := h.deploy(ctx, namespace, name, subdomain, cid, healthCheckPath, envVars) if err != nil { h.logger.Error("Failed to deploy Go backend", zap.Error(err)) http.Error(w, err.Error(), http.StatusInternalServerError) @@ -131,7 +140,7 @@ func (h *GoHandler) HandleUpload(w http.ResponseWriter, r *http.Request) { } // deploy deploys a Go backend -func (h *GoHandler) deploy(ctx context.Context, namespace, name, subdomain, cid, healthCheckPath string) (*deployments.Deployment, error) { +func (h *GoHandler) deploy(ctx context.Context, namespace, name, subdomain, cid, healthCheckPath string, envVars map[string]string) (*deployments.Deployment, error) { // Create deployment directory deployPath := filepath.Join(h.baseDeployPath, namespace, name) if err := os.MkdirAll(deployPath, 0755); err != nil { @@ -169,7 +178,7 @@ func (h *GoHandler) deploy(ctx context.Context, namespace, name, subdomain, cid, Status: deployments.DeploymentStatusDeploying, ContentCID: cid, Subdomain: subdomain, - Environment: make(map[string]string), + Environment: envVars, MemoryLimitMB: 256, CPULimitPercent: 100, HealthCheckPath: healthCheckPath, diff --git a/pkg/gateway/handlers/deployments/service.go b/pkg/gateway/handlers/deployments/service.go index 6543b02..2366224 100644 --- a/pkg/gateway/handlers/deployments/service.go +++ b/pkg/gateway/handlers/deployments/service.go @@ -19,6 +19,7 @@ type DeploymentService struct { portAllocator *deployments.PortAllocator logger *zap.Logger baseDomain string // Base domain for deployments (e.g., "dbrs.space") + nodePeerID string // Current node's peer ID (deployments run on this node) } // NewDeploymentService creates a new deployment service @@ -44,6 +45,12 @@ func (s *DeploymentService) SetBaseDomain(domain string) { } } +// SetNodePeerID sets the current node's peer ID +// Deployments will always run on this node (no cross-node routing for deployment creation) +func (s *DeploymentService) SetNodePeerID(peerID string) { + s.nodePeerID = peerID +} + // BaseDomain returns the configured base domain func (s *DeploymentService) BaseDomain() string { if s.baseDomain == "" { @@ -69,8 +76,13 @@ func GetShortNodeID(peerID string) string { // CreateDeployment creates a new deployment func (s *DeploymentService) CreateDeployment(ctx context.Context, deployment *deployments.Deployment) error { - // Assign home node if not already assigned - if deployment.HomeNodeID == "" { + // Always use current node's peer ID for home node + // Deployments run on the node that receives the creation request + // This ensures port allocation matches where the service actually runs + if s.nodePeerID != "" { + deployment.HomeNodeID = s.nodePeerID + } else if deployment.HomeNodeID == "" { + // Fallback to home node manager if no node peer ID configured homeNodeID, err := s.homeNodeManager.AssignHomeNode(ctx, deployment.Namespace) if err != nil { return fmt.Errorf("failed to assign home node: %w", err) diff --git a/pkg/gateway/handlers/sqlite/create_handler.go b/pkg/gateway/handlers/sqlite/create_handler.go index 76b85c4..559acaa 100644 --- a/pkg/gateway/handlers/sqlite/create_handler.go +++ b/pkg/gateway/handlers/sqlite/create_handler.go @@ -54,12 +54,19 @@ func NewSQLiteHandler(db rqlite.Client, homeNodeManager *deployments.HomeNodeMan } } +// writeCreateError writes an error response as JSON for consistency +func writeCreateError(w http.ResponseWriter, status int, message string) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(map[string]string{"error": message}) +} + // CreateDatabase creates a new SQLite database for a namespace func (h *SQLiteHandler) CreateDatabase(w http.ResponseWriter, r *http.Request) { ctx := r.Context() namespace, ok := ctx.Value(ctxkeys.NamespaceOverride).(string) if !ok || namespace == "" { - http.Error(w, "Namespace not found in context", http.StatusUnauthorized) + writeCreateError(w, http.StatusUnauthorized, "Namespace not found in context") return } @@ -68,18 +75,18 @@ func (h *SQLiteHandler) CreateDatabase(w http.ResponseWriter, r *http.Request) { } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, "Invalid request body", http.StatusBadRequest) + writeCreateError(w, http.StatusBadRequest, "Invalid request body") return } if req.DatabaseName == "" { - http.Error(w, "database_name is required", http.StatusBadRequest) + writeCreateError(w, http.StatusBadRequest, "database_name is required") return } // Validate database name (alphanumeric, underscore, hyphen only) if !isValidDatabaseName(req.DatabaseName) { - http.Error(w, "Invalid database name. Use only alphanumeric characters, underscores, and hyphens", http.StatusBadRequest) + writeCreateError(w, http.StatusBadRequest, "Invalid database name. Use only alphanumeric characters, underscores, and hyphens") return } @@ -88,18 +95,26 @@ func (h *SQLiteHandler) CreateDatabase(w http.ResponseWriter, r *http.Request) { zap.String("database", req.DatabaseName), ) - // Assign home node for namespace - homeNodeID, err := h.homeNodeManager.AssignHomeNode(ctx, namespace) - if err != nil { - h.logger.Error("Failed to assign home node", zap.Error(err)) - http.Error(w, "Failed to assign home node", http.StatusInternalServerError) - return + // For SQLite databases, the home node is ALWAYS the current node + // because the database file is stored locally on this node's filesystem. + // This is different from deployments which can be load-balanced across nodes. + homeNodeID := h.currentNodeID + if homeNodeID == "" { + // Fallback: if node ID not configured, try to get from HomeNodeManager + // This provides backward compatibility for single-node setups + var err error + homeNodeID, err = h.homeNodeManager.AssignHomeNode(ctx, namespace) + if err != nil { + h.logger.Error("Failed to assign home node", zap.Error(err)) + writeCreateError(w, http.StatusInternalServerError, "Failed to assign home node") + return + } } // Check if database already exists existing, err := h.getDatabaseRecord(ctx, namespace, req.DatabaseName) if err == nil && existing != nil { - http.Error(w, "Database already exists", http.StatusConflict) + writeCreateError(w, http.StatusConflict, "Database already exists") return } @@ -110,7 +125,7 @@ func (h *SQLiteHandler) CreateDatabase(w http.ResponseWriter, r *http.Request) { // Create directory if needed if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil { h.logger.Error("Failed to create directory", zap.Error(err)) - http.Error(w, "Failed to create database directory", http.StatusInternalServerError) + writeCreateError(w, http.StatusInternalServerError, "Failed to create database directory") return } @@ -118,7 +133,7 @@ func (h *SQLiteHandler) CreateDatabase(w http.ResponseWriter, r *http.Request) { sqliteDB, err := sql.Open("sqlite3", dbPath) if err != nil { h.logger.Error("Failed to create SQLite database", zap.Error(err)) - http.Error(w, "Failed to create database", http.StatusInternalServerError) + writeCreateError(w, http.StatusInternalServerError, "Failed to create database") return } @@ -141,7 +156,7 @@ func (h *SQLiteHandler) CreateDatabase(w http.ResponseWriter, r *http.Request) { if err != nil { h.logger.Error("Failed to record database", zap.Error(err)) os.Remove(dbPath) // Cleanup - http.Error(w, "Failed to record database", http.StatusInternalServerError) + writeCreateError(w, http.StatusInternalServerError, "Failed to record database") return } diff --git a/pkg/gateway/handlers/sqlite/query_handler.go b/pkg/gateway/handlers/sqlite/query_handler.go index 248a9cd..70d9af2 100644 --- a/pkg/gateway/handlers/sqlite/query_handler.go +++ b/pkg/gateway/handlers/sqlite/query_handler.go @@ -28,35 +28,42 @@ type QueryResponse struct { Error string `json:"error,omitempty"` } +// writeJSONError writes an error response as JSON for consistency +func writeJSONError(w http.ResponseWriter, status int, message string) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(QueryResponse{Error: message}) +} + // QueryDatabase executes a SQL query on a namespace database func (h *SQLiteHandler) QueryDatabase(w http.ResponseWriter, r *http.Request) { ctx := r.Context() namespace, ok := ctx.Value(ctxkeys.NamespaceOverride).(string) if !ok || namespace == "" { - http.Error(w, "Namespace not found in context", http.StatusUnauthorized) + writeJSONError(w, http.StatusUnauthorized, "Namespace not found in context") return } var req QueryRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, "Invalid request body", http.StatusBadRequest) + writeJSONError(w, http.StatusBadRequest, "Invalid request body") return } if req.DatabaseName == "" { - http.Error(w, "database_name is required", http.StatusBadRequest) + writeJSONError(w, http.StatusBadRequest, "database_name is required") return } if req.Query == "" { - http.Error(w, "query is required", http.StatusBadRequest) + writeJSONError(w, http.StatusBadRequest, "query is required") return } // Get database metadata dbMeta, err := h.getDatabaseRecord(ctx, namespace, req.DatabaseName) if err != nil { - http.Error(w, "Database not found", http.StatusNotFound) + writeJSONError(w, http.StatusNotFound, "Database not found") return } @@ -70,7 +77,7 @@ func (h *SQLiteHandler) QueryDatabase(w http.ResponseWriter, r *http.Request) { zap.String("home_node", homeNodeID), zap.String("current_node", h.currentNodeID), ) - http.Error(w, "Database is on a different node. Use node-specific URL or wait for routing implementation.", http.StatusMisdirectedRequest) + writeJSONError(w, http.StatusMisdirectedRequest, "Database is on a different node. Use node-specific URL or wait for routing implementation.") return } @@ -83,7 +90,7 @@ func (h *SQLiteHandler) QueryDatabase(w http.ResponseWriter, r *http.Request) { zap.String("namespace", namespace), zap.String("database", req.DatabaseName), ) - http.Error(w, "Database file not found on this node", http.StatusNotFound) + writeJSONError(w, http.StatusNotFound, "Database file not found on this node") return } @@ -91,7 +98,7 @@ func (h *SQLiteHandler) QueryDatabase(w http.ResponseWriter, r *http.Request) { db, err := sql.Open("sqlite3", filePath) if err != nil { h.logger.Error("Failed to open database", zap.Error(err)) - http.Error(w, "Failed to open database", http.StatusInternalServerError) + writeJSONError(w, http.StatusInternalServerError, "Failed to open database") return } defer db.Close() diff --git a/pkg/gateway/middleware.go b/pkg/gateway/middleware.go index 8f9f3db..08bea02 100644 --- a/pkg/gateway/middleware.go +++ b/pkg/gateway/middleware.go @@ -2,7 +2,6 @@ package gateway import ( "context" - "crypto/tls" "encoding/json" "io" "net" @@ -713,9 +712,9 @@ func (g *Gateway) proxyCrossNode(w http.ResponseWriter, r *http.Request, deploym zap.String("current_node", g.nodePeerID), ) - // Proxy to home node via HTTPS - // Use the original Host header so the home node's TLS works correctly - targetURL := "https://" + homeIP + r.URL.Path + // Proxy to home node via internal HTTP port (6001) + // This is node-to-node internal communication - no TLS needed + targetURL := "http://" + homeIP + ":6001" + r.URL.Path if r.URL.RawQuery != "" { targetURL += "?" + r.URL.RawQuery } @@ -726,26 +725,19 @@ func (g *Gateway) proxyCrossNode(w http.ResponseWriter, r *http.Request, deploym return false } - // Copy headers and set Host header to original host + // Copy headers and set Host header to original domain for routing for key, values := range r.Header { for _, value := range values { proxyReq.Header.Add(key, value) } } - proxyReq.Host = r.Host // Keep original host for TLS SNI + proxyReq.Host = r.Host // Keep original host for domain routing on target node proxyReq.Header.Set("X-Forwarded-For", getClientIP(r)) proxyReq.Header.Set("X-Orama-Proxy-Node", g.nodePeerID) // Prevent loops - // Skip TLS verification since we're connecting by IP with a Host header - // The home node has the correct certificate for the domain + // Simple HTTP client for internal node-to-node communication httpClient := &http.Client{ Timeout: 30 * time.Second, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - ServerName: r.Host, // Use original host for SNI - }, - }, } resp, err := httpClient.Do(proxyReq) diff --git a/pkg/ipfs/cluster_peer.go b/pkg/ipfs/cluster_peer.go index b172b93..339ae71 100644 --- a/pkg/ipfs/cluster_peer.go +++ b/pkg/ipfs/cluster_peer.go @@ -1,7 +1,9 @@ package ipfs import ( + "encoding/json" "fmt" + "net/http" "os" "os/exec" "path/filepath" @@ -10,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" "go.uber.org/zap" ) @@ -81,13 +84,19 @@ func (cm *ClusterConfigManager) DiscoverClusterPeersFromGateway() ([]ClusterPeer return nil, nil } -// DiscoverClusterPeersFromLibP2P uses libp2p host to find other cluster peers +// DiscoverClusterPeersFromLibP2P discovers IPFS and IPFS Cluster peers by querying +// the /v1/network/status endpoint of connected libp2p peers. +// This is the correct approach since IPFS/Cluster peer IDs are different from libp2p peer IDs. func (cm *ClusterConfigManager) DiscoverClusterPeersFromLibP2P(h host.Host) error { if h == nil { return nil } var clusterPeers []string + var ipfsPeers []IPFSPeerEntry + + // Get unique IPs from connected libp2p peers + peerIPs := make(map[string]bool) for _, p := range h.Peerstore().Peers() { if p == h.ID() { continue @@ -95,20 +104,248 @@ func (cm *ClusterConfigManager) DiscoverClusterPeersFromLibP2P(h host.Host) erro info := h.Peerstore().PeerInfo(p) for _, addr := range info.Addrs { - if strings.Contains(addr.String(), "/tcp/9096") || strings.Contains(addr.String(), "/tcp/9094") { - ma := addr.Encapsulate(multiaddr.StringCast(fmt.Sprintf("/p2p/%s", p.String()))) - clusterPeers = append(clusterPeers, ma.String()) + // Extract IP from multiaddr + ip := extractIPFromMultiaddr(addr) + if ip != "" && !strings.HasPrefix(ip, "127.") && !strings.HasPrefix(ip, "::1") { + peerIPs[ip] = true } } } + if len(peerIPs) == 0 { + return nil + } + + // Query each peer's /v1/network/status endpoint to get IPFS and Cluster info + client := &http.Client{Timeout: 5 * time.Second} + for ip := range peerIPs { + statusURL := fmt.Sprintf("http://%s:6001/v1/network/status", ip) + resp, err := client.Get(statusURL) + if err != nil { + cm.logger.Debug("Failed to query peer status", zap.String("ip", ip), zap.Error(err)) + continue + } + + var status NetworkStatusResponse + if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { + resp.Body.Close() + cm.logger.Debug("Failed to decode peer status", zap.String("ip", ip), zap.Error(err)) + continue + } + resp.Body.Close() + + // Add IPFS Cluster peer if available + if status.IPFSCluster != nil && status.IPFSCluster.PeerID != "" { + for _, addr := range status.IPFSCluster.Addresses { + if strings.Contains(addr, "/tcp/9100") { + clusterPeers = append(clusterPeers, addr) + cm.logger.Info("Discovered IPFS Cluster peer", zap.String("peer", addr)) + } + } + } + + // Add IPFS peer if available + if status.IPFS != nil && status.IPFS.PeerID != "" { + for _, addr := range status.IPFS.SwarmAddresses { + if strings.Contains(addr, "/tcp/4101") && !strings.Contains(addr, "127.0.0.1") { + ipfsPeers = append(ipfsPeers, IPFSPeerEntry{ + ID: status.IPFS.PeerID, + Addrs: []string{addr}, + }) + cm.logger.Info("Discovered IPFS peer", zap.String("peer_id", status.IPFS.PeerID)) + break // One address per peer is enough + } + } + } + } + + // Update IPFS Cluster peer addresses if len(clusterPeers) > 0 { - return cm.UpdatePeerAddresses(clusterPeers) + if err := cm.UpdatePeerAddresses(clusterPeers); err != nil { + cm.logger.Warn("Failed to update cluster peer addresses", zap.Error(err)) + } else { + cm.logger.Info("Updated IPFS Cluster peer addresses", zap.Int("count", len(clusterPeers))) + } + } + + // Update IPFS Peering.Peers + if len(ipfsPeers) > 0 { + if err := cm.UpdateIPFSPeeringConfig(ipfsPeers); err != nil { + cm.logger.Warn("Failed to update IPFS peering config", zap.Error(err)) + } else { + cm.logger.Info("Updated IPFS Peering.Peers", zap.Int("count", len(ipfsPeers))) + } } return nil } +// NetworkStatusResponse represents the response from /v1/network/status +type NetworkStatusResponse struct { + PeerID string `json:"peer_id"` + PeerCount int `json:"peer_count"` + IPFS *NetworkStatusIPFS `json:"ipfs,omitempty"` + IPFSCluster *NetworkStatusIPFSCluster `json:"ipfs_cluster,omitempty"` +} + +type NetworkStatusIPFS struct { + PeerID string `json:"peer_id"` + SwarmAddresses []string `json:"swarm_addresses"` +} + +type NetworkStatusIPFSCluster struct { + PeerID string `json:"peer_id"` + Addresses []string `json:"addresses"` +} + +// IPFSPeerEntry represents an IPFS peer for Peering.Peers config +type IPFSPeerEntry struct { + ID string `json:"ID"` + Addrs []string `json:"Addrs"` +} + +// extractIPFromMultiaddr extracts the IP address from a multiaddr +func extractIPFromMultiaddr(ma multiaddr.Multiaddr) string { + if ma == nil { + return "" + } + + // Try to convert to net.Addr and extract IP + if addr, err := manet.ToNetAddr(ma); err == nil { + addrStr := addr.String() + // Handle "ip:port" format + if idx := strings.LastIndex(addrStr, ":"); idx > 0 { + return addrStr[:idx] + } + return addrStr + } + + // Fallback: parse manually + parts := strings.Split(ma.String(), "/") + for i, part := range parts { + if (part == "ip4" || part == "ip6") && i+1 < len(parts) { + return parts[i+1] + } + } + + return "" +} + +// UpdateIPFSPeeringConfig updates the Peering.Peers section in IPFS config +func (cm *ClusterConfigManager) UpdateIPFSPeeringConfig(peers []IPFSPeerEntry) error { + // Find IPFS config path + ipfsRepoPath := cm.findIPFSRepoPath() + if ipfsRepoPath == "" { + return fmt.Errorf("could not find IPFS repo path") + } + + configPath := filepath.Join(ipfsRepoPath, "config") + + // Read existing config + data, err := os.ReadFile(configPath) + if err != nil { + return fmt.Errorf("failed to read IPFS config: %w", err) + } + + var config map[string]interface{} + if err := json.Unmarshal(data, &config); err != nil { + return fmt.Errorf("failed to parse IPFS config: %w", err) + } + + // Get or create Peering section + peering, ok := config["Peering"].(map[string]interface{}) + if !ok { + peering = make(map[string]interface{}) + } + + // Get existing peers + existingPeers := []IPFSPeerEntry{} + if existingPeersList, ok := peering["Peers"].([]interface{}); ok { + for _, p := range existingPeersList { + if peerMap, ok := p.(map[string]interface{}); ok { + entry := IPFSPeerEntry{} + if id, ok := peerMap["ID"].(string); ok { + entry.ID = id + } + if addrs, ok := peerMap["Addrs"].([]interface{}); ok { + for _, a := range addrs { + if addr, ok := a.(string); ok { + entry.Addrs = append(entry.Addrs, addr) + } + } + } + if entry.ID != "" { + existingPeers = append(existingPeers, entry) + } + } + } + } + + // Merge new peers with existing (avoid duplicates by ID) + seenIDs := make(map[string]bool) + mergedPeers := []interface{}{} + + // Add existing peers first + for _, p := range existingPeers { + seenIDs[p.ID] = true + mergedPeers = append(mergedPeers, map[string]interface{}{ + "ID": p.ID, + "Addrs": p.Addrs, + }) + } + + // Add new peers + for _, p := range peers { + if !seenIDs[p.ID] { + seenIDs[p.ID] = true + mergedPeers = append(mergedPeers, map[string]interface{}{ + "ID": p.ID, + "Addrs": p.Addrs, + }) + } + } + + // Update config + peering["Peers"] = mergedPeers + config["Peering"] = peering + + // Write back + updatedData, err := json.MarshalIndent(config, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal IPFS config: %w", err) + } + + if err := os.WriteFile(configPath, updatedData, 0600); err != nil { + return fmt.Errorf("failed to write IPFS config: %w", err) + } + + return nil +} + +// findIPFSRepoPath finds the IPFS repository path +func (cm *ClusterConfigManager) findIPFSRepoPath() string { + dataDir := cm.cfg.Node.DataDir + if strings.HasPrefix(dataDir, "~") { + home, _ := os.UserHomeDir() + dataDir = filepath.Join(home, dataDir[1:]) + } + + possiblePaths := []string{ + filepath.Join(dataDir, "ipfs", "repo"), + filepath.Join(dataDir, "node-1", "ipfs", "repo"), + filepath.Join(dataDir, "node-2", "ipfs", "repo"), + filepath.Join(filepath.Dir(dataDir), "ipfs", "repo"), + } + + for _, path := range possiblePaths { + if _, err := os.Stat(filepath.Join(path, "config")); err == nil { + return path + } + } + + return "" +} + func (cm *ClusterConfigManager) getPeerID() (string, error) { dataDir := cm.cfg.Node.DataDir if strings.HasPrefix(dataDir, "~") { diff --git a/pkg/node/gateway.go b/pkg/node/gateway.go index 891808c..1612605 100644 --- a/pkg/node/gateway.go +++ b/pkg/node/gateway.go @@ -2,8 +2,6 @@ package node import ( "context" - "crypto/tls" - "fmt" "net" "net/http" "os" @@ -12,11 +10,13 @@ import ( "github.com/DeBrosOfficial/network/pkg/gateway" "github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/logging" - "golang.org/x/crypto/acme" - "golang.org/x/crypto/acme/autocert" + "go.uber.org/zap" ) // startHTTPGateway initializes and starts the full API gateway +// The gateway always runs HTTP on the configured port (default :6001). +// When running with Caddy (nameserver mode), Caddy handles external HTTPS +// and proxies requests to this internal HTTP gateway. func (n *Node) startHTTPGateway(ctx context.Context) error { if !n.config.HTTPGateway.Enabled { n.logger.ComponentInfo(logging.ComponentNode, "HTTP Gateway disabled in config") @@ -43,9 +43,6 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { IPFSClusterAPIURL: n.config.HTTPGateway.IPFSClusterAPIURL, IPFSAPIURL: n.config.HTTPGateway.IPFSAPIURL, IPFSTimeout: n.config.HTTPGateway.IPFSTimeout, - EnableHTTPS: n.config.HTTPGateway.HTTPS.Enabled, - DomainName: n.config.HTTPGateway.HTTPS.Domain, - TLSCacheDir: n.config.HTTPGateway.HTTPS.CacheDir, BaseDomain: n.config.HTTPGateway.BaseDomain, } @@ -55,135 +52,28 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { } n.apiGateway = apiGateway - var certManager *autocert.Manager - if gwCfg.EnableHTTPS && gwCfg.DomainName != "" { - tlsCacheDir := gwCfg.TLSCacheDir - if tlsCacheDir == "" { - tlsCacheDir = "/home/debros/.orama/tls-cache" - } - _ = os.MkdirAll(tlsCacheDir, 0700) - - certManager = &autocert.Manager{ - Prompt: autocert.AcceptTOS, - HostPolicy: autocert.HostWhitelist(gwCfg.DomainName), - Cache: autocert.DirCache(tlsCacheDir), - Email: fmt.Sprintf("admin@%s", gwCfg.DomainName), - Client: &acme.Client{ - DirectoryURL: "https://acme-staging-v02.api.letsencrypt.org/directory", - }, - } - n.certManager = certManager - n.certReady = make(chan struct{}) - } - - httpReady := make(chan struct{}) - go func() { - if gwCfg.EnableHTTPS && gwCfg.DomainName != "" && certManager != nil { - httpsPort := 443 - httpPort := 80 - - httpServer := &http.Server{ - Addr: fmt.Sprintf(":%d", httpPort), - Handler: certManager.HTTPHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - target := fmt.Sprintf("https://%s%s", r.Host, r.URL.RequestURI()) - http.Redirect(w, r, target, http.StatusMovedPermanently) - })), - } - - httpListener, err := net.Listen("tcp", fmt.Sprintf(":%d", httpPort)) - if err != nil { - close(httpReady) - return - } - - go httpServer.Serve(httpListener) - - // Pre-provision cert - certReq := &tls.ClientHelloInfo{ServerName: gwCfg.DomainName} - _, certErr := certManager.GetCertificate(certReq) - - if certErr != nil { - close(httpReady) - httpServer.Handler = apiGateway.Routes() - return - } - - close(httpReady) - - tlsConfig := &tls.Config{ - MinVersion: tls.VersionTLS12, - GetCertificate: certManager.GetCertificate, - } - - httpsServer := &http.Server{ - Addr: fmt.Sprintf(":%d", httpsPort), - TLSConfig: tlsConfig, - Handler: apiGateway.Routes(), - } - n.apiGatewayServer = httpsServer - - ln, err := tls.Listen("tcp", fmt.Sprintf(":%d", httpsPort), tlsConfig) - if err == nil { - httpsServer.Serve(ln) - } - } else { - close(httpReady) - server := &http.Server{ - Addr: gwCfg.ListenAddr, - Handler: apiGateway.Routes(), - } - n.apiGatewayServer = server - ln, err := net.Listen("tcp", gwCfg.ListenAddr) - if err == nil { - server.Serve(ln) - } + server := &http.Server{ + Addr: gwCfg.ListenAddr, + Handler: apiGateway.Routes(), } + n.apiGatewayServer = server + + ln, err := net.Listen("tcp", gwCfg.ListenAddr) + if err != nil { + n.logger.ComponentError(logging.ComponentNode, "Failed to bind HTTP gateway", + zap.String("addr", gwCfg.ListenAddr), zap.Error(err)) + return + } + + n.logger.ComponentInfo(logging.ComponentNode, "HTTP gateway started", + zap.String("addr", gwCfg.ListenAddr)) + server.Serve(ln) }() - // SNI Gateway - if n.config.HTTPGateway.SNI.Enabled && n.certManager != nil { - go n.startSNIGateway(ctx, httpReady) - } - return nil } -func (n *Node) startSNIGateway(ctx context.Context, httpReady <-chan struct{}) { - <-httpReady - domain := n.config.HTTPGateway.HTTPS.Domain - if domain == "" { - return - } - - certReq := &tls.ClientHelloInfo{ServerName: domain} - tlsCert, err := n.certManager.GetCertificate(certReq) - if err != nil { - return - } - - tlsCacheDir := n.config.HTTPGateway.HTTPS.CacheDir - if tlsCacheDir == "" { - tlsCacheDir = "/home/debros/.orama/tls-cache" - } - - certPath := filepath.Join(tlsCacheDir, domain+".crt") - keyPath := filepath.Join(tlsCacheDir, domain+".key") - - if err := extractPEMFromTLSCert(tlsCert, certPath, keyPath); err == nil { - if n.certReady != nil { - close(n.certReady) - } - } - - sniCfg := n.config.HTTPGateway.SNI - sniGateway, err := gateway.NewTCPSNIGateway(n.logger, &sniCfg) - if err == nil { - n.sniGateway = sniGateway - sniGateway.Start(ctx) - } -} - // startIPFSClusterConfig initializes and ensures IPFS Cluster configuration func (n *Node) startIPFSClusterConfig() error { n.logger.ComponentInfo(logging.ComponentNode, "Initializing IPFS Cluster configuration") @@ -202,4 +92,3 @@ func (n *Node) startIPFSClusterConfig() error { _ = cm.RepairPeerConfiguration() return nil } - diff --git a/pkg/node/monitoring.go b/pkg/node/monitoring.go index b63047a..5ad7772 100644 --- a/pkg/node/monitoring.go +++ b/pkg/node/monitoring.go @@ -184,16 +184,18 @@ func (n *Node) GetDiscoveryStatus() map[string]interface{} { // Unlike nodes which need extensive monitoring, clients only need basic health checks. func (n *Node) startConnectionMonitoring() { go func() { - ticker := time.NewTicker(30 * time.Second) // Less frequent than nodes (60s vs 30s) + ticker := time.NewTicker(30 * time.Second) // Ticks every 30 seconds defer ticker.Stop() var lastPeerCount int firstCheck := true + tickCount := 0 for range ticker.C { if n.host == nil { return } + tickCount++ // Get current peer count peers := n.host.Network().Peers() @@ -217,9 +219,9 @@ func (n *Node) startConnectionMonitoring() { // This discovers all cluster peers and updates peer_addresses in service.json // so IPFS Cluster can automatically connect to all discovered peers if n.clusterConfigManager != nil { - // First try to discover from LibP2P connections (works even if cluster peers aren't connected yet) - // This runs every minute to discover peers automatically via LibP2P discovery - if time.Now().Unix()%60 == 0 { + // Discover from LibP2P connections every 2 ticks (once per minute) + // Works even if cluster peers aren't connected yet + if tickCount%2 == 0 { if err := n.clusterConfigManager.DiscoverClusterPeersFromLibP2P(n.host); err != nil { n.logger.ComponentWarn(logging.ComponentNode, "Failed to discover cluster peers from LibP2P", zap.Error(err)) } else { @@ -227,9 +229,9 @@ func (n *Node) startConnectionMonitoring() { } } - // Also try to update from cluster API (works once peers are connected) - // Update all cluster peers every 2 minutes to discover new peers - if time.Now().Unix()%120 == 0 { + // Update from cluster API every 4 ticks (once per 2 minutes) + // Works once peers are already connected + if tickCount%4 == 0 { if err := n.clusterConfigManager.UpdateAllClusterPeers(); err != nil { n.logger.ComponentWarn(logging.ComponentNode, "Failed to update cluster peers during monitoring", zap.Error(err)) } else { diff --git a/pkg/node/node.go b/pkg/node/node.go index 4c07338..62d069d 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -18,7 +18,6 @@ import ( database "github.com/DeBrosOfficial/network/pkg/rqlite" "github.com/libp2p/go-libp2p/core/host" "go.uber.org/zap" - "golang.org/x/crypto/acme/autocert" ) // Node represents a network node with RQLite database @@ -44,17 +43,8 @@ type Node struct { clusterConfigManager *ipfs.ClusterConfigManager // Full gateway (for API, auth, pubsub, and internal service routing) - apiGateway *gateway.Gateway + apiGateway *gateway.Gateway apiGatewayServer *http.Server - - // SNI gateway (for TCP routing of raft, ipfs, olric, etc.) - sniGateway *gateway.TCPSNIGateway - - // Shared certificate manager for HTTPS and SNI - certManager *autocert.Manager - - // Certificate ready signal - closed when TLS certificates are extracted and ready for use - certReady chan struct{} } // NewNode creates a new network node @@ -156,13 +146,6 @@ func (n *Node) Stop() error { n.apiGateway.Close() } - // Stop SNI Gateway - if n.sniGateway != nil { - if err := n.sniGateway.Stop(); err != nil { - n.logger.ComponentWarn(logging.ComponentNode, "SNI Gateway stop error", zap.Error(err)) - } - } - // Stop cluster discovery if n.clusterDiscovery != nil { n.clusterDiscovery.Stop() diff --git a/pkg/node/rqlite.go b/pkg/node/rqlite.go index 8e5523d..359e235 100644 --- a/pkg/node/rqlite.go +++ b/pkg/node/rqlite.go @@ -5,8 +5,6 @@ import ( "fmt" database "github.com/DeBrosOfficial/network/pkg/rqlite" - "go.uber.org/zap" - "time" ) // startRQLite initializes and starts the RQLite database @@ -55,25 +53,6 @@ func (n *Node) startRQLite(ctx context.Context) error { n.logger.Info("Cluster discovery service started (waiting for RQLite)") } - // If node-to-node TLS is configured, wait for certificates to be provisioned - // This ensures RQLite can start with TLS when joining through the SNI gateway - if n.config.Database.NodeCert != "" && n.config.Database.NodeKey != "" && n.certReady != nil { - n.logger.Info("RQLite node TLS configured, waiting for certificates to be provisioned...", - zap.String("node_cert", n.config.Database.NodeCert), - zap.String("node_key", n.config.Database.NodeKey)) - - // Wait for certificate ready signal with timeout - certTimeout := 5 * time.Minute - select { - case <-n.certReady: - n.logger.Info("Certificates ready, proceeding with RQLite startup") - case <-time.After(certTimeout): - return fmt.Errorf("timeout waiting for TLS certificates after %v - ensure HTTPS is configured and ports 80/443 are accessible for ACME challenges", certTimeout) - case <-ctx.Done(): - return fmt.Errorf("context cancelled while waiting for certificates: %w", ctx.Err()) - } - } - // Start RQLite FIRST before updating metadata if err := n.rqliteManager.Start(ctx); err != nil { return err diff --git a/testdata/apps/go-backend/app b/testdata/apps/go-backend/app index 16a5dde..34e7ec0 100755 Binary files a/testdata/apps/go-backend/app and b/testdata/apps/go-backend/app differ diff --git a/testdata/apps/go-backend/main.go b/testdata/apps/go-backend/main.go index 8a75b26..31469ee 100644 --- a/testdata/apps/go-backend/main.go +++ b/testdata/apps/go-backend/main.go @@ -59,13 +59,16 @@ func executeSQL(query string, args ...interface{}) ([]map[string]interface{}, er } // Build the query with parameters + // Gateway expects: database_name, query (not sql), params reqBody := map[string]interface{}{ - "sql": query, - "params": args, + "database_name": databaseName, + "query": query, + "params": args, } bodyBytes, _ := json.Marshal(reqBody) - url := fmt.Sprintf("%s/v1/db/%s/query", gatewayURL, databaseName) + // Gateway endpoint is /v1/db/sqlite/query (not /v1/db/{name}/query) + url := fmt.Sprintf("%s/v1/db/sqlite/query", gatewayURL) req, err := http.NewRequest("POST", url, bytes.NewBuffer(bodyBytes)) if err != nil { return nil, err @@ -87,15 +90,34 @@ func executeSQL(query string, args ...interface{}) ([]map[string]interface{}, er return nil, fmt.Errorf("database error: %s", string(body)) } + // Gateway returns: columns []string, rows [][]interface{} + // We need to convert rows to []map[string]interface{} var result struct { - Rows []map[string]interface{} `json:"rows"` - Columns []string `json:"columns"` + Rows [][]interface{} `json:"rows"` + Columns []string `json:"columns"` + Error string `json:"error,omitempty"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return nil, err } - return result.Rows, nil + if result.Error != "" { + return nil, fmt.Errorf("query error: %s", result.Error) + } + + // Convert [][]interface{} to []map[string]interface{} + rows := make([]map[string]interface{}, 0, len(result.Rows)) + for _, row := range result.Rows { + rowMap := make(map[string]interface{}) + for i, col := range result.Columns { + if i < len(row) { + rowMap[col] = row[i] + } + } + rows = append(rows, rowMap) + } + + return rows, nil } // initDatabase creates the users table if it doesn't exist