From b983066016fc04a8b96733ec3ae112f29a84d6a3 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Wed, 26 Nov 2025 15:36:11 +0200 Subject: [PATCH] refactor: rename DeBros to Orama and update configuration paths - Replaced all instances of DeBros with Orama throughout the codebase, including CLI commands and configuration paths. - Updated documentation to reflect the new naming convention and paths for configuration files. - Removed the outdated PRODUCTION_INSTALL.md file and added new scripts for local domain setup and testing. - Introduced a new interactive TUI installer for Orama Network, enhancing the installation experience. - Improved logging and error handling across various components to provide clearer feedback during operations. --- CHANGELOG.md | 20 +++ Makefile | 2 +- README.md | 27 ++-- cmd/gateway/config.go | 16 +- cmd/node/main.go | 10 +- e2e/env.go | 28 ++-- pkg/cli/basic_commands.go | 7 +- pkg/cli/dev_commands.go | 34 ++--- pkg/cli/prod_commands.go | 55 ++++--- pkg/client/client.go | 40 ++--- pkg/client/connect_bootstrap.go | 12 +- pkg/client/defaults.go | 4 +- pkg/client/defaults_test.go | 6 +- pkg/client/interface.go | 2 +- pkg/config/config.go | 61 ++++---- pkg/config/paths.go | 2 +- pkg/config/validate.go | 73 ++------- pkg/config/validate_test.go | 99 +++--------- pkg/discovery/discovery.go | 4 +- pkg/discovery/rqlite_metadata.go | 2 +- pkg/environments/development/config.go | 34 +++-- pkg/environments/development/runner.go | 4 +- pkg/environments/development/topology.go | 74 ++++----- pkg/environments/production/config.go | 34 ++--- pkg/environments/production/orchestrator.go | 30 ++-- pkg/environments/templates/node.yaml | 1 - pkg/gateway/gateway.go | 14 +- pkg/gateway/http_gateway.go | 3 +- pkg/gateway/http_helpers.go | 32 ++-- pkg/gateway/https.go | 9 +- pkg/gateway/pubsub_handlers.go | 42 ++--- pkg/gateway/push_notifications.go | 1 - pkg/gateway/tcp_sni_gateway.go | 1 - pkg/ipfs/cluster.go | 149 +++++++++--------- pkg/node/monitoring.go | 12 +- pkg/node/node.go | 161 +++++++++----------- pkg/node/node_test.go | 44 +++--- pkg/rqlite/cluster_discovery.go | 29 ++-- pkg/rqlite/metrics.go | 23 ++- pkg/rqlite/rqlite.go | 26 ++-- 40 files changed, 562 insertions(+), 665 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35dba71..b16ec7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,26 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant ### Deprecated ### Fixed +## [0.69.22] - 2025-11-26 + +### Added +- Added 'Peer connection status' to the health check list in the README. + +### Changed +- Unified development environment nodes, renaming 'bootstrap', 'bootstrap2', 'node2', 'node3', 'node4' to 'node-1' through 'node-5'. +- Renamed internal configuration fields and CLI flags from 'bootstrap peers' to 'peers' for consistency across the unified node architecture. +- Updated development environment configuration files and data directories to use the unified 'node-N' naming scheme (e.g., `node-1.yaml`, `data/node-1`). +- Changed the default main gateway port in the development environment from 6001 to 6000, reserving 6001-6005 for individual node gateways. +- Removed the explicit 'node.type' configuration field (bootstrap/node) as all nodes now use a unified configuration. +- Improved RQLite cluster joining logic to prioritize joining the most up-to-date peer (highest Raft log index) instead of prioritizing 'bootstrap' nodes. + +### Deprecated + +### Removed + +### Fixed +- Fixed migration logic to correctly handle the transition from old unified data directories to the new 'node-1' structure. + ## [0.69.21] - 2025-11-26 ### Added diff --git a/Makefile b/Makefile index 2c369a7..63812d4 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill -VERSION := 0.69.21 +VERSION := 0.69.22 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/README.md b/README.md index 6972d18..54a2f2a 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ The cluster automatically performs health checks before declaring success. Check - Node unified gateway ports (6001-6005) - IPFS API endpoints - Olric cache server +- Peer connection status - Example curl commands ### Stop Development Environment @@ -46,23 +47,23 @@ After running `make dev`, test service health using these curl requests: Each node is accessible via a single unified gateway port: ```bash -# Bootstrap (port 6001) +# Node-1 (port 6001) curl http://node-1.local:6001/health curl http://node-1.local:6001/rqlite/http/db/execute -H "Content-Type: application/json" -d '{"sql":"SELECT 1"}' curl http://node-1.local:6001/cluster/health curl http://node-1.local:6001/ipfs/api/v0/version -# Bootstrap2 (port 6002) +# Node-2 (port 6002) curl http://node-2.local:6002/health curl http://node-2.local:6002/rqlite/http/db/execute -H "Content-Type: application/json" -d '{"sql":"SELECT 1"}' -# Node2 (port 6003) +# Node-3 (port 6003) curl http://node-3.local:6003/health -# Node3 (port 6004) +# Node-4 (port 6004) curl http://node-4.local:6004/health -# Node4 (port 6005) +# Node-5 (port 6005) curl http://node-5.local:6005/health ``` @@ -111,11 +112,11 @@ curl http://localhost:3320/stats ### Unified Gateway Ports ``` -Bootstrap: localhost:6001 → /rqlite/http, /rqlite/raft, /cluster, /ipfs/api -Bootstrap2: localhost:6002 → Same routes -Node2: localhost:6003 → Same routes -Node3: localhost:6004 → Same routes -Node4: localhost:6005 → Same routes +Node-1: localhost:6001 → /rqlite/http, /rqlite/raft, /cluster, /ipfs/api +Node-2: localhost:6002 → Same routes +Node-3: localhost:6003 → Same routes +Node-4: localhost:6004 → Same routes +Node-5: localhost:6005 → Same routes ``` ### Direct Service Ports (for debugging) @@ -126,7 +127,7 @@ RQLite Raft: 7001, 7002, 7003, 7004, 7005 IPFS API: 4501, 4502, 4503, 4504, 4505 IPFS Swarm: 4101, 4102, 4103, 4104, 4105 Cluster API: 9094, 9104, 9114, 9124, 9134 -Main Gateway: 6001 +Internal Gateway: 6000 Olric Cache: 3320 Anon SOCKS: 9050 ``` @@ -134,14 +135,14 @@ Anon SOCKS: 9050 ## Development Commands ```bash -# Start full cluster +# Start full cluster (5 nodes + gateway) make dev # Check service status orama dev status # View logs -orama dev logs node-1 # Node 1 logs +orama dev logs node-1 # Node-1 logs orama dev logs node-1 --follow # Follow logs in real-time orama dev logs gateway --follow # Gateway logs diff --git a/cmd/gateway/config.go b/cmd/gateway/config.go index c1bc530..639a84b 100644 --- a/cmd/gateway/config.go +++ b/cmd/gateway/config.go @@ -77,7 +77,7 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { ListenAddr string `yaml:"listen_addr"` ClientNamespace string `yaml:"client_namespace"` RQLiteDSN string `yaml:"rqlite_dsn"` - BootstrapPeers []string `yaml:"bootstrap_peers"` + Peers []string `yaml:"bootstrap_peers"` EnableHTTPS bool `yaml:"enable_https"` DomainName string `yaml:"domain_name"` TLSCacheDir string `yaml:"tls_cache_dir"` @@ -133,16 +133,16 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { if v := strings.TrimSpace(y.RQLiteDSN); v != "" { cfg.RQLiteDSN = v } - if len(y.BootstrapPeers) > 0 { - var bp []string - for _, p := range y.BootstrapPeers { + if len(y.Peers) > 0 { + var peers []string + for _, p := range y.Peers { p = strings.TrimSpace(p) if p != "" { - bp = append(bp, p) + peers = append(peers, p) } } - if len(bp) > 0 { - cfg.BootstrapPeers = bp + if len(peers) > 0 { + cfg.BootstrapPeers = peers } } @@ -205,7 +205,7 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { zap.String("path", configPath), zap.String("addr", cfg.ListenAddr), zap.String("namespace", cfg.ClientNamespace), - zap.Int("bootstrap_peer_count", len(cfg.BootstrapPeers)), + zap.Int("peer_count", len(cfg.BootstrapPeers)), ) return cfg diff --git a/cmd/node/main.go b/cmd/node/main.go index 8213127..98e2568 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -102,8 +102,8 @@ func select_data_dir_check(configName *string) { fmt.Fprintf(os.Stderr, "\n❌ Configuration Error:\n") fmt.Fprintf(os.Stderr, "Config file not found at %s\n", configPath) fmt.Fprintf(os.Stderr, "\nGenerate it with one of:\n") - fmt.Fprintf(os.Stderr, " dbn config init --type bootstrap\n") - fmt.Fprintf(os.Stderr, " dbn config init --type node --bootstrap-peers ''\n") + fmt.Fprintf(os.Stderr, " orama config init --type node\n") + fmt.Fprintf(os.Stderr, " orama config init --type node --peers ''\n") os.Exit(1) } } @@ -135,7 +135,7 @@ func startNode(ctx context.Context, cfg *config.Config, port int) error { } } - // Save the peer ID to a file for CLI access (especially useful for bootstrap) + // Save the peer ID to a file for CLI access peerID := n.GetPeerID() peerInfoFile := filepath.Join(dataDir, "peer.info") @@ -163,7 +163,7 @@ func startNode(ctx context.Context, cfg *config.Config, port int) error { logger.Error("Failed to save peer info: %v", zap.Error(err)) } else { logger.Info("Peer info saved to: %s", zap.String("path", peerInfoFile)) - logger.Info("Bootstrap multiaddr: %s", zap.String("path", peerMultiaddr)) + logger.Info("Peer multiaddr: %s", zap.String("path", peerMultiaddr)) } logger.Info("Node started successfully") @@ -316,7 +316,7 @@ func main() { zap.Strings("listen_addresses", cfg.Node.ListenAddresses), zap.Int("rqlite_http_port", cfg.Database.RQLitePort), zap.Int("rqlite_raft_port", cfg.Database.RQLiteRaftPort), - zap.Strings("bootstrap_peers", cfg.Discovery.BootstrapPeers), + zap.Strings("peers", cfg.Discovery.BootstrapPeers), zap.String("rqlite_join_address", cfg.Database.RQLiteJoinAddress), zap.String("data_directory", cfg.Node.DataDir)) diff --git a/e2e/env.go b/e2e/env.go index 390c042..e9fd8f8 100644 --- a/e2e/env.go +++ b/e2e/env.go @@ -55,7 +55,7 @@ func loadGatewayConfig() (map[string]interface{}, error) { return cfg, nil } -// loadNodeConfig loads node configuration from ~/.orama/node.yaml or bootstrap.yaml +// loadNodeConfig loads node configuration from ~/.orama/node-*.yaml func loadNodeConfig(filename string) (map[string]interface{}, error) { configPath, err := config.DefaultPath(filename) if err != nil { @@ -111,8 +111,8 @@ func GetRQLiteNodes() []string { } cacheMutex.RUnlock() - // Try bootstrap.yaml first, then all node variants - for _, cfgFile := range []string{"bootstrap.yaml", "bootstrap2.yaml", "node.yaml", "node2.yaml", "node3.yaml", "node4.yaml"} { + // Try all node config files + for _, cfgFile := range []string{"node-1.yaml", "node-2.yaml", "node-3.yaml", "node-4.yaml", "node-5.yaml"} { nodeCfg, err := loadNodeConfig(cfgFile) if err != nil { continue @@ -141,13 +141,13 @@ func queryAPIKeyFromRQLite() (string, error) { return "", fmt.Errorf("failed to get home directory: %w", err) } - // Try bootstrap first, then all nodes + // Try all node data directories dbPaths := []string{ - filepath.Join(homeDir, ".orama", "bootstrap", "rqlite", "db.sqlite"), - filepath.Join(homeDir, ".orama", "bootstrap2", "rqlite", "db.sqlite"), - filepath.Join(homeDir, ".orama", "node2", "rqlite", "db.sqlite"), - filepath.Join(homeDir, ".orama", "node3", "rqlite", "db.sqlite"), - filepath.Join(homeDir, ".orama", "node4", "rqlite", "db.sqlite"), + filepath.Join(homeDir, ".orama", "data", "node-1", "rqlite", "db.sqlite"), + filepath.Join(homeDir, ".orama", "data", "node-2", "rqlite", "db.sqlite"), + filepath.Join(homeDir, ".orama", "data", "node-3", "rqlite", "db.sqlite"), + filepath.Join(homeDir, ".orama", "data", "node-4", "rqlite", "db.sqlite"), + filepath.Join(homeDir, ".orama", "data", "node-5", "rqlite", "db.sqlite"), } for _, dbPath := range dbPaths { @@ -221,7 +221,7 @@ func GetBootstrapPeers() []string { } cacheMutex.RUnlock() - configFiles := []string{"bootstrap.yaml", "bootstrap2.yaml", "node.yaml", "node2.yaml", "node3.yaml", "node4.yaml"} + configFiles := []string{"node-1.yaml", "node-2.yaml", "node-3.yaml", "node-4.yaml", "node-5.yaml"} seen := make(map[string]struct{}) var peers []string @@ -272,7 +272,7 @@ func GetIPFSClusterURL() string { cacheMutex.RUnlock() // Try to load from node config - for _, cfgFile := range []string{"bootstrap.yaml", "bootstrap2.yaml", "node.yaml", "node2.yaml", "node3.yaml", "node4.yaml"} { + for _, cfgFile := range []string{"node-1.yaml", "node-2.yaml", "node-3.yaml", "node-4.yaml", "node-5.yaml"} { nodeCfg, err := loadNodeConfig(cfgFile) if err != nil { continue @@ -304,7 +304,7 @@ func GetIPFSAPIURL() string { cacheMutex.RUnlock() // Try to load from node config - for _, cfgFile := range []string{"bootstrap.yaml", "bootstrap2.yaml", "node.yaml", "node2.yaml", "node3.yaml", "node4.yaml"} { + for _, cfgFile := range []string{"node-1.yaml", "node-2.yaml", "node-3.yaml", "node-4.yaml", "node-5.yaml"} { nodeCfg, err := loadNodeConfig(cfgFile) if err != nil { continue @@ -329,7 +329,7 @@ func GetIPFSAPIURL() string { // GetClientNamespace returns the test client namespace from config func GetClientNamespace() string { // Try to load from node config - for _, cfgFile := range []string{"bootstrap.yaml", "bootstrap2.yaml", "node.yaml", "node2.yaml", "node3.yaml", "node4.yaml"} { + for _, cfgFile := range []string{"node-1.yaml", "node-2.yaml", "node-3.yaml", "node-4.yaml", "node-5.yaml"} { nodeCfg, err := loadNodeConfig(cfgFile) if err != nil { continue @@ -562,7 +562,7 @@ func CleanupDatabaseTable(t *testing.T, tableName string) { return } - dbPath := filepath.Join(homeDir, ".orama", "bootstrap", "rqlite", "db.sqlite") + dbPath := filepath.Join(homeDir, ".orama", "data", "node-1", "rqlite", "db.sqlite") db, err := sql.Open("sqlite3", dbPath) if err != nil { t.Logf("warning: failed to open database for cleanup: %v", err) diff --git a/pkg/cli/basic_commands.go b/pkg/cli/basic_commands.go index ade1ecf..968e657 100644 --- a/pkg/cli/basic_commands.go +++ b/pkg/cli/basic_commands.go @@ -249,14 +249,11 @@ func createClient() (client.NetworkClient, error) { gatewayURL := getGatewayURL() config.GatewayURL = gatewayURL - // Try to get bootstrap peers from active environment - // For now, we'll use the default bootstrap peers from config - // In the future, environments could specify their own bootstrap peers + // Try to get peer configuration from active environment env, err := GetActiveEnvironment() if err == nil && env != nil { // Environment loaded successfully - gateway URL already set above - // Bootstrap peers could be added to Environment struct in the future - _ = env // Use env if we add bootstrap peers to it + _ = env // Reserve for future peer configuration } // Check for existing credentials using enhanced authentication diff --git a/pkg/cli/dev_commands.go b/pkg/cli/dev_commands.go index ab5787e..2d289af 100644 --- a/pkg/cli/dev_commands.go +++ b/pkg/cli/dev_commands.go @@ -40,18 +40,18 @@ func HandleDevCommand(args []string) { func showDevHelp() { fmt.Printf("🚀 Development Environment Commands\n\n") - fmt.Printf("Usage: dbn dev [options]\n\n") + fmt.Printf("Usage: orama dev [options]\n\n") fmt.Printf("Subcommands:\n") - fmt.Printf(" up - Start development environment (2 bootstraps + 3 nodes + gateway)\n") + fmt.Printf(" up - Start development environment (5 nodes + gateway)\n") fmt.Printf(" down - Stop all development services\n") fmt.Printf(" status - Show status of running services\n") fmt.Printf(" logs - Tail logs for a component\n") fmt.Printf(" help - Show this help\n\n") fmt.Printf("Examples:\n") - fmt.Printf(" dbn dev up\n") - fmt.Printf(" dbn dev down\n") - fmt.Printf(" dbn dev status\n") - fmt.Printf(" dbn dev logs bootstrap --follow\n") + fmt.Printf(" orama dev up\n") + fmt.Printf(" orama dev down\n") + fmt.Printf(" orama dev status\n") + fmt.Printf(" orama dev logs node-1 --follow\n") } func handleDevUp(args []string) { @@ -108,18 +108,18 @@ func handleDevUp(args []string) { fmt.Printf("🎉 Development environment is running!\n\n") fmt.Printf("Key endpoints:\n") fmt.Printf(" Gateway: http://localhost:6001\n") - fmt.Printf(" Bootstrap IPFS: http://localhost:4501\n") - fmt.Printf(" Bootstrap2 IPFS: http://localhost:4511\n") - fmt.Printf(" Node2 IPFS: http://localhost:4502\n") - fmt.Printf(" Node3 IPFS: http://localhost:4503\n") - fmt.Printf(" Node4 IPFS: http://localhost:4504\n") + fmt.Printf(" Node-1 IPFS: http://localhost:4501\n") + fmt.Printf(" Node-2 IPFS: http://localhost:4502\n") + fmt.Printf(" Node-3 IPFS: http://localhost:4503\n") + fmt.Printf(" Node-4 IPFS: http://localhost:4504\n") + fmt.Printf(" Node-5 IPFS: http://localhost:4505\n") fmt.Printf(" Anon SOCKS: 127.0.0.1:9050\n") fmt.Printf(" Olric Cache: http://localhost:3320\n\n") fmt.Printf("Useful commands:\n") - fmt.Printf(" dbn dev status - Show status\n") - fmt.Printf(" dbn dev logs bootstrap - Bootstrap logs\n") - fmt.Printf(" dbn dev logs bootstrap2 - Bootstrap2 logs\n") - fmt.Printf(" dbn dev down - Stop all services\n\n") + fmt.Printf(" orama dev status - Show status\n") + fmt.Printf(" orama dev logs node-1 - Node-1 logs\n") + fmt.Printf(" orama dev logs node-2 - Node-2 logs\n") + fmt.Printf(" orama dev down - Stop all services\n\n") fmt.Printf("Logs directory: %s/logs\n\n", oramaDir) } @@ -138,7 +138,7 @@ func handleDevDown(args []string) { fmt.Fprintf(os.Stderr, "⚠️ Error stopping services: %v\n", err) os.Exit(1) } - + fmt.Printf("✅ All services have been stopped\n\n") } @@ -159,7 +159,7 @@ func handleDevStatus(args []string) { func handleDevLogs(args []string) { if len(args) == 0 { fmt.Fprintf(os.Stderr, "Usage: dbn dev logs [--follow]\n") - fmt.Fprintf(os.Stderr, "\nComponents: bootstrap, bootstrap2, node2, node3, node4, gateway, ipfs-bootstrap, ipfs-bootstrap2, ipfs-node2, ipfs-node3, ipfs-node4, olric, anon\n") + fmt.Fprintf(os.Stderr, "\nComponents: node-1, node-2, node-3, node-4, node-5, gateway, ipfs-node-1, ipfs-node-2, ipfs-node-3, ipfs-node-4, ipfs-node-5, olric, anon\n") os.Exit(1) } diff --git a/pkg/cli/prod_commands.go b/pkg/cli/prod_commands.go index 6ef5a30..5e7551e 100644 --- a/pkg/cli/prod_commands.go +++ b/pkg/cli/prod_commands.go @@ -49,8 +49,8 @@ func runInteractiveInstaller() { handleProdInstall(args) } -// normalizeBootstrapPeers normalizes and validates bootstrap peer multiaddrs -func normalizeBootstrapPeers(peersStr string) ([]string, error) { +// normalizePeers normalizes and validates peer multiaddrs +func normalizePeers(peersStr string) ([]string, error) { if peersStr == "" { return nil, nil } @@ -139,7 +139,7 @@ func showProdHelp() { fmt.Printf(" --restart - Automatically restart services after upgrade\n") fmt.Printf(" --branch BRANCH - Git branch to use (main or nightly)\n") fmt.Printf(" --no-pull - Skip git clone/pull, use existing source\n") - fmt.Printf(" migrate - Migrate from old bootstrap/node setup (requires root/sudo)\n") + fmt.Printf(" migrate - Migrate from old unified setup (requires root/sudo)\n") fmt.Printf(" Options:\n") fmt.Printf(" --dry-run - Show what would be migrated without making changes\n") fmt.Printf(" status - Show status of production services\n") @@ -204,7 +204,7 @@ func handleProdInstall(args []string) { } // Normalize and validate peers - peers, err := normalizeBootstrapPeers(*peersStr) + peers, err := normalizePeers(*peersStr) if err != nil { fmt.Fprintf(os.Stderr, "❌ Invalid peers: %v\n", err) fmt.Fprintf(os.Stderr, " Example: --peers /ip4/10.0.0.1/tcp/4001/p2p/Qm...,/ip4/10.0.0.2/tcp/4001/p2p/Qm...\n") @@ -218,12 +218,12 @@ func handleProdInstall(args []string) { } // Validate VPS IP is provided - if *vpsIP == "" { + if *vpsIP == "" { fmt.Fprintf(os.Stderr, "❌ --vps-ip is required\n") fmt.Fprintf(os.Stderr, " Usage: sudo orama install --vps-ip \n") fmt.Fprintf(os.Stderr, " Or run: sudo orama install --interactive\n") - os.Exit(1) - } + os.Exit(1) + } // Determine if this is the first node (creates new cluster) or joining existing cluster isFirstNode := len(peers) == 0 && *joinAddress == "" @@ -485,18 +485,18 @@ func handleProdUpgrade(args []string) { domain := "" // Helper function to extract multiaddr list from config - extractBootstrapPeers := func(configPath string) []string { + extractPeers := func(configPath string) []string { var peers []string if data, err := os.ReadFile(configPath); err == nil { configStr := string(data) - inBootstrapPeers := false + inPeersList := false for _, line := range strings.Split(configStr, "\n") { trimmed := strings.TrimSpace(line) - if strings.HasPrefix(trimmed, "bootstrap_peers:") || strings.HasPrefix(trimmed, "bootstrap peers:") { - inBootstrapPeers = true + if strings.HasPrefix(trimmed, "bootstrap_peers:") || strings.HasPrefix(trimmed, "peers:") { + inPeersList = true continue } - if inBootstrapPeers { + if inPeersList { if strings.HasPrefix(trimmed, "-") { // Extract multiaddr after the dash parts := strings.SplitN(trimmed, "-", 2) @@ -508,7 +508,7 @@ func handleProdUpgrade(args []string) { } } } else if trimmed == "" || !strings.HasPrefix(trimmed, "-") { - // End of bootstrap_peers list + // End of peers list break } } @@ -522,7 +522,7 @@ func handleProdUpgrade(args []string) { nodeConfigPath := filepath.Join(oramaDir, "configs", "node.yaml") // Extract peers from existing node config - peers := extractBootstrapPeers(nodeConfigPath) + peers := extractPeers(nodeConfigPath) // Extract VPS IP and join address from advertise addresses vpsIP := "" @@ -1370,7 +1370,7 @@ func handleProdUninstall() { fmt.Printf(" To remove all data: rm -rf /home/debros/.orama\n\n") } -// handleProdMigrate migrates from old bootstrap/node setup to unified node setup +// handleProdMigrate migrates from old unified setup to new unified setup func handleProdMigrate(args []string) { // Parse flags fs := flag.NewFlagSet("migrate", flag.ContinueOnError) @@ -1396,17 +1396,14 @@ func handleProdMigrate(args []string) { // Check for old-style installations oldDataDirs := []string{ - filepath.Join(oramaDir, "data", "bootstrap"), + filepath.Join(oramaDir, "data", "node-1"), filepath.Join(oramaDir, "data", "node"), } oldServices := []string{ - "debros-ipfs-bootstrap", - "debros-ipfs-node", - "debros-ipfs-cluster-bootstrap", - "debros-ipfs-cluster-node", - "debros-node-bootstrap", - "debros-node-node", + "debros-ipfs", + "debros-ipfs-cluster", + "debros-node", } oldConfigs := []string{ @@ -1466,10 +1463,10 @@ func handleProdMigrate(args []string) { newDataDir := filepath.Join(oramaDir, "data") fmt.Printf("\n Migrating data directories...\n") - // Prefer bootstrap data if it exists, otherwise use node data + // Prefer node-1 data if it exists, otherwise use node data sourceDir := "" - if _, err := os.Stat(filepath.Join(oramaDir, "data", "bootstrap")); err == nil { - sourceDir = filepath.Join(oramaDir, "data", "bootstrap") + if _, err := os.Stat(filepath.Join(oramaDir, "data", "node-1")); err == nil { + sourceDir = filepath.Join(oramaDir, "data", "node-1") } else if _, err := os.Stat(filepath.Join(oramaDir, "data", "node")); err == nil { sourceDir = filepath.Join(oramaDir, "data", "node") } @@ -1497,15 +1494,15 @@ func handleProdMigrate(args []string) { // Migrate config files fmt.Printf("\n Migrating config files...\n") - oldBootstrapConfig := filepath.Join(oramaDir, "configs", "bootstrap.yaml") + oldNodeConfig := filepath.Join(oramaDir, "configs", "bootstrap.yaml") newNodeConfig := filepath.Join(oramaDir, "configs", "node.yaml") - if _, err := os.Stat(oldBootstrapConfig); err == nil { + if _, err := os.Stat(oldNodeConfig); err == nil { if _, err := os.Stat(newNodeConfig); os.IsNotExist(err) { - if err := os.Rename(oldBootstrapConfig, newNodeConfig); err == nil { + if err := os.Rename(oldNodeConfig, newNodeConfig); err == nil { fmt.Printf(" ✓ Renamed bootstrap.yaml → node.yaml\n") } } else { - os.Remove(oldBootstrapConfig) + os.Remove(oldNodeConfig) fmt.Printf(" ✓ Removed old bootstrap.yaml (node.yaml already exists)\n") } } diff --git a/pkg/client/client.go b/pkg/client/client.go index 8a2aa45..d5ca094 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -195,49 +195,49 @@ func (c *Client) Connect() error { c.pubsub = &pubSubBridge{client: c, adapter: adapter} c.logger.Info("Pubsub bridge created successfully") - c.logger.Info("Starting bootstrap peer connections...") + c.logger.Info("Starting peer connections...") - // Connect to bootstrap peers FIRST + // Connect to peers FIRST ctx, cancel := context.WithTimeout(context.Background(), c.config.ConnectTimeout) defer cancel() - bootstrapPeersConnected := 0 - for _, bootstrapAddr := range c.config.BootstrapPeers { - c.logger.Info("Attempting to connect to bootstrap peer", zap.String("addr", bootstrapAddr)) - if err := c.connectToBootstrap(ctx, bootstrapAddr); err != nil { - c.logger.Warn("Failed to connect to bootstrap peer", - zap.String("addr", bootstrapAddr), + peersConnected := 0 + for _, peerAddr := range c.config.BootstrapPeers { + c.logger.Info("Attempting to connect to peer", zap.String("addr", peerAddr)) + if err := c.connectToPeer(ctx, peerAddr); err != nil { + c.logger.Warn("Failed to connect to peer", + zap.String("addr", peerAddr), zap.Error(err)) continue } - bootstrapPeersConnected++ - c.logger.Info("Successfully connected to bootstrap peer", zap.String("addr", bootstrapAddr)) + peersConnected++ + c.logger.Info("Successfully connected to peer", zap.String("addr", peerAddr)) } - if bootstrapPeersConnected == 0 { - c.logger.Warn("No bootstrap peers connected, continuing anyway") + if peersConnected == 0 { + c.logger.Warn("No peers connected, continuing anyway") } else { - c.logger.Info("Bootstrap peer connections completed", zap.Int("connected_count", bootstrapPeersConnected)) + c.logger.Info("Peer connections completed", zap.Int("connected_count", peersConnected)) } - c.logger.Info("Adding bootstrap peers to peerstore...") + c.logger.Info("Adding peers to peerstore...") - // Add bootstrap peers to peerstore so we can connect to them later - for _, bootstrapAddr := range c.config.BootstrapPeers { - if ma, err := multiaddr.NewMultiaddr(bootstrapAddr); err == nil { + // Add peers to peerstore so we can connect to them later + for _, peerAddr := range c.config.BootstrapPeers { + if ma, err := multiaddr.NewMultiaddr(peerAddr); err == nil { if peerInfo, err := peer.AddrInfoFromP2pAddr(ma); err == nil { c.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour*24) - c.logger.Debug("Added bootstrap peer to peerstore", + c.logger.Debug("Added peer to peerstore", zap.String("peer", peerInfo.ID.String())) } } } - c.logger.Info("Bootstrap peers added to peerstore") + c.logger.Info("Peers added to peerstore") c.logger.Info("Starting connection monitoring...") // Client is a lightweight P2P participant - no discovery needed - // We only connect to known bootstrap peers and let nodes handle discovery + // We only connect to known peers and let nodes handle discovery c.logger.Debug("Client configured as lightweight P2P participant (no discovery)") // Start minimal connection monitoring diff --git a/pkg/client/connect_bootstrap.go b/pkg/client/connect_bootstrap.go index 7307ad4..b004a60 100644 --- a/pkg/client/connect_bootstrap.go +++ b/pkg/client/connect_bootstrap.go @@ -9,8 +9,8 @@ import ( "go.uber.org/zap" ) -// connectToBootstrap connects to a bootstrap peer -func (c *Client) connectToBootstrap(ctx context.Context, addr string) error { +// connectToPeer connects to a peer address +func (c *Client) connectToPeer(ctx context.Context, addr string) error { ma, err := multiaddr.NewMultiaddr(addr) if err != nil { return fmt.Errorf("invalid multiaddr: %w", err) @@ -20,14 +20,14 @@ func (c *Client) connectToBootstrap(ctx context.Context, addr string) error { peerInfo, err := peer.AddrInfoFromP2pAddr(ma) if err != nil { // If there's no peer ID, we can't connect - c.logger.Warn("Bootstrap address missing peer ID, skipping", + c.logger.Warn("Peer address missing peer ID, skipping", zap.String("addr", addr)) return nil } - // Avoid dialing ourselves: if the bootstrap address resolves to our own peer ID, skip. + // Avoid dialing ourselves: if the peer address resolves to our own peer ID, skip. if c.host != nil && peerInfo.ID == c.host.ID() { - c.logger.Debug("Skipping bootstrap address because it resolves to self", + c.logger.Debug("Skipping peer address because it resolves to self", zap.String("addr", addr), zap.String("peer_id", peerInfo.ID.String())) return nil @@ -38,7 +38,7 @@ func (c *Client) connectToBootstrap(ctx context.Context, addr string) error { return fmt.Errorf("failed to connect to peer: %w", err) } - c.logger.Debug("Connected to bootstrap peer", + c.logger.Debug("Connected to peer", zap.String("peer_id", peerInfo.ID.String()), zap.String("addr", addr)) diff --git a/pkg/client/defaults.go b/pkg/client/defaults.go index a12cabb..567bec8 100644 --- a/pkg/client/defaults.go +++ b/pkg/client/defaults.go @@ -9,7 +9,7 @@ import ( "github.com/multiformats/go-multiaddr" ) -// DefaultBootstrapPeers returns the library's default bootstrap peer multiaddrs. +// DefaultBootstrapPeers returns the default peer multiaddrs. // These can be overridden by environment variables or config. func DefaultBootstrapPeers() []string { // Check environment variable first @@ -48,7 +48,7 @@ func DefaultDatabaseEndpoints() []string { } } - // Try to derive from bootstrap peers if available + // Try to derive from configured peers if available peers := DefaultBootstrapPeers() if len(peers) > 0 { endpoints := make([]string, 0, len(peers)) diff --git a/pkg/client/defaults_test.go b/pkg/client/defaults_test.go index 82cdbc2..cbc7561 100644 --- a/pkg/client/defaults_test.go +++ b/pkg/client/defaults_test.go @@ -10,15 +10,15 @@ import ( func TestDefaultBootstrapPeersNonEmpty(t *testing.T) { old := os.Getenv("DEBROS_BOOTSTRAP_PEERS") t.Cleanup(func() { os.Setenv("DEBROS_BOOTSTRAP_PEERS", old) }) - // Set a valid bootstrap peer + // Set a valid peer validPeer := "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj" _ = os.Setenv("DEBROS_BOOTSTRAP_PEERS", validPeer) peers := DefaultBootstrapPeers() if len(peers) == 0 { - t.Fatalf("expected non-empty default bootstrap peers") + t.Fatalf("expected non-empty default peers") } if peers[0] != validPeer { - t.Fatalf("expected bootstrap peer %s, got %s", validPeer, peers[0]) + t.Fatalf("expected peer %s, got %s", validPeer, peers[0]) } } diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 31cdd9c..bae966e 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -158,7 +158,7 @@ type StorageStatus struct { type ClientConfig struct { AppName string `json:"app_name"` DatabaseName string `json:"database_name"` - BootstrapPeers []string `json:"bootstrap_peers"` + BootstrapPeers []string `json:"peers"` DatabaseEndpoints []string `json:"database_endpoints"` GatewayURL string `json:"gateway_url"` // Gateway URL for HTTP API access (e.g., "http://localhost:6001") ConnectTimeout time.Duration `json:"connect_timeout"` diff --git a/pkg/config/config.go b/pkg/config/config.go index 41b0c91..3b1add3 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -8,21 +8,21 @@ import ( // Config represents the main configuration for a network node type Config struct { - Node NodeConfig `yaml:"node"` - Database DatabaseConfig `yaml:"database"` - Discovery DiscoveryConfig `yaml:"discovery"` - Security SecurityConfig `yaml:"security"` - Logging LoggingConfig `yaml:"logging"` - HTTPGateway HTTPGatewayConfig `yaml:"http_gateway"` + Node NodeConfig `yaml:"node"` + Database DatabaseConfig `yaml:"database"` + Discovery DiscoveryConfig `yaml:"discovery"` + Security SecurityConfig `yaml:"security"` + Logging LoggingConfig `yaml:"logging"` + HTTPGateway HTTPGatewayConfig `yaml:"http_gateway"` } // NodeConfig contains node-specific configuration type NodeConfig struct { ID string `yaml:"id"` // Auto-generated if empty - Type string `yaml:"type"` // "bootstrap" or "node" ListenAddresses []string `yaml:"listen_addresses"` // LibP2P listen addresses DataDir string `yaml:"data_dir"` // Data directory MaxConnections int `yaml:"max_connections"` // Maximum peer connections + Domain string `yaml:"domain"` // Domain for this node (e.g., node-1.orama.network) } // DatabaseConfig contains database-related configuration @@ -76,9 +76,9 @@ type IPFSConfig struct { // DiscoveryConfig contains peer discovery configuration type DiscoveryConfig struct { - BootstrapPeers []string `yaml:"bootstrap_peers"` // Bootstrap peer addresses + BootstrapPeers []string `yaml:"bootstrap_peers"` // Peer addresses to connect to DiscoveryInterval time.Duration `yaml:"discovery_interval"` // Discovery announcement interval - BootstrapPort int `yaml:"bootstrap_port"` // Default port for bootstrap nodes + BootstrapPort int `yaml:"bootstrap_port"` // Default port for peer discovery HttpAdvAddress string `yaml:"http_adv_address"` // HTTP advertisement address RaftAdvAddress string `yaml:"raft_adv_address"` // Raft advertisement NodeNamespace string `yaml:"node_namespace"` // Namespace for node identifiers @@ -100,34 +100,34 @@ type LoggingConfig struct { // HTTPGatewayConfig contains HTTP reverse proxy gateway configuration type HTTPGatewayConfig struct { - Enabled bool `yaml:"enabled"` // Enable HTTP gateway - ListenAddr string `yaml:"listen_addr"` // Address to listen on (e.g., ":8080") - NodeName string `yaml:"node_name"` // Node name for routing - Routes map[string]RouteConfig `yaml:"routes"` // Service routes - HTTPS HTTPSConfig `yaml:"https"` // HTTPS/TLS configuration - SNI SNIConfig `yaml:"sni"` // SNI-based TCP routing configuration + Enabled bool `yaml:"enabled"` // Enable HTTP gateway + ListenAddr string `yaml:"listen_addr"` // Address to listen on (e.g., ":8080") + NodeName string `yaml:"node_name"` // Node name for routing + Routes map[string]RouteConfig `yaml:"routes"` // Service routes + HTTPS HTTPSConfig `yaml:"https"` // HTTPS/TLS configuration + SNI SNIConfig `yaml:"sni"` // SNI-based TCP routing configuration } // HTTPSConfig contains HTTPS/TLS configuration for the gateway type HTTPSConfig struct { - Enabled bool `yaml:"enabled"` // Enable HTTPS (port 443) - Domain string `yaml:"domain"` // Primary domain (e.g., node-123.orama.network) - AutoCert bool `yaml:"auto_cert"` // Use Let's Encrypt for automatic certificate - CertFile string `yaml:"cert_file"` // Path to certificate file (if not using auto_cert) - KeyFile string `yaml:"key_file"` // Path to key file (if not using auto_cert) - CacheDir string `yaml:"cache_dir"` // Directory for Let's Encrypt certificate cache - HTTPPort int `yaml:"http_port"` // HTTP port for ACME challenge (default: 80) - HTTPSPort int `yaml:"https_port"` // HTTPS port (default: 443) - Email string `yaml:"email"` // Email for Let's Encrypt account + Enabled bool `yaml:"enabled"` // Enable HTTPS (port 443) + Domain string `yaml:"domain"` // Primary domain (e.g., node-123.orama.network) + AutoCert bool `yaml:"auto_cert"` // Use Let's Encrypt for automatic certificate + CertFile string `yaml:"cert_file"` // Path to certificate file (if not using auto_cert) + KeyFile string `yaml:"key_file"` // Path to key file (if not using auto_cert) + CacheDir string `yaml:"cache_dir"` // Directory for Let's Encrypt certificate cache + HTTPPort int `yaml:"http_port"` // HTTP port for ACME challenge (default: 80) + HTTPSPort int `yaml:"https_port"` // HTTPS port (default: 443) + Email string `yaml:"email"` // Email for Let's Encrypt account } // SNIConfig contains SNI-based TCP routing configuration for port 7001 type SNIConfig struct { - Enabled bool `yaml:"enabled"` // Enable SNI-based TCP routing - ListenAddr string `yaml:"listen_addr"` // Address to listen on (e.g., ":7001") - Routes map[string]string `yaml:"routes"` // SNI hostname -> backend address mapping - CertFile string `yaml:"cert_file"` // Path to certificate file - KeyFile string `yaml:"key_file"` // Path to key file + Enabled bool `yaml:"enabled"` // Enable SNI-based TCP routing + ListenAddr string `yaml:"listen_addr"` // Address to listen on (e.g., ":7001") + Routes map[string]string `yaml:"routes"` // SNI hostname -> backend address mapping + CertFile string `yaml:"cert_file"` // Path to certificate file + KeyFile string `yaml:"key_file"` // Path to key file } // RouteConfig defines a single reverse proxy route @@ -164,7 +164,6 @@ func (c *Config) ParseMultiaddrs() ([]multiaddr.Multiaddr, error) { func DefaultConfig() *Config { return &Config{ Node: NodeConfig{ - Type: "node", ListenAddresses: []string{ "/ip4/0.0.0.0/tcp/4001", // TCP only - compatible with Anyone proxy/SOCKS5 }, @@ -181,7 +180,7 @@ func DefaultConfig() *Config { // RQLite-specific configuration RQLitePort: 5001, RQLiteRaftPort: 7001, - RQLiteJoinAddress: "", // Empty for bootstrap node + RQLiteJoinAddress: "", // Empty for first node (creates cluster) // Dynamic discovery (always enabled) ClusterSyncInterval: 30 * time.Second, diff --git a/pkg/config/paths.go b/pkg/config/paths.go index f973f00..4335c77 100644 --- a/pkg/config/paths.go +++ b/pkg/config/paths.go @@ -28,7 +28,7 @@ func EnsureConfigDir() (string, error) { } // DefaultPath returns the path to the config file for the given component name. -// component should be e.g., "node.yaml", "bootstrap.yaml", "gateway.yaml" +// component should be e.g., "node.yaml", "gateway.yaml" // It checks ~/.orama/data/, ~/.orama/configs/, and ~/.orama/ for backward compatibility. // If component is already an absolute path, it returns it as-is. func DefaultPath(component string) (string, error) { diff --git a/pkg/config/validate.go b/pkg/config/validate.go index d8c33e6..3f3c802 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -15,7 +15,7 @@ import ( // ValidationError represents a single validation error with context. type ValidationError struct { - Path string // e.g., "discovery.bootstrap_peers[0]" + Path string // e.g., "discovery.bootstrap_peers[0]" or "discovery.peers[0]" Message string // e.g., "invalid multiaddr" Hint string // e.g., "expected /ip{4,6}/.../tcp//p2p/" } @@ -61,14 +61,6 @@ func (c *Config) validateNode() []error { }) } - // Validate type - if nc.Type != "bootstrap" && nc.Type != "node" { - errs = append(errs, ValidationError{ - Path: "node.type", - Message: fmt.Sprintf("must be one of [bootstrap node]; got %q", nc.Type), - }) - } - // Validate listen_addresses if len(nc.ListenAddresses) == 0 { errs = append(errs, ValidationError{ @@ -218,33 +210,15 @@ func (c *Config) validateDatabase() []error { }) } - // Validate rqlite_join_address context-dependently - if c.Node.Type == "node" { - if dc.RQLiteJoinAddress == "" { + // Validate rqlite_join_address format if provided (optional for all nodes) + // The first node in a cluster won't have a join address; subsequent nodes will + if dc.RQLiteJoinAddress != "" { + if err := validateHostPort(dc.RQLiteJoinAddress); err != nil { errs = append(errs, ValidationError{ Path: "database.rqlite_join_address", - Message: "required for node type (non-bootstrap)", + Message: err.Error(), + Hint: "expected format: host:port", }) - } else { - if err := validateHostPort(dc.RQLiteJoinAddress); err != nil { - errs = append(errs, ValidationError{ - Path: "database.rqlite_join_address", - Message: err.Error(), - Hint: "expected format: host:port", - }) - } - } - } else if c.Node.Type == "bootstrap" { - // Bootstrap nodes can optionally join another bootstrap's RQLite cluster - // This allows secondary bootstraps to synchronize with the primary - if dc.RQLiteJoinAddress != "" { - if err := validateHostPort(dc.RQLiteJoinAddress); err != nil { - errs = append(errs, ValidationError{ - Path: "database.rqlite_join_address", - Message: err.Error(), - Hint: "expected format: host:port", - }) - } } } @@ -297,7 +271,7 @@ func (c *Config) validateDiscovery() []error { }) } - // Validate bootstrap_port + // Validate peer discovery port if disc.BootstrapPort < 1 || disc.BootstrapPort > 65535 { errs = append(errs, ValidationError{ Path: "discovery.bootstrap_port", @@ -305,17 +279,10 @@ func (c *Config) validateDiscovery() []error { }) } - // Validate bootstrap_peers context-dependently - if c.Node.Type == "node" { - if len(disc.BootstrapPeers) == 0 { - errs = append(errs, ValidationError{ - Path: "discovery.bootstrap_peers", - Message: "required for node type (must not be empty)", - }) - } - } + // Validate peer addresses (optional - can be empty for first node) + // All nodes are unified, so peer addresses are optional - // Validate each bootstrap peer multiaddr + // Validate each peer multiaddr seenPeers := make(map[string]bool) for i, peer := range disc.BootstrapPeers { path := fmt.Sprintf("discovery.bootstrap_peers[%d]", i) @@ -363,7 +330,7 @@ func (c *Config) validateDiscovery() []error { if seenPeers[peer] { errs = append(errs, ValidationError{ Path: path, - Message: "duplicate bootstrap peer", + Message: "duplicate peer", }) } seenPeers[peer] = true @@ -486,22 +453,6 @@ func (c *Config) validateLogging() []error { func (c *Config) validateCrossFields() []error { var errs []error - - // If node.type is invalid, don't run cross-checks - if c.Node.Type != "bootstrap" && c.Node.Type != "node" { - return errs - } - - // Cross-check rqlite_join_address vs node type - // Note: Bootstrap nodes can optionally join another bootstrap's cluster - - if c.Node.Type == "node" && c.Database.RQLiteJoinAddress == "" { - errs = append(errs, ValidationError{ - Path: "database.rqlite_join_address", - Message: "required for non-bootstrap node type", - }) - } - return errs } diff --git a/pkg/config/validate_test.go b/pkg/config/validate_test.go index 38b989f..46f0c22 100644 --- a/pkg/config/validate_test.go +++ b/pkg/config/validate_test.go @@ -5,12 +5,11 @@ import ( "time" ) -// validConfigForType returns a valid config for the given node type -func validConfigForType(nodeType string) *Config { +// validConfigForNode returns a valid config +func validConfigForNode() *Config { validPeer := "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj" cfg := &Config{ Node: NodeConfig{ - Type: nodeType, ID: "test-node-id", ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4001"}, DataDir: ".", @@ -25,6 +24,7 @@ func validConfigForType(nodeType string) *Config { RQLitePort: 5001, RQLiteRaftPort: 7001, MinClusterSize: 1, + RQLiteJoinAddress: "", // Optional - first node creates cluster, others join }, Discovery: DiscoveryConfig{ BootstrapPeers: []string{validPeer}, @@ -40,51 +40,9 @@ func validConfigForType(nodeType string) *Config { }, } - // Set rqlite_join_address based on node type - if nodeType == "node" { - cfg.Database.RQLiteJoinAddress = "localhost:5001" - // Node type requires bootstrap peers - cfg.Discovery.BootstrapPeers = []string{validPeer} - } else { - // Bootstrap type: empty join address and peers optional - cfg.Database.RQLiteJoinAddress = "" - cfg.Discovery.BootstrapPeers = []string{} - } - return cfg } -func TestValidateNodeType(t *testing.T) { - tests := []struct { - name string - nodeType string - shouldError bool - }{ - {"bootstrap", "bootstrap", false}, - {"node", "node", false}, - {"invalid", "invalid-type", true}, - {"empty", "", true}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cfg := validConfigForType("bootstrap") // Start with valid bootstrap - if tt.nodeType == "node" { - cfg = validConfigForType("node") - } else { - cfg.Node.Type = tt.nodeType - } - errs := cfg.Validate() - if tt.shouldError && len(errs) == 0 { - t.Errorf("expected error, got none") - } - if !tt.shouldError && len(errs) > 0 { - t.Errorf("unexpected errors: %v", errs) - } - }) - } -} - func TestValidateListenAddresses(t *testing.T) { tests := []struct { name string @@ -102,7 +60,7 @@ func TestValidateListenAddresses(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := validConfigForType("node") + cfg := validConfigForNode() cfg.Node.ListenAddresses = tt.addresses errs := cfg.Validate() if tt.shouldError && len(errs) == 0 { @@ -130,7 +88,7 @@ func TestValidateReplicationFactor(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := validConfigForType("node") + cfg := validConfigForNode() cfg.Database.ReplicationFactor = tt.replication errs := cfg.Validate() if tt.shouldError && len(errs) == 0 { @@ -160,7 +118,7 @@ func TestValidateRQLitePorts(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := validConfigForType("node") + cfg := validConfigForNode() cfg.Database.RQLitePort = tt.httpPort cfg.Database.RQLiteRaftPort = tt.raftPort errs := cfg.Validate() @@ -177,21 +135,18 @@ func TestValidateRQLitePorts(t *testing.T) { func TestValidateRQLiteJoinAddress(t *testing.T) { tests := []struct { name string - nodeType string joinAddr string shouldError bool }{ - {"node with join", "node", "localhost:5001", false}, - {"node without join", "node", "", true}, - {"bootstrap with join", "bootstrap", "localhost:5001", false}, - {"bootstrap without join", "bootstrap", "", false}, - {"invalid join format", "node", "localhost", true}, - {"invalid join port", "node", "localhost:99999", true}, + {"node with join", "localhost:5001", false}, + {"node without join", "", false}, // Join address is optional (first node creates cluster) + {"invalid join format", "localhost", true}, + {"invalid join port", "localhost:99999", true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := validConfigForType(tt.nodeType) + cfg := validConfigForNode() cfg.Database.RQLiteJoinAddress = tt.joinAddr errs := cfg.Validate() if tt.shouldError && len(errs) == 0 { @@ -204,27 +159,24 @@ func TestValidateRQLiteJoinAddress(t *testing.T) { } } -func TestValidateBootstrapPeers(t *testing.T) { +func TestValidatePeerAddresses(t *testing.T) { validPeer := "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj" tests := []struct { name string - nodeType string peers []string shouldError bool }{ - {"node with peer", "node", []string{validPeer}, false}, - {"node without peer", "node", []string{}, true}, - {"bootstrap with peer", "bootstrap", []string{validPeer}, false}, - {"bootstrap without peer", "bootstrap", []string{}, false}, - {"invalid multiaddr", "node", []string{"invalid"}, true}, - {"missing p2p", "node", []string{"/ip4/127.0.0.1/tcp/4001"}, true}, - {"duplicate peer", "node", []string{validPeer, validPeer}, true}, - {"invalid port", "node", []string{"/ip4/127.0.0.1/tcp/99999/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, true}, + {"node with peer", []string{validPeer}, false}, + {"node without peer", []string{}, true}, // All nodes need peer addresses + {"invalid multiaddr", []string{"invalid"}, true}, + {"missing p2p", []string{"/ip4/127.0.0.1/tcp/4001"}, true}, + {"duplicate peer", []string{validPeer, validPeer}, true}, + {"invalid port", []string{"/ip4/127.0.0.1/tcp/99999/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := validConfigForType(tt.nodeType) + cfg := validConfigForNode() cfg.Discovery.BootstrapPeers = tt.peers errs := cfg.Validate() if tt.shouldError && len(errs) == 0 { @@ -253,7 +205,7 @@ func TestValidateLoggingLevel(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := validConfigForType("node") + cfg := validConfigForNode() cfg.Logging.Level = tt.level errs := cfg.Validate() if tt.shouldError && len(errs) == 0 { @@ -280,7 +232,7 @@ func TestValidateLoggingFormat(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := validConfigForType("node") + cfg := validConfigForNode() cfg.Logging.Format = tt.format errs := cfg.Validate() if tt.shouldError && len(errs) == 0 { @@ -307,7 +259,7 @@ func TestValidateMaxConnections(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := validConfigForType("node") + cfg := validConfigForNode() cfg.Node.MaxConnections = tt.maxConn errs := cfg.Validate() if tt.shouldError && len(errs) == 0 { @@ -334,7 +286,7 @@ func TestValidateDiscoveryInterval(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := validConfigForType("node") + cfg := validConfigForNode() cfg.Discovery.DiscoveryInterval = tt.interval errs := cfg.Validate() if tt.shouldError && len(errs) == 0 { @@ -347,7 +299,7 @@ func TestValidateDiscoveryInterval(t *testing.T) { } } -func TestValidateBootstrapPort(t *testing.T) { +func TestValidatePeerDiscoveryPort(t *testing.T) { tests := []struct { name string port int @@ -361,7 +313,7 @@ func TestValidateBootstrapPort(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := validConfigForType("node") + cfg := validConfigForNode() cfg.Discovery.BootstrapPort = tt.port errs := cfg.Validate() if tt.shouldError && len(errs) == 0 { @@ -378,7 +330,6 @@ func TestValidateCompleteConfig(t *testing.T) { // Test a complete valid config validCfg := &Config{ Node: NodeConfig{ - Type: "node", ID: "node1", ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4002"}, DataDir: ".", diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 56d1456..0d6be27 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -214,7 +214,7 @@ func (d *Manager) Stop() { } // discoverPeers discovers and connects to new peers using non-DHT strategies: -// - Peerstore entries (bootstrap peers added to peerstore by the caller) +// - Peerstore entries (peers added to peerstore by the caller) // - Peer exchange: query currently connected peers' peerstore entries func (d *Manager) discoverPeers(ctx context.Context, config Config) { connectedPeers := d.host.Network().Peers() @@ -242,7 +242,7 @@ func (d *Manager) discoverPeers(ctx context.Context, config Config) { } // discoverViaPeerstore attempts to connect to peers found in the host's peerstore. -// This is useful for bootstrap peers that have been pre-populated into the peerstore. +// This is useful for peers that have been pre-populated into the peerstore. func (d *Manager) discoverViaPeerstore(ctx context.Context, maxConnections int) int { if maxConnections <= 0 { return 0 diff --git a/pkg/discovery/rqlite_metadata.go b/pkg/discovery/rqlite_metadata.go index 65a7048..e70d263 100644 --- a/pkg/discovery/rqlite_metadata.go +++ b/pkg/discovery/rqlite_metadata.go @@ -9,7 +9,7 @@ type RQLiteNodeMetadata struct { NodeID string `json:"node_id"` // RQLite node ID (from config) RaftAddress string `json:"raft_address"` // Raft port address (e.g., "51.83.128.181:7001") HTTPAddress string `json:"http_address"` // HTTP API address (e.g., "51.83.128.181:5001") - NodeType string `json:"node_type"` // "bootstrap" or "node" + NodeType string `json:"node_type"` // Node type identifier RaftLogIndex uint64 `json:"raft_log_index"` // Current Raft log index (for data comparison) LastSeen time.Time `json:"last_seen"` // Updated on every announcement ClusterVersion string `json:"cluster_version"` // For compatibility checking diff --git a/pkg/environments/development/config.go b/pkg/environments/development/config.go index 9fed94f..2ece6d4 100644 --- a/pkg/environments/development/config.go +++ b/pkg/environments/development/config.go @@ -43,25 +43,27 @@ func (ce *ConfigEnsurer) EnsureAll() error { // Load topology topology := DefaultTopology() - // Generate identities for all bootstrap nodes and collect multiaddrs - bootstrapAddrs := []string{} - for _, nodeSpec := range topology.GetBootstrapNodes() { + // Generate identities for first two nodes and collect their multiaddrs as peer addresses + // All nodes use these addresses for initial peer discovery + peerAddrs := []string{} + for i := 0; i < 2 && i < len(topology.Nodes); i++ { + nodeSpec := topology.Nodes[i] addr, err := ce.ensureNodeIdentity(nodeSpec) if err != nil { return fmt.Errorf("failed to ensure identity for %s: %w", nodeSpec.Name, err) } - bootstrapAddrs = append(bootstrapAddrs, addr) + peerAddrs = append(peerAddrs, addr) } - // Ensure configs for all bootstrap and regular nodes + // Ensure configs for all nodes for _, nodeSpec := range topology.Nodes { - if err := ce.ensureNodeConfig(nodeSpec, bootstrapAddrs); err != nil { + if err := ce.ensureNodeConfig(nodeSpec, peerAddrs); err != nil { return fmt.Errorf("failed to ensure config for %s: %w", nodeSpec.Name, err) } } // Ensure gateway config - if err := ce.ensureGateway(bootstrapAddrs); err != nil { + if err := ce.ensureGateway(peerAddrs); err != nil { return fmt.Errorf("failed to ensure gateway: %w", err) } @@ -133,7 +135,7 @@ func (ce *ConfigEnsurer) ensureNodeIdentity(nodeSpec NodeSpec) (string, error) { } // ensureNodeConfig creates or updates a node configuration -func (ce *ConfigEnsurer) ensureNodeConfig(nodeSpec NodeSpec, bootstrapAddrs []string) error { +func (ce *ConfigEnsurer) ensureNodeConfig(nodeSpec NodeSpec, peerAddrs []string) error { nodeDir := filepath.Join(ce.oramaDir, nodeSpec.DataDir) configPath := filepath.Join(ce.oramaDir, nodeSpec.ConfigFilename) @@ -141,7 +143,7 @@ func (ce *ConfigEnsurer) ensureNodeConfig(nodeSpec NodeSpec, bootstrapAddrs []st return fmt.Errorf("failed to create node directory: %w", err) } - // Generate node config (unified - no bootstrap/node distinction) + // Generate node config (all nodes are unified) data := templates.NodeConfigData{ NodeID: nodeSpec.Name, P2PPort: nodeSpec.P2PPort, @@ -149,7 +151,7 @@ func (ce *ConfigEnsurer) ensureNodeConfig(nodeSpec NodeSpec, bootstrapAddrs []st RQLiteHTTPPort: nodeSpec.RQLiteHTTPPort, RQLiteRaftPort: nodeSpec.RQLiteRaftPort, RQLiteJoinAddress: nodeSpec.RQLiteJoinTarget, - BootstrapPeers: bootstrapAddrs, + BootstrapPeers: peerAddrs, ClusterAPIPort: nodeSpec.ClusterAPIPort, IPFSAPIPort: nodeSpec.IPFSAPIPort, UnifiedGatewayPort: nodeSpec.UnifiedGatewayPort, @@ -170,19 +172,19 @@ func (ce *ConfigEnsurer) ensureNodeConfig(nodeSpec NodeSpec, bootstrapAddrs []st } // ensureGateway creates gateway config -func (ce *ConfigEnsurer) ensureGateway(bootstrapAddrs []string) error { +func (ce *ConfigEnsurer) ensureGateway(peerAddrs []string) error { configPath := filepath.Join(ce.oramaDir, "gateway.yaml") - // Get first bootstrap's cluster API port for default + // Get first node's cluster API port for default topology := DefaultTopology() - firstBootstrap := topology.GetBootstrapNodes()[0] + firstNode := topology.GetFirstNode() data := templates.GatewayConfigData{ ListenPort: topology.GatewayPort, - BootstrapPeers: bootstrapAddrs, + BootstrapPeers: peerAddrs, OlricServers: []string{fmt.Sprintf("127.0.0.1:%d", topology.OlricHTTPPort)}, - ClusterAPIPort: firstBootstrap.ClusterAPIPort, - IPFSAPIPort: firstBootstrap.IPFSAPIPort, + ClusterAPIPort: firstNode.ClusterAPIPort, + IPFSAPIPort: firstNode.IPFSAPIPort, } config, err := templates.RenderGatewayConfig(data) diff --git a/pkg/environments/development/runner.go b/pkg/environments/development/runner.go index 764ef7f..a1c431b 100644 --- a/pkg/environments/development/runner.go +++ b/pkg/environments/development/runner.go @@ -238,7 +238,7 @@ func (pm *ProcessManager) Status(ctx context.Context) { } fmt.Fprintf(pm.logWriter, "\nConfiguration files in %s:\n", pm.oramaDir) - configFiles := []string{"bootstrap.yaml", "bootstrap2.yaml", "node2.yaml", "node3.yaml", "node4.yaml", "gateway.yaml", "olric-config.yaml"} + configFiles := []string{"node-1.yaml", "node-2.yaml", "node-3.yaml", "node-4.yaml", "node-5.yaml", "gateway.yaml", "olric-config.yaml"} for _, f := range configFiles { path := filepath.Join(pm.oramaDir, f) if _, err := os.Stat(path); err == nil { @@ -269,7 +269,7 @@ func (pm *ProcessManager) buildIPFSNodes(topology *Topology) []ipfsNodeInfo { return nodes } -// startNodes starts all network nodes (bootstraps and regular) +// startNodes starts all network nodes func (pm *ProcessManager) startNodes(ctx context.Context) error { topology := DefaultTopology() for _, nodeSpec := range topology.Nodes { diff --git a/pkg/environments/development/topology.go b/pkg/environments/development/topology.go index 1631798..31c4de0 100644 --- a/pkg/environments/development/topology.go +++ b/pkg/environments/development/topology.go @@ -4,9 +4,8 @@ import "fmt" // NodeSpec defines configuration for a single dev environment node type NodeSpec struct { - Name string // bootstrap, bootstrap2, node2, node3, node4 - Role string // "bootstrap" or "node" - ConfigFilename string // bootstrap.yaml, bootstrap2.yaml, node2.yaml, etc. + Name string // node-1, node-2, node-3, node-4, node-5 + ConfigFilename string // node-1.yaml, node-2.yaml, etc. DataDir string // relative path from .orama root P2PPort int // LibP2P listen port IPFSAPIPort int // IPFS API port @@ -17,8 +16,8 @@ type NodeSpec struct { ClusterAPIPort int // IPFS Cluster REST API port ClusterPort int // IPFS Cluster P2P port UnifiedGatewayPort int // Unified gateway port (proxies all services) - RQLiteJoinTarget string // which bootstrap RQLite port to join (leave empty for bootstraps that lead) - ClusterJoinTarget string // which bootstrap cluster to join (leave empty for bootstrap that leads) + RQLiteJoinTarget string // which node's RQLite Raft port to join (empty for first node) + ClusterJoinTarget string // which node's cluster to join (empty for first node) } // Topology defines the complete development environment topology @@ -35,10 +34,9 @@ func DefaultTopology() *Topology { return &Topology{ Nodes: []NodeSpec{ { - Name: "bootstrap", - Role: "bootstrap", - ConfigFilename: "bootstrap.yaml", - DataDir: "bootstrap", + Name: "node-1", + ConfigFilename: "node-1.yaml", + DataDir: "node-1", P2PPort: 4001, IPFSAPIPort: 4501, IPFSSwarmPort: 4101, @@ -48,14 +46,13 @@ func DefaultTopology() *Topology { ClusterAPIPort: 9094, ClusterPort: 9096, UnifiedGatewayPort: 6001, - RQLiteJoinTarget: "", + RQLiteJoinTarget: "", // First node - creates cluster ClusterJoinTarget: "", }, { - Name: "bootstrap2", - Role: "bootstrap", - ConfigFilename: "bootstrap2.yaml", - DataDir: "bootstrap2", + Name: "node-2", + ConfigFilename: "node-2.yaml", + DataDir: "node-2", P2PPort: 4011, IPFSAPIPort: 4511, IPFSSwarmPort: 4111, @@ -69,10 +66,9 @@ func DefaultTopology() *Topology { ClusterJoinTarget: "localhost:9096", }, { - Name: "node2", - Role: "node", - ConfigFilename: "node2.yaml", - DataDir: "node2", + Name: "node-3", + ConfigFilename: "node-3.yaml", + DataDir: "node-3", P2PPort: 4002, IPFSAPIPort: 4502, IPFSSwarmPort: 4102, @@ -86,10 +82,9 @@ func DefaultTopology() *Topology { ClusterJoinTarget: "localhost:9096", }, { - Name: "node3", - Role: "node", - ConfigFilename: "node3.yaml", - DataDir: "node3", + Name: "node-4", + ConfigFilename: "node-4.yaml", + DataDir: "node-4", P2PPort: 4003, IPFSAPIPort: 4503, IPFSSwarmPort: 4103, @@ -103,10 +98,9 @@ func DefaultTopology() *Topology { ClusterJoinTarget: "localhost:9096", }, { - Name: "node4", - Role: "node", - ConfigFilename: "node4.yaml", - DataDir: "node4", + Name: "node-5", + ConfigFilename: "node-5.yaml", + DataDir: "node-5", P2PPort: 4004, IPFSAPIPort: 4504, IPFSSwarmPort: 4104, @@ -120,7 +114,7 @@ func DefaultTopology() *Topology { ClusterJoinTarget: "localhost:9096", }, }, - GatewayPort: 6001, + GatewayPort: 6000, // Main gateway on 6000 (nodes use 6001-6005) OlricHTTPPort: 3320, OlricMemberPort: 3322, AnonSOCKSPort: 9050, @@ -181,26 +175,20 @@ func (t *Topology) PortMap() map[int]string { return portMap } -// GetBootstrapNodes returns only the bootstrap nodes -func (t *Topology) GetBootstrapNodes() []NodeSpec { - var bootstraps []NodeSpec - for _, node := range t.Nodes { - if node.Role == "bootstrap" { - bootstraps = append(bootstraps, node) - } +// GetFirstNode returns the first node (the one that creates the cluster) +func (t *Topology) GetFirstNode() *NodeSpec { + if len(t.Nodes) > 0 { + return &t.Nodes[0] } - return bootstraps + return nil } -// GetRegularNodes returns only the regular (non-bootstrap) nodes -func (t *Topology) GetRegularNodes() []NodeSpec { - var regulars []NodeSpec - for _, node := range t.Nodes { - if node.Role == "node" { - regulars = append(regulars, node) - } +// GetJoiningNodes returns all nodes except the first one (they join the cluster) +func (t *Topology) GetJoiningNodes() []NodeSpec { + if len(t.Nodes) > 1 { + return t.Nodes[1:] } - return regulars + return nil } // GetNodeByName returns a node by its name, or nil if not found diff --git a/pkg/environments/production/config.go b/pkg/environments/production/config.go index 3bd407b..36ab1a6 100644 --- a/pkg/environments/production/config.go +++ b/pkg/environments/production/config.go @@ -30,7 +30,7 @@ func NewConfigGenerator(oramaDir string) *ConfigGenerator { } } -// extractIPFromMultiaddr extracts the IP address from a bootstrap peer multiaddr +// extractIPFromMultiaddr extracts the IP address from a peer multiaddr // Supports IP4, IP6, DNS4, DNS6, and DNSADDR protocols // Returns the IP address as a string, or empty string if extraction/resolution fails func extractIPFromMultiaddr(multiaddrStr string) string { @@ -76,12 +76,12 @@ func extractIPFromMultiaddr(multiaddrStr string) string { return "" } -// inferBootstrapIP extracts the IP address from bootstrap peer multiaddrs -// Iterates through all bootstrap peers to find a valid IP (supports DNS resolution) +// inferPeerIP extracts the IP address from peer multiaddrs +// Iterates through all peers to find a valid IP (supports DNS resolution) // Falls back to vpsIP if provided, otherwise returns empty string -func inferBootstrapIP(bootstrapPeers []string, vpsIP string) string { - // Try to extract IP from each bootstrap peer (in order) - for _, peer := range bootstrapPeers { +func inferPeerIP(peers []string, vpsIP string) string { + // Try to extract IP from each peer (in order) + for _, peer := range peers { if ip := extractIPFromMultiaddr(peer); ip != "" { return ip } @@ -93,8 +93,8 @@ func inferBootstrapIP(bootstrapPeers []string, vpsIP string) string { return "" } -// GenerateNodeConfig generates node.yaml configuration (unified - no bootstrap/node distinction) -func (cg *ConfigGenerator) GenerateNodeConfig(bootstrapPeers []string, vpsIP string, joinAddress string, domain string) (string, error) { +// GenerateNodeConfig generates node.yaml configuration (unified architecture) +func (cg *ConfigGenerator) GenerateNodeConfig(peerAddresses []string, vpsIP string, joinAddress string, domain string) (string, error) { // Generate node ID from domain or use default nodeID := "node" if domain != "" { @@ -121,11 +121,11 @@ func (cg *ConfigGenerator) GenerateNodeConfig(bootstrapPeers []string, vpsIP str if joinAddress != "" { // Use explicitly provided join address rqliteJoinAddr = joinAddress - } else if len(bootstrapPeers) > 0 { - // Infer join address from bootstrap peers - bootstrapIP := inferBootstrapIP(bootstrapPeers, "") - if bootstrapIP != "" { - rqliteJoinAddr = net.JoinHostPort(bootstrapIP, "7001") + } else if len(peerAddresses) > 0 { + // Infer join address from peers + peerIP := inferPeerIP(peerAddresses, "") + if peerIP != "" { + rqliteJoinAddr = net.JoinHostPort(peerIP, "7001") // Validate that join address doesn't match this node's own raft address (would cause self-join) if rqliteJoinAddr == raftAdvAddr { rqliteJoinAddr = "" // Clear it - this is the first node @@ -134,7 +134,7 @@ func (cg *ConfigGenerator) GenerateNodeConfig(bootstrapPeers []string, vpsIP str } // If no join address and no peers, this is the first node - it will create the cluster - // Unified data directory (no bootstrap/node distinction) + // Unified data directory (all nodes equal) data := templates.NodeConfigData{ NodeID: nodeID, P2PPort: 4001, @@ -142,7 +142,7 @@ func (cg *ConfigGenerator) GenerateNodeConfig(bootstrapPeers []string, vpsIP str RQLiteHTTPPort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: rqliteJoinAddr, - BootstrapPeers: bootstrapPeers, + BootstrapPeers: peerAddresses, ClusterAPIPort: 9094, IPFSAPIPort: 4501, HTTPAdvAddress: httpAdvAddr, @@ -154,7 +154,7 @@ func (cg *ConfigGenerator) GenerateNodeConfig(bootstrapPeers []string, vpsIP str } // GenerateGatewayConfig generates gateway.yaml configuration -func (cg *ConfigGenerator) GenerateGatewayConfig(bootstrapPeers []string, enableHTTPS bool, domain string, olricServers []string) (string, error) { +func (cg *ConfigGenerator) GenerateGatewayConfig(peerAddresses []string, enableHTTPS bool, domain string, olricServers []string) (string, error) { tlsCacheDir := "" if enableHTTPS { tlsCacheDir = filepath.Join(cg.oramaDir, "tls-cache") @@ -162,7 +162,7 @@ func (cg *ConfigGenerator) GenerateGatewayConfig(bootstrapPeers []string, enable data := templates.GatewayConfigData{ ListenPort: 6001, - BootstrapPeers: bootstrapPeers, + BootstrapPeers: peerAddresses, OlricServers: olricServers, ClusterAPIPort: 9094, IPFSAPIPort: 4501, diff --git a/pkg/environments/production/orchestrator.go b/pkg/environments/production/orchestrator.go index 71ab176..6355743 100644 --- a/pkg/environments/production/orchestrator.go +++ b/pkg/environments/production/orchestrator.go @@ -272,7 +272,7 @@ func (ps *ProductionSetup) Phase2bInstallBinaries() error { } // Phase2cInitializeServices initializes service repositories and configurations -func (ps *ProductionSetup) Phase2cInitializeServices(bootstrapPeers []string, vpsIP string) error { +func (ps *ProductionSetup) Phase2cInitializeServices(peerAddresses []string, vpsIP string) error { ps.logf("Phase 2c: Initializing services...") // Ensure directories exist (unified structure) @@ -280,7 +280,7 @@ func (ps *ProductionSetup) Phase2cInitializeServices(bootstrapPeers []string, vp return fmt.Errorf("failed to create directories: %w", err) } - // Build paths - unified data directory (no bootstrap/node distinction) + // Build paths - unified data directory (all nodes equal) dataDir := filepath.Join(ps.oramaDir, "data") // Initialize IPFS repo with correct path structure @@ -297,13 +297,13 @@ func (ps *ProductionSetup) Phase2cInitializeServices(bootstrapPeers []string, vp return fmt.Errorf("failed to get cluster secret: %w", err) } - // Get cluster peer addresses from bootstrap peers if available + // Get cluster peer addresses from peers if available var clusterPeers []string - if len(bootstrapPeers) > 0 { - // Infer IP from bootstrap peers - bootstrapIP := inferBootstrapIP(bootstrapPeers, vpsIP) - if bootstrapIP != "" { - ps.logf(" ℹ️ Will attempt to connect to cluster peers at %s", bootstrapIP) + if len(peerAddresses) > 0 { + // Infer IP from peers + peerIP := inferPeerIP(peerAddresses, vpsIP) + if peerIP != "" { + ps.logf(" ℹ️ Will attempt to connect to cluster peers at %s", peerIP) } } @@ -343,7 +343,7 @@ func (ps *ProductionSetup) Phase3GenerateSecrets() error { } ps.logf(" ✓ IPFS swarm key ensured") - // Node identity (unified - no bootstrap/node distinction) + // Node identity (unified architecture) peerID, err := ps.secretGenerator.EnsureNodeIdentity() if err != nil { return fmt.Errorf("failed to ensure node identity: %w", err) @@ -356,7 +356,7 @@ func (ps *ProductionSetup) Phase3GenerateSecrets() error { } // Phase4GenerateConfigs generates node, gateway, and service configs -func (ps *ProductionSetup) Phase4GenerateConfigs(bootstrapPeers []string, vpsIP string, enableHTTPS bool, domain string, joinAddress string) error { +func (ps *ProductionSetup) Phase4GenerateConfigs(peerAddresses []string, vpsIP string, enableHTTPS bool, domain string, joinAddress string) error { if ps.IsUpdate() { ps.logf("Phase 4: Updating configurations...") ps.logf(" (Existing configs will be updated to latest format)") @@ -364,8 +364,8 @@ func (ps *ProductionSetup) Phase4GenerateConfigs(bootstrapPeers []string, vpsIP ps.logf("Phase 4: Generating configurations...") } - // Node config (unified - no bootstrap/node distinction) - nodeConfig, err := ps.configGenerator.GenerateNodeConfig(bootstrapPeers, vpsIP, joinAddress, domain) + // Node config (unified architecture) + nodeConfig, err := ps.configGenerator.GenerateNodeConfig(peerAddresses, vpsIP, joinAddress, domain) if err != nil { return fmt.Errorf("failed to generate node config: %w", err) } @@ -388,14 +388,14 @@ func (ps *ProductionSetup) Phase4GenerateConfigs(bootstrapPeers []string, vpsIP } // If joining existing cluster, also include peer Olric servers - if len(bootstrapPeers) > 0 { - peerIP := inferBootstrapIP(bootstrapPeers, "") + if len(peerAddresses) > 0 { + peerIP := inferPeerIP(peerAddresses, "") if peerIP != "" && peerIP != vpsIP { olricServers = append(olricServers, net.JoinHostPort(peerIP, "3320")) } } - gatewayConfig, err := ps.configGenerator.GenerateGatewayConfig(bootstrapPeers, enableHTTPS, domain, olricServers) + gatewayConfig, err := ps.configGenerator.GenerateGatewayConfig(peerAddresses, enableHTTPS, domain, olricServers) if err != nil { return fmt.Errorf("failed to generate gateway config: %w", err) } diff --git a/pkg/environments/templates/node.yaml b/pkg/environments/templates/node.yaml index 9773add..7f28294 100644 --- a/pkg/environments/templates/node.yaml +++ b/pkg/environments/templates/node.yaml @@ -1,6 +1,5 @@ node: id: "{{.NodeID}}" - type: "node" listen_addresses: - "/ip4/0.0.0.0/tcp/{{.P2PPort}}" data_dir: "{{.DataDir}}" diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 0c565ed..dfb1c62 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -115,7 +115,7 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { logger.ComponentInfo(logging.ComponentClient, "Network client connected", zap.String("namespace", cliCfg.AppName), - zap.Int("bootstrap_peer_count", len(cliCfg.BootstrapPeers)), + zap.Int("peer_count", len(cliCfg.BootstrapPeers)), ) logger.ComponentInfo(logging.ComponentGeneral, "Creating gateway instance...") @@ -465,10 +465,10 @@ func discoverOlricServers(networkClient client.NetworkClient, logger *zap.Logger } } - // Also check bootstrap peers from config + // Also check peers from config if cfg := networkClient.Config(); cfg != nil { - for _, bootstrapAddr := range cfg.BootstrapPeers { - ma, err := multiaddr.NewMultiaddr(bootstrapAddr) + for _, peerAddr := range cfg.BootstrapPeers { + ma, err := multiaddr.NewMultiaddr(peerAddr) if err != nil { continue } @@ -514,7 +514,7 @@ type ipfsDiscoveryResult struct { } // discoverIPFSFromNodeConfigs discovers IPFS configuration from node.yaml files -// Checks bootstrap.yaml first, then bootstrap2.yaml, node.yaml, node2.yaml, node3.yaml, node4.yaml +// Checks node-1.yaml through node-5.yaml for IPFS configuration func discoverIPFSFromNodeConfigs(logger *zap.Logger) ipfsDiscoveryResult { homeDir, err := os.UserHomeDir() if err != nil { @@ -524,8 +524,8 @@ func discoverIPFSFromNodeConfigs(logger *zap.Logger) ipfsDiscoveryResult { configDir := filepath.Join(homeDir, ".orama") - // Try bootstrap.yaml first, then bootstrap2.yaml, node.yaml, node2.yaml, node3.yaml, node4.yaml - configFiles := []string{"bootstrap.yaml", "bootstrap2.yaml", "node.yaml", "node2.yaml", "node3.yaml", "node4.yaml"} + // Try all node config files for IPFS settings + configFiles := []string{"node-1.yaml", "node-2.yaml", "node-3.yaml", "node-4.yaml", "node-5.yaml"} for _, filename := range configFiles { configPath := filepath.Join(configDir, filename) diff --git a/pkg/gateway/http_gateway.go b/pkg/gateway/http_gateway.go index bf764c8..528f069 100644 --- a/pkg/gateway/http_gateway.go +++ b/pkg/gateway/http_gateway.go @@ -128,7 +128,7 @@ func (hg *HTTPGateway) initializeRoutes() error { // registerRouteHandler registers a route handler with the router func (hg *HTTPGateway) registerRouteHandler(name string, routeConfig config.RouteConfig, proxy *httputil.ReverseProxy) { pathPrefix := strings.TrimSuffix(routeConfig.PathPrefix, "/") - + // Use Mount instead of Route for wildcard path handling hg.router.Mount(pathPrefix, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { hg.handleProxyRequest(w, req, routeConfig, proxy) @@ -255,4 +255,3 @@ func isWebSocketRequest(r *http.Request) bool { return r.Header.Get("Connection") == "Upgrade" && r.Header.Get("Upgrade") == "websocket" } - diff --git a/pkg/gateway/http_helpers.go b/pkg/gateway/http_helpers.go index 388b6bb..2e78695 100644 --- a/pkg/gateway/http_helpers.go +++ b/pkg/gateway/http_helpers.go @@ -1,11 +1,11 @@ package gateway import ( - "bufio" - "encoding/json" - "fmt" - "net" - "net/http" + "bufio" + "encoding/json" + "fmt" + "net" + "net/http" ) type statusResponseWriter struct { @@ -28,23 +28,23 @@ func (w *statusResponseWriter) Write(b []byte) (int, error) { // Ensure websocket upgrades work by preserving Hijacker/Flusher/Pusher // interfaces when the underlying ResponseWriter supports them. func (w *statusResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { - if h, ok := w.ResponseWriter.(http.Hijacker); ok { - return h.Hijack() - } - return nil, nil, fmt.Errorf("hijacker not supported") + if h, ok := w.ResponseWriter.(http.Hijacker); ok { + return h.Hijack() + } + return nil, nil, fmt.Errorf("hijacker not supported") } func (w *statusResponseWriter) Flush() { - if f, ok := w.ResponseWriter.(http.Flusher); ok { - f.Flush() - } + if f, ok := w.ResponseWriter.(http.Flusher); ok { + f.Flush() + } } func (w *statusResponseWriter) Push(target string, opts *http.PushOptions) error { - if p, ok := w.ResponseWriter.(http.Pusher); ok { - return p.Push(target, opts) - } - return http.ErrNotSupported + if p, ok := w.ResponseWriter.(http.Pusher); ok { + return p.Push(target, opts) + } + return http.ErrNotSupported } // writeJSON writes JSON with status code diff --git a/pkg/gateway/https.go b/pkg/gateway/https.go index 0bd2bc3..a7218a4 100644 --- a/pkg/gateway/https.go +++ b/pkg/gateway/https.go @@ -18,10 +18,10 @@ import ( // HTTPSGateway extends HTTPGateway with HTTPS/TLS support type HTTPSGateway struct { *HTTPGateway - httpsConfig *config.HTTPSConfig - certManager *autocert.Manager - httpsServer *http.Server - httpServer *http.Server // For ACME challenge and redirect + httpsConfig *config.HTTPSConfig + certManager *autocert.Manager + httpsServer *http.Server + httpServer *http.Server // For ACME challenge and redirect } // NewHTTPSGateway creates a new HTTPS gateway with Let's Encrypt autocert @@ -214,4 +214,3 @@ func (g *HTTPSGateway) Stop() error { g.logger.ComponentInfo(logging.ComponentGeneral, "HTTPS Gateway shutdown complete") return nil } - diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go index af87641..8a951c2 100644 --- a/pkg/gateway/pubsub_handlers.go +++ b/pkg/gateway/pubsub_handlers.go @@ -60,24 +60,24 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) // Channel to deliver PubSub messages to WS writer msgs := make(chan []byte, 128) - + // NEW: Register as local subscriber for direct message delivery localSub := &localSubscriber{ msgChan: msgs, namespace: ns, } topicKey := fmt.Sprintf("%s.%s", ns, topic) - + g.mu.Lock() g.localSubscribers[topicKey] = append(g.localSubscribers[topicKey], localSub) subscriberCount := len(g.localSubscribers[topicKey]) g.mu.Unlock() - + g.logger.ComponentInfo("gateway", "pubsub ws: registered local subscriber", zap.String("topic", topic), zap.String("namespace", ns), zap.Int("total_subscribers", subscriberCount)) - + // Unregister on close defer func() { g.mu.Lock() @@ -97,12 +97,12 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) zap.String("topic", topic), zap.Int("remaining_subscribers", remainingCount)) }() - + // Use internal auth context when interacting with client to avoid circular auth requirements ctx := client.WithInternalAuth(r.Context()) // Apply namespace isolation ctx = pubsub.WithNamespace(ctx, ns) - + // Writer loop - START THIS FIRST before libp2p subscription done := make(chan struct{}) go func() { @@ -122,11 +122,11 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) close(done) return } - + g.logger.ComponentInfo("gateway", "pubsub ws: sending message to client", zap.String("topic", topic), zap.Int("data_len", len(b))) - + // Format message as JSON envelope with data (base64 encoded), timestamp, and topic // This matches the SDK's Message interface: {data: string, timestamp: number, topic: string} envelope := map[string]interface{}{ @@ -141,11 +141,11 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) zap.Error(err)) continue } - + g.logger.ComponentDebug("gateway", "pubsub ws: envelope created", zap.String("topic", topic), zap.Int("envelope_len", len(envelopeJSON))) - + conn.SetWriteDeadline(time.Now().Add(30 * time.Second)) if err := conn.WriteMessage(websocket.TextMessage, envelopeJSON); err != nil { g.logger.ComponentWarn("gateway", "pubsub ws: failed to write to websocket", @@ -154,7 +154,7 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) close(done) return } - + g.logger.ComponentInfo("gateway", "pubsub ws: message sent successfully", zap.String("topic", topic)) case <-ticker.C: @@ -173,7 +173,7 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) g.logger.ComponentInfo("gateway", "pubsub ws: received message from libp2p", zap.String("topic", topic), zap.Int("data_len", len(data))) - + select { case msgs <- data: g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client", @@ -195,7 +195,7 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) } g.logger.ComponentInfo("gateway", "pubsub ws: libp2p subscription established", zap.String("topic", topic)) - + // Keep subscription alive until done <-done _ = g.client.PubSub().Unsubscribe(ctx, topic) @@ -212,7 +212,7 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) if mt != websocket.TextMessage && mt != websocket.BinaryMessage { continue } - + // Filter out WebSocket heartbeat messages // Don't publish them to the topic var msg map[string]interface{} @@ -222,7 +222,7 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) continue } } - + if err := g.client.PubSub().Publish(ctx, topic, data); err != nil { // Best-effort notify client _ = conn.WriteMessage(websocket.TextMessage, []byte("publish_error")) @@ -259,12 +259,12 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "invalid base64 data") return } - + // NEW: Check for local websocket subscribers FIRST and deliver directly g.mu.RLock() localSubs := g.getLocalSubscribers(body.Topic, ns) g.mu.RUnlock() - + localDeliveryCount := 0 if len(localSubs) > 0 { for _, sub := range localSubs { @@ -280,20 +280,20 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { } } } - + g.logger.ComponentInfo("gateway", "pubsub publish: processing message", zap.String("topic", body.Topic), zap.String("namespace", ns), zap.Int("data_len", len(data)), zap.Int("local_subscribers", len(localSubs)), zap.Int("local_delivered", localDeliveryCount)) - + // Publish to libp2p asynchronously for cross-node delivery // This prevents blocking the HTTP response if libp2p network is slow go func() { publishCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - + ctx := pubsub.WithNamespace(client.WithInternalAuth(publishCtx), ns) if err := g.client.PubSub().Publish(ctx, body.Topic, data); err != nil { g.logger.ComponentWarn("gateway", "async libp2p publish failed", @@ -304,7 +304,7 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { zap.String("topic", body.Topic)) } }() - + // Return immediately after local delivery // Local WebSocket subscribers already received the message writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) diff --git a/pkg/gateway/push_notifications.go b/pkg/gateway/push_notifications.go index dd7a5bb..693948a 100644 --- a/pkg/gateway/push_notifications.go +++ b/pkg/gateway/push_notifications.go @@ -181,4 +181,3 @@ func (pns *PushNotificationService) sendExpoRequest(ctx context.Context, message return nil } - diff --git a/pkg/gateway/tcp_sni_gateway.go b/pkg/gateway/tcp_sni_gateway.go index b1041d9..dbcc96d 100644 --- a/pkg/gateway/tcp_sni_gateway.go +++ b/pkg/gateway/tcp_sni_gateway.go @@ -209,4 +209,3 @@ func (g *TCPSNIGateway) Stop() error { g.logger.ComponentInfo(logging.ComponentGeneral, "TCP SNI Gateway shutdown complete") return nil } - diff --git a/pkg/ipfs/cluster.go b/pkg/ipfs/cluster.go index ee8c655..c711192 100644 --- a/pkg/ipfs/cluster.go +++ b/pkg/ipfs/cluster.go @@ -86,9 +86,9 @@ func NewClusterConfigManager(cfg *config.Config, logger *zap.Logger) (*ClusterCo } // Determine cluster path based on data directory structure - // Check if dataDir contains specific node names (e.g., ~/.orama/bootstrap, ~/.orama/bootstrap2, ~/.orama/node2-4) + // Check if dataDir contains specific node names (e.g., ~/.orama/node-1, ~/.orama/node-2, etc.) clusterPath := filepath.Join(dataDir, "ipfs-cluster") - nodeNames := []string{"bootstrap", "bootstrap2", "node2", "node3", "node4"} + nodeNames := []string{"node-1", "node-2", "node-3", "node-4", "node-5"} for _, nodeName := range nodeNames { if strings.Contains(dataDir, nodeName) { // Check if this is a direct child @@ -144,20 +144,23 @@ func (cm *ClusterConfigManager) EnsureConfig() error { return fmt.Errorf("failed to parse IPFS API URL: %w", err) } - // Determine node name - nodeName := cm.cfg.Node.Type - if nodeName == "node" || nodeName == "bootstrap" { - // Try to extract from data dir or ID - possibleNames := []string{"bootstrap", "bootstrap2", "node2", "node3", "node4"} + // Determine node name from ID or DataDir + nodeName := "node-1" // Default fallback + possibleNames := []string{"node-1", "node-2", "node-3", "node-4", "node-5"} + for _, name := range possibleNames { + if strings.Contains(cm.cfg.Node.DataDir, name) || strings.Contains(cm.cfg.Node.ID, name) { + nodeName = name + break + } + } + // If ID contains a node identifier, use it + if cm.cfg.Node.ID != "" { for _, name := range possibleNames { - if strings.Contains(cm.cfg.Node.DataDir, name) || strings.Contains(cm.cfg.Node.ID, name) { + if strings.Contains(cm.cfg.Node.ID, name) { nodeName = name break } } - if nodeName == "node" || nodeName == "bootstrap" { - nodeName = cm.cfg.Node.Type - } } // Calculate ports based on pattern @@ -219,52 +222,52 @@ func (cm *ClusterConfigManager) EnsureConfig() error { return nil } -// UpdateBootstrapPeers updates peer_addresses and peerstore with bootstrap peer information -// Returns true if update was successful, false if bootstrap is not available yet (non-fatal) -func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) (bool, error) { +// UpdatePeerAddresses updates peer_addresses and peerstore with peer information +// Returns true if update was successful, false if peer is not available yet (non-fatal) +func (cm *ClusterConfigManager) UpdatePeerAddresses(peerAPIURL string) (bool, error) { if cm.cfg.Database.IPFS.ClusterAPIURL == "" { return false, nil // IPFS not configured } - // Skip if this is the bootstrap node itself - if cm.cfg.Node.Type == "bootstrap" { + // Skip if this is the first node (creates the cluster, no join address) + if cm.cfg.Database.RQLiteJoinAddress == "" { return false, nil } - // Query bootstrap cluster API to get peer ID - peerID, err := getBootstrapPeerID(bootstrapAPIURL) + // Query peer cluster API to get peer ID + peerID, err := getPeerID(peerAPIURL) if err != nil { - // Non-fatal: bootstrap might not be available yet - cm.logger.Debug("Bootstrap peer not available yet, will retry", - zap.String("bootstrap_api", bootstrapAPIURL), + // Non-fatal: peer might not be available yet + cm.logger.Debug("Peer not available yet, will retry", + zap.String("peer_api", peerAPIURL), zap.Error(err)) return false, nil } if peerID == "" { - cm.logger.Debug("Bootstrap peer ID not available yet") + cm.logger.Debug("Peer ID not available yet") return false, nil } - // Extract bootstrap host and cluster port from URL - bootstrapHost, clusterPort, err := parseBootstrapHostAndPort(bootstrapAPIURL) + // Extract peer host and cluster port from URL + peerHost, clusterPort, err := parsePeerHostAndPort(peerAPIURL) if err != nil { - return false, fmt.Errorf("failed to parse bootstrap cluster API URL: %w", err) + return false, fmt.Errorf("failed to parse peer cluster API URL: %w", err) } - // Bootstrap cluster LibP2P listens on clusterPort + 4 + // Peer cluster LibP2P listens on clusterPort + 4 // (REST API is 9094, LibP2P is 9098 = 9094 + 4) - bootstrapClusterPort := clusterPort + 4 + peerClusterPort := clusterPort + 4 // Determine IP protocol (ip4 or ip6) based on the host var ipProtocol string - if net.ParseIP(bootstrapHost).To4() != nil { + if net.ParseIP(peerHost).To4() != nil { ipProtocol = "ip4" } else { ipProtocol = "ip6" } - bootstrapPeerAddr := fmt.Sprintf("/%s/%s/tcp/%d/p2p/%s", ipProtocol, bootstrapHost, bootstrapClusterPort, peerID) + peerAddr := fmt.Sprintf("/%s/%s/tcp/%d/p2p/%s", ipProtocol, peerHost, peerClusterPort, peerID) // Load current config serviceJSONPath := filepath.Join(cm.clusterPath, "service.json") @@ -284,32 +287,32 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) (bo if peerstoreData, err := os.ReadFile(peerstorePath); err == nil { // Only skip update if peerstore contains EXACTLY the correct address and nothing else existingAddrs := strings.Split(strings.TrimSpace(string(peerstoreData)), "\n") - if len(existingAddrs) == 1 && strings.TrimSpace(existingAddrs[0]) == bootstrapPeerAddr { - cm.logger.Debug("Bootstrap peer address already correct in peerstore", zap.String("addr", bootstrapPeerAddr)) + if len(existingAddrs) == 1 && strings.TrimSpace(existingAddrs[0]) == peerAddr { + cm.logger.Debug("Peer address already correct in peerstore", zap.String("addr", peerAddr)) needsUpdate = false } } if needsUpdate { - // Write ONLY the correct bootstrap peer address, removing any stale entries - if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil { + // Write ONLY the correct peer address, removing any stale entries + if err := os.WriteFile(peerstorePath, []byte(peerAddr+"\n"), 0644); err != nil { return false, fmt.Errorf("failed to write peerstore: %w", err) } - cm.logger.Info("Updated peerstore with bootstrap peer (cleaned stale entries)", - zap.String("addr", bootstrapPeerAddr), + cm.logger.Info("Updated peerstore with peer (cleaned stale entries)", + zap.String("addr", peerAddr), zap.String("peerstore_path", peerstorePath)) } // Then sync service.json from peerstore to keep them in sync - cfg.Cluster.PeerAddresses = []string{bootstrapPeerAddr} + cfg.Cluster.PeerAddresses = []string{peerAddr} // Save config if err := cm.saveConfig(serviceJSONPath, cfg); err != nil { return false, fmt.Errorf("failed to save config: %w", err) } - cm.logger.Info("Updated bootstrap peer configuration", - zap.String("bootstrap_peer_addr", bootstrapPeerAddr), + cm.logger.Info("Updated peer configuration", + zap.String("peer_addr", peerAddr), zap.String("peerstore_path", peerstorePath)) return true, nil @@ -480,52 +483,52 @@ func (cm *ClusterConfigManager) UpdateAllClusterPeers() (bool, error) { return true, nil } -// RepairBootstrapPeers automatically discovers and repairs bootstrap peer configuration -// Tries multiple methods: config-based discovery, bootstrap peer multiaddr, or discovery service -func (cm *ClusterConfigManager) RepairBootstrapPeers() (bool, error) { +// RepairPeerConfiguration automatically discovers and repairs peer configuration +// Tries multiple methods: config-based discovery, peer multiaddr, or discovery service +func (cm *ClusterConfigManager) RepairPeerConfiguration() (bool, error) { if cm.cfg.Database.IPFS.ClusterAPIURL == "" { return false, nil // IPFS not configured } - // Skip if this is the bootstrap node itself - if cm.cfg.Node.Type == "bootstrap" { + // Skip if this is the first node (creates the cluster, no join address) + if cm.cfg.Database.RQLiteJoinAddress == "" { return false, nil } - // Method 1: Try to use bootstrap API URL from config if available - // Check if we have a bootstrap node's cluster API URL in discovery metadata - // For now, we'll infer from bootstrap peers multiaddr + // Method 1: Try to use peer API URL from config if available + // Check if we have a peer's cluster API URL in discovery metadata + // For now, we'll infer from peers multiaddr - var bootstrapAPIURL string + var peerAPIURL string - // Try to extract from bootstrap peers multiaddr + // Try to extract from peers multiaddr if len(cm.cfg.Discovery.BootstrapPeers) > 0 { if ip := extractIPFromMultiaddrForCluster(cm.cfg.Discovery.BootstrapPeers[0]); ip != "" { // Default cluster API port is 9094 - bootstrapAPIURL = fmt.Sprintf("http://%s:9094", ip) - cm.logger.Debug("Inferred bootstrap cluster API from bootstrap peer", - zap.String("bootstrap_api", bootstrapAPIURL)) + peerAPIURL = fmt.Sprintf("http://%s:9094", ip) + cm.logger.Debug("Inferred peer cluster API from peer", + zap.String("peer_api", peerAPIURL)) } } // Fallback to localhost if nothing found (for local development) - if bootstrapAPIURL == "" { - bootstrapAPIURL = "http://localhost:9094" - cm.logger.Debug("Using localhost fallback for bootstrap cluster API") + if peerAPIURL == "" { + peerAPIURL = "http://localhost:9094" + cm.logger.Debug("Using localhost fallback for peer cluster API") } - // Try to update bootstrap peers - success, err := cm.UpdateBootstrapPeers(bootstrapAPIURL) + // Try to update peers + success, err := cm.UpdatePeerAddresses(peerAPIURL) if err != nil { return false, err } if success { - cm.logger.Info("Successfully repaired bootstrap peer configuration") + cm.logger.Info("Successfully repaired peer configuration") return true, nil } - // If update failed (bootstrap not available), return false but no error + // If update failed (peer not available), return false but no error // This allows retries later return false, nil } @@ -817,16 +820,16 @@ func ensureRequiredSection(parent map[string]interface{}, key string, defaults m } } -// parseBootstrapHostAndPort extracts host and REST API port from bootstrap API URL -func parseBootstrapHostAndPort(bootstrapAPIURL string) (host string, restAPIPort int, err error) { - u, err := url.Parse(bootstrapAPIURL) +// parsePeerHostAndPort extracts host and REST API port from peer API URL +func parsePeerHostAndPort(peerAPIURL string) (host string, restAPIPort int, err error) { + u, err := url.Parse(peerAPIURL) if err != nil { return "", 0, err } host = u.Hostname() if host == "" { - return "", 0, fmt.Errorf("no host in URL: %s", bootstrapAPIURL) + return "", 0, fmt.Errorf("no host in URL: %s", peerAPIURL) } portStr := u.Port() @@ -908,8 +911,8 @@ func parseIPFSPort(apiURL string) (int, error) { return port, nil } -// getBootstrapPeerID queries the bootstrap cluster API to get the peer ID -func getBootstrapPeerID(apiURL string) (string, error) { +// getPeerID queries the cluster API to get the peer ID +func getPeerID(apiURL string) (string, error) { // Simple HTTP client to query /peers endpoint client := &standardHTTPClient{} resp, err := client.Get(fmt.Sprintf("%s/peers", apiURL)) @@ -918,7 +921,7 @@ func getBootstrapPeerID(apiURL string) (string, error) { } // The /peers endpoint returns NDJSON (newline-delimited JSON) - // We need to read the first peer object to get the bootstrap peer ID + // We need to read the first peer object to get the peer ID dec := json.NewDecoder(bytes.NewReader(resp)) var firstPeer struct { ID string `json:"id"` @@ -1026,15 +1029,15 @@ func (cm *ClusterConfigManager) FixIPFSConfigAddresses() error { } // Try to find IPFS repo path - // Check common locations: dataDir/ipfs/repo, or dataDir/bootstrap/ipfs/repo, etc. + // Check common locations: dataDir/ipfs/repo, dataDir/node-1/ipfs/repo, etc. possiblePaths := []string{ filepath.Join(dataDir, "ipfs", "repo"), - filepath.Join(dataDir, "bootstrap", "ipfs", "repo"), - filepath.Join(dataDir, "node2", "ipfs", "repo"), - filepath.Join(dataDir, "node3", "ipfs", "repo"), - filepath.Join(filepath.Dir(dataDir), "bootstrap", "ipfs", "repo"), - filepath.Join(filepath.Dir(dataDir), "node2", "ipfs", "repo"), - filepath.Join(filepath.Dir(dataDir), "node3", "ipfs", "repo"), + filepath.Join(dataDir, "node-1", "ipfs", "repo"), + filepath.Join(dataDir, "node-2", "ipfs", "repo"), + filepath.Join(dataDir, "node-3", "ipfs", "repo"), + filepath.Join(filepath.Dir(dataDir), "node-1", "ipfs", "repo"), + filepath.Join(filepath.Dir(dataDir), "node-2", "ipfs", "repo"), + filepath.Join(filepath.Dir(dataDir), "node-3", "ipfs", "repo"), } var ipfsRepoPath string @@ -1056,7 +1059,7 @@ func (cm *ClusterConfigManager) FixIPFSConfigAddresses() error { return fmt.Errorf("failed to parse IPFS API URL: %w", err) } - // Determine gateway port (typically API port + 3079, or 8080 for bootstrap, 8081 for node2, etc.) + // Determine gateway port (typically API port + 3079, or 8080 for node-1, 8081 for node-2, etc.) gatewayPort := 8080 if strings.Contains(dataDir, "node2") { gatewayPort = 8081 diff --git a/pkg/node/monitoring.go b/pkg/node/monitoring.go index 1b21b6d..af3f46e 100644 --- a/pkg/node/monitoring.go +++ b/pkg/node/monitoring.go @@ -236,13 +236,11 @@ func (n *Node) startConnectionMonitoring() { n.logger.ComponentInfo(logging.ComponentNode, "Cluster peer addresses updated during monitoring") } - // Also try to repair bootstrap peers if this is not a bootstrap node - if n.config.Node.Type != "bootstrap" { - if success, err := n.clusterConfigManager.RepairBootstrapPeers(); err != nil { - n.logger.ComponentWarn(logging.ComponentNode, "Failed to repair bootstrap peers during monitoring", zap.Error(err)) - } else if success { - n.logger.ComponentInfo(logging.ComponentNode, "Bootstrap peer configuration repaired during monitoring") - } + // Try to repair peer configuration + if success, err := n.clusterConfigManager.RepairPeerConfiguration(); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to repair peer addresses during monitoring", zap.Error(err)) + } else if success { + n.logger.ComponentInfo(logging.ComponentNode, "Peer configuration repaired during monitoring") } } } diff --git a/pkg/node/node.go b/pkg/node/node.go index 7e9315b..397805b 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -41,7 +41,7 @@ type Node struct { clusterDiscovery *database.ClusterDiscoveryService // Peer discovery - bootstrapCancel context.CancelFunc + peerDiscoveryCancel context.CancelFunc // PubSub pubsub *pubsub.ClientAdapter @@ -77,11 +77,8 @@ func (n *Node) startRQLite(ctx context.Context) error { // Determine node identifier for log filename - use node ID for unique filenames nodeID := n.config.Node.ID if nodeID == "" { - // Fallback to type if ID is not set - nodeID = n.config.Node.Type - if nodeID == "" { - nodeID = "node" - } + // Default to "node" if ID is not set + nodeID = "node" } // Create RQLite manager @@ -90,19 +87,13 @@ func (n *Node) startRQLite(ctx context.Context) error { // Initialize cluster discovery service if LibP2P host is available if n.host != nil && n.discoveryManager != nil { - // Determine node type for cluster discovery (bootstrap or node) - discoveryNodeType := "node" - if n.config.Node.Type == "bootstrap" { - discoveryNodeType = "bootstrap" - } - - // Create cluster discovery service + // Create cluster discovery service (all nodes are unified) n.clusterDiscovery = database.NewClusterDiscoveryService( n.host, n.discoveryManager, n.rqliteManager, n.config.Node.ID, - discoveryNodeType, + "node", // Unified node type n.config.Discovery.RaftAdvAddress, n.config.Discovery.HttpAdvAddress, n.config.Node.DataDir, @@ -147,7 +138,7 @@ func (n *Node) startRQLite(ctx context.Context) error { return nil } -// extractIPFromMultiaddr extracts the IP address from a bootstrap peer multiaddr +// extractIPFromMultiaddr extracts the IP address from a peer multiaddr // Supports IP4, IP6, DNS4, DNS6, and DNSADDR protocols func extractIPFromMultiaddr(multiaddrStr string) string { ma, err := multiaddr.NewMultiaddr(multiaddrStr) @@ -192,25 +183,25 @@ func extractIPFromMultiaddr(multiaddrStr string) string { return "" } -// bootstrapPeerSource returns a PeerSource that yields peers from BootstrapPeers. -func bootstrapPeerSource(bootstrapAddrs []string, logger *zap.Logger) func(context.Context, int) <-chan peer.AddrInfo { +// peerSource returns a PeerSource that yields peers from configured peers. +func peerSource(peerAddrs []string, logger *zap.Logger) func(context.Context, int) <-chan peer.AddrInfo { return func(ctx context.Context, num int) <-chan peer.AddrInfo { out := make(chan peer.AddrInfo, num) go func() { defer close(out) count := 0 - for _, s := range bootstrapAddrs { + for _, s := range peerAddrs { if count >= num { return } ma, err := multiaddr.NewMultiaddr(s) if err != nil { - logger.Debug("invalid bootstrap multiaddr", zap.String("addr", s), zap.Error(err)) + logger.Debug("invalid peer multiaddr", zap.String("addr", s), zap.Error(err)) continue } ai, err := peer.AddrInfoFromP2pAddr(ma) if err != nil { - logger.Debug("failed to parse bootstrap peer", zap.String("addr", s), zap.Error(err)) + logger.Debug("failed to parse peer address", zap.String("addr", s), zap.Error(err)) continue } select { @@ -225,8 +216,8 @@ func bootstrapPeerSource(bootstrapAddrs []string, logger *zap.Logger) func(conte } } -// hasBootstrapConnections checks if we're connected to any bootstrap peers -func (n *Node) hasBootstrapConnections() bool { +// hasPeerConnections checks if we're connected to any peers +func (n *Node) hasPeerConnections() bool { if n.host == nil || len(n.config.Discovery.BootstrapPeers) == 0 { return false } @@ -236,10 +227,10 @@ func (n *Node) hasBootstrapConnections() bool { return false } - // Parse bootstrap peer IDs - bootstrapPeerIDs := make(map[peer.ID]bool) - for _, bootstrapAddr := range n.config.Discovery.BootstrapPeers { - ma, err := multiaddr.NewMultiaddr(bootstrapAddr) + // Parse peer IDs + peerIDs := make(map[peer.ID]bool) + for _, peerAddr := range n.config.Discovery.BootstrapPeers { + ma, err := multiaddr.NewMultiaddr(peerAddr) if err != nil { continue } @@ -247,12 +238,12 @@ func (n *Node) hasBootstrapConnections() bool { if err != nil { continue } - bootstrapPeerIDs[peerInfo.ID] = true + peerIDs[peerInfo.ID] = true } - // Check if any connected peer is a bootstrap peer + // Check if any connected peer is in our peer list for _, peerID := range connectedPeers { - if bootstrapPeerIDs[peerID] { + if peerIDs[peerID] { return true } } @@ -287,8 +278,8 @@ func addJitter(interval time.Duration) time.Duration { return result } -// connectToBootstrapPeer connects to a single bootstrap peer -func (n *Node) connectToBootstrapPeer(ctx context.Context, addr string) error { +// connectToPeerAddr connects to a single peer address +func (n *Node) connectToPeerAddr(ctx context.Context, addr string) error { ma, err := multiaddr.NewMultiaddr(addr) if err != nil { return fmt.Errorf("invalid multiaddr: %w", err) @@ -300,16 +291,16 @@ func (n *Node) connectToBootstrapPeer(ctx context.Context, addr string) error { return fmt.Errorf("failed to extract peer info: %w", err) } - // Avoid dialing ourselves: if the bootstrap address resolves to our own peer ID, skip. + // Avoid dialing ourselves: if the address resolves to our own peer ID, skip. if n.host != nil && peerInfo.ID == n.host.ID() { - n.logger.ComponentDebug(logging.ComponentNode, "Skipping bootstrap address because it resolves to self", + n.logger.ComponentDebug(logging.ComponentNode, "Skipping peer address because it resolves to self", zap.String("addr", addr), zap.String("peer_id", peerInfo.ID.String())) return nil } // Log resolved peer info prior to connect - n.logger.ComponentDebug(logging.ComponentNode, "Resolved bootstrap peer", + n.logger.ComponentDebug(logging.ComponentNode, "Resolved peer", zap.String("peer_id", peerInfo.ID.String()), zap.String("addr", addr), zap.Int("addr_count", len(peerInfo.Addrs)), @@ -320,28 +311,28 @@ func (n *Node) connectToBootstrapPeer(ctx context.Context, addr string) error { return fmt.Errorf("failed to connect to peer: %w", err) } - n.logger.Info("Connected to bootstrap peer", + n.logger.Info("Connected to peer", zap.String("peer", peerInfo.ID.String()), zap.String("addr", addr)) return nil } -// connectToBootstrapPeers connects to configured LibP2P bootstrap peers -func (n *Node) connectToBootstrapPeers(ctx context.Context) error { +// connectToPeers connects to configured LibP2P peers +func (n *Node) connectToPeers(ctx context.Context) error { if len(n.config.Discovery.BootstrapPeers) == 0 { - n.logger.ComponentDebug(logging.ComponentNode, "No bootstrap peers configured") + n.logger.ComponentDebug(logging.ComponentNode, "No peers configured") return nil } - // Use passed context with a reasonable timeout for bootstrap connections + // Use passed context with a reasonable timeout for peer connections connectCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - for _, bootstrapAddr := range n.config.Discovery.BootstrapPeers { - if err := n.connectToBootstrapPeer(connectCtx, bootstrapAddr); err != nil { - n.logger.ComponentWarn(logging.ComponentNode, "Failed to connect to bootstrap peer", - zap.String("addr", bootstrapAddr), + for _, peerAddr := range n.config.Discovery.BootstrapPeers { + if err := n.connectToPeerAddr(connectCtx, peerAddr); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to connect to peer", + zap.String("addr", peerAddr), zap.Error(err)) continue } @@ -400,7 +391,7 @@ func (n *Node) startLibP2P() error { libp2p.EnableRelay(), libp2p.NATPortMap(), libp2p.EnableAutoRelayWithPeerSource( - bootstrapPeerSource(n.config.Discovery.BootstrapPeers, n.logger.Logger), + peerSource(n.config.Discovery.BootstrapPeers, n.logger.Logger), ), ) } @@ -426,59 +417,59 @@ func (n *Node) startLibP2P() error { n.pubsub = pubsub.NewClientAdapter(ps, n.config.Discovery.NodeNamespace) n.logger.Info("Initialized pubsub adapter on namespace", zap.String("namespace", n.config.Discovery.NodeNamespace)) - // Log configured bootstrap peers + // Log configured peers if len(n.config.Discovery.BootstrapPeers) > 0 { - n.logger.ComponentInfo(logging.ComponentNode, "Configured bootstrap peers", + n.logger.ComponentInfo(logging.ComponentNode, "Configured peers", zap.Strings("peers", n.config.Discovery.BootstrapPeers)) } else { - n.logger.ComponentDebug(logging.ComponentNode, "No bootstrap peers configured") + n.logger.ComponentDebug(logging.ComponentNode, "No peers configured") } - // Connect to LibP2P bootstrap peers if configured - if err := n.connectToBootstrapPeers(context.Background()); err != nil { - n.logger.ComponentWarn(logging.ComponentNode, "Failed to connect to bootstrap peers", zap.Error(err)) - // Don't fail - continue without bootstrap connections + // Connect to LibP2P peers if configured + if err := n.connectToPeers(context.Background()); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to connect to peers", zap.Error(err)) + // Don't fail - continue without peer connections } - // Start exponential backoff reconnection for bootstrap peers + // Start exponential backoff reconnection for peers if len(n.config.Discovery.BootstrapPeers) > 0 { - bootstrapCtx, cancel := context.WithCancel(context.Background()) - n.bootstrapCancel = cancel + peerCtx, cancel := context.WithCancel(context.Background()) + n.peerDiscoveryCancel = cancel go func() { interval := 5 * time.Second consecutiveFailures := 0 - n.logger.ComponentInfo(logging.ComponentNode, "Starting bootstrap peer reconnection with exponential backoff", + n.logger.ComponentInfo(logging.ComponentNode, "Starting peer reconnection with exponential backoff", zap.Duration("initial_interval", interval), zap.Duration("max_interval", 10*time.Minute)) for { select { - case <-bootstrapCtx.Done(): - n.logger.ComponentDebug(logging.ComponentNode, "Bootstrap reconnection loop stopped") + case <-peerCtx.Done(): + n.logger.ComponentDebug(logging.ComponentNode, "Peer reconnection loop stopped") return default: } // Check if we need to attempt connection - if !n.hasBootstrapConnections() { - n.logger.ComponentDebug(logging.ComponentNode, "Attempting bootstrap peer connection", + if !n.hasPeerConnections() { + n.logger.ComponentDebug(logging.ComponentNode, "Attempting peer connection", zap.Duration("current_interval", interval), zap.Int("consecutive_failures", consecutiveFailures)) - if err := n.connectToBootstrapPeers(context.Background()); err != nil { + if err := n.connectToPeers(context.Background()); err != nil { consecutiveFailures++ // Calculate next backoff interval jitteredInterval := addJitter(interval) - n.logger.ComponentDebug(logging.ComponentNode, "Bootstrap connection failed, backing off", + n.logger.ComponentDebug(logging.ComponentNode, "Peer connection failed, backing off", zap.Error(err), zap.Duration("next_attempt_in", jitteredInterval), zap.Int("consecutive_failures", consecutiveFailures)) // Sleep with jitter select { - case <-bootstrapCtx.Done(): + case <-peerCtx.Done(): return case <-time.After(jitteredInterval): } @@ -488,14 +479,14 @@ func (n *Node) startLibP2P() error { // Log interval increases occasionally to show progress if consecutiveFailures%5 == 0 { - n.logger.ComponentInfo(logging.ComponentNode, "Bootstrap connection still failing", + n.logger.ComponentInfo(logging.ComponentNode, "Peer connection still failing", zap.Int("consecutive_failures", consecutiveFailures), zap.Duration("current_interval", interval)) } } else { // Success! Reset interval and counters if consecutiveFailures > 0 { - n.logger.ComponentInfo(logging.ComponentNode, "Successfully connected to bootstrap peers", + n.logger.ComponentInfo(logging.ComponentNode, "Successfully connected to peers", zap.Int("failures_overcome", consecutiveFailures)) } interval = 5 * time.Second @@ -503,15 +494,15 @@ func (n *Node) startLibP2P() error { // Wait 30 seconds before checking connection again select { - case <-bootstrapCtx.Done(): + case <-peerCtx.Done(): return case <-time.After(30 * time.Second): } } } else { - // We have bootstrap connections, just wait and check periodically + // We have peer connections, just wait and check periodically select { - case <-bootstrapCtx.Done(): + case <-peerCtx.Done(): return case <-time.After(30 * time.Second): } @@ -520,15 +511,15 @@ func (n *Node) startLibP2P() error { }() } - // Add bootstrap peers to peerstore for peer exchange + // Add peers to peerstore for peer exchange if len(n.config.Discovery.BootstrapPeers) > 0 { - n.logger.ComponentInfo(logging.ComponentNode, "Adding bootstrap peers to peerstore") - for _, bootstrapAddr := range n.config.Discovery.BootstrapPeers { - if ma, err := multiaddr.NewMultiaddr(bootstrapAddr); err == nil { + n.logger.ComponentInfo(logging.ComponentNode, "Adding peers to peerstore") + for _, peerAddr := range n.config.Discovery.BootstrapPeers { + if ma, err := multiaddr.NewMultiaddr(peerAddr); err == nil { if peerInfo, err := peer.AddrInfoFromP2pAddr(ma); err == nil { // Add to peerstore with longer TTL for peer exchange n.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour*24) - n.logger.ComponentDebug(logging.ComponentNode, "Added bootstrap peer to peerstore", + n.logger.ComponentDebug(logging.ComponentNode, "Added peer to peerstore", zap.String("peer", peerInfo.ID.String())) } } @@ -651,9 +642,9 @@ func (n *Node) Stop() error { n.clusterDiscovery.Stop() } - // Stop bootstrap reconnection loop - if n.bootstrapCancel != nil { - n.bootstrapCancel() + // Stop peer reconnection loop + if n.peerDiscoveryCancel != nil { + n.peerDiscoveryCancel() } // Stop peer discovery @@ -685,7 +676,7 @@ func (n *Node) startHTTPGateway(ctx context.Context) error { // Create separate logger for unified gateway logFile := filepath.Join(os.ExpandEnv(n.config.Node.DataDir), "..", "logs", fmt.Sprintf("gateway-%s.log", n.config.HTTPGateway.NodeName)) - + // Ensure logs directory exists logsDir := filepath.Dir(logFile) if err := os.MkdirAll(logsDir, 0755); err != nil { @@ -795,16 +786,14 @@ func (n *Node) startIPFSClusterConfig() error { return fmt.Errorf("failed to ensure cluster config: %w", err) } - // Try to repair bootstrap peer configuration automatically - // This will be retried periodically if bootstrap is not available yet - if n.config.Node.Type != "bootstrap" { - if success, err := cm.RepairBootstrapPeers(); err != nil { - n.logger.ComponentWarn(logging.ComponentNode, "Failed to repair bootstrap peers, will retry later", zap.Error(err)) - } else if success { - n.logger.ComponentInfo(logging.ComponentNode, "Bootstrap peer configuration repaired successfully") - } else { - n.logger.ComponentDebug(logging.ComponentNode, "Bootstrap peer not available yet, will retry periodically") - } + // Try to repair peer configuration automatically + // This will be retried periodically if peer is not available yet + if success, err := cm.RepairPeerConfiguration(); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to repair peer configuration, will retry later", zap.Error(err)) + } else if success { + n.logger.ComponentInfo(logging.ComponentNode, "Peer configuration repaired successfully") + } else { + n.logger.ComponentDebug(logging.ComponentNode, "Peer not available yet, will retry periodically") } n.logger.ComponentInfo(logging.ComponentNode, "IPFS Cluster configuration initialized") diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index 8ee0ab4..edcb6c9 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -140,7 +140,7 @@ func TestLoadOrCreateIdentity(t *testing.T) { }) } -func TestHashBootstrapConnections(t *testing.T) { +func TestHasPeerConnections(t *testing.T) { cfg := &config.Config{} n, err := NewNode(cfg) @@ -148,8 +148,8 @@ func TestHashBootstrapConnections(t *testing.T) { t.Fatalf("NewNode() error: %v", err) } - // Assert: Does not have bootstrap connections - conns := n.hasBootstrapConnections() + // Assert: Does not have peer connections + conns := n.hasPeerConnections() if conns != false { t.Fatalf("expected false, got %v", conns) } @@ -162,13 +162,13 @@ func TestHashBootstrapConnections(t *testing.T) { defer h.Close() n.host = h - conns = n.hasBootstrapConnections() + conns = n.hasPeerConnections() if conns != false { t.Fatalf("expected false, got %v", conns) } - // Assert: Return true if connected to at least one bootstrap peer - t.Run("returns true when connected to at least one configured bootstrap peer", func(t *testing.T) { + // Assert: Return true if connected to at least one peer + t.Run("returns true when connected to at least one configured peer", func(t *testing.T) { // Fresh node and config cfg := &config.Config{} n2, err := NewNode(cfg) @@ -189,7 +189,7 @@ func TestHashBootstrapConnections(t *testing.T) { } defer hB.Close() - // Build B's bootstrap multiaddr: /p2p/ + // Build B's peer multiaddr: /p2p/ var base multiaddr.Multiaddr for _, a := range hB.Addrs() { if strings.Contains(a.String(), "/tcp/") { @@ -204,11 +204,11 @@ func TestHashBootstrapConnections(t *testing.T) { if err != nil { t.Fatalf("NewMultiaddr(/p2p/): %v", err) } - bootstrap := base.Encapsulate(pidMA).String() + peerAddr := base.Encapsulate(pidMA).String() - // Configure node A with B as a bootstrap peer + // Configure node A with B as a peer n2.host = hA - n2.config.Discovery.BootstrapPeers = []string{bootstrap} + n2.config.Discovery.BootstrapPeers = []string{peerAddr} // Connect A -> B ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -229,13 +229,13 @@ func TestHashBootstrapConnections(t *testing.T) { time.Sleep(10 * time.Millisecond) } - // Assert: hasBootstrapConnections returns true - if !n2.hasBootstrapConnections() { - t.Fatalf("expected hasBootstrapConnections() to be true") + // Assert: hasPeerConnections returns true + if !n2.hasPeerConnections() { + t.Fatalf("expected hasPeerConnections() to be true") } }) - t.Run("returns false when connected peers are not in the bootstrap list", func(t *testing.T) { + t.Run("returns false when connected peers are not in the peer list", func(t *testing.T) { // Fresh node and config cfg := &config.Config{} n2, err := NewNode(cfg) @@ -262,7 +262,7 @@ func TestHashBootstrapConnections(t *testing.T) { } defer hC.Close() - // Build C's bootstrap multiaddr: /p2p/ + // Build C's peer multiaddr: /p2p/ var baseC multiaddr.Multiaddr for _, a := range hC.Addrs() { if strings.Contains(a.String(), "/tcp/") { @@ -277,13 +277,13 @@ func TestHashBootstrapConnections(t *testing.T) { if err != nil { t.Fatalf("NewMultiaddr(/p2p/): %v", err) } - bootstrapC := baseC.Encapsulate(pidC).String() + peerC := baseC.Encapsulate(pidC).String() - // Configure node A with ONLY C as a bootstrap peer + // Configure node A with ONLY C as a peer n2.host = hA - n2.config.Discovery.BootstrapPeers = []string{bootstrapC} + n2.config.Discovery.BootstrapPeers = []string{peerC} - // Connect A -> B (but C is in the bootstrap list, not B) + // Connect A -> B (but C is in the peer list, not B) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() if err := hA.Connect(ctx, peer.AddrInfo{ID: hB.ID(), Addrs: hB.Addrs()}); err != nil { @@ -302,9 +302,9 @@ func TestHashBootstrapConnections(t *testing.T) { time.Sleep(10 * time.Millisecond) } - // Assert: hasBootstrapConnections should be false (connected peer is not in bootstrap list) - if n2.hasBootstrapConnections() { - t.Fatalf("expected hasBootstrapConnections() to be false") + // Assert: hasPeerConnections should be false (connected peer is not in peer list) + if n2.hasPeerConnections() { + t.Fatalf("expected hasPeerConnections() to be false") } }) diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go index 50204b1..dd357da 100644 --- a/pkg/rqlite/cluster_discovery.go +++ b/pkg/rqlite/cluster_discovery.go @@ -433,7 +433,7 @@ func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{ for _, peer := range c.knownPeers { // CRITICAL FIX: Include ALL peers (including self) in peers.json - // When using bootstrap-expect with recovery, RQLite needs the complete + // When using expect configuration with recovery, RQLite needs the complete // expected cluster configuration to properly form consensus. // The peers.json file is used by RQLite's recovery mechanism to know // what the full cluster membership should be, including the local node. @@ -584,25 +584,34 @@ func (c *ClusterDiscoveryService) HasRecentPeersJSON() bool { return time.Since(c.lastUpdate) < 5*time.Minute } -// FindJoinTargets discovers join targets via LibP2P, prioritizing bootstrap nodes +// FindJoinTargets discovers join targets via LibP2P func (c *ClusterDiscoveryService) FindJoinTargets() []string { c.mu.RLock() defer c.mu.RUnlock() targets := []string{} - // Prioritize bootstrap nodes + // All nodes are equal - prioritize by Raft log index (more advanced = better) + type nodeWithIndex struct { + address string + logIndex uint64 + } + var nodes []nodeWithIndex for _, peer := range c.knownPeers { - if peer.NodeType == "bootstrap" { - targets = append(targets, peer.RaftAddress) + nodes = append(nodes, nodeWithIndex{peer.RaftAddress, peer.RaftLogIndex}) + } + + // Sort by log index descending (higher log index = more up-to-date) + for i := 0; i < len(nodes)-1; i++ { + for j := i + 1; j < len(nodes); j++ { + if nodes[j].logIndex > nodes[i].logIndex { + nodes[i], nodes[j] = nodes[j], nodes[i] + } } } - // Add other nodes as fallback - for _, peer := range c.knownPeers { - if peer.NodeType != "bootstrap" { - targets = append(targets, peer.RaftAddress) - } + for _, n := range nodes { + targets = append(targets, n.address) } return targets diff --git a/pkg/rqlite/metrics.go b/pkg/rqlite/metrics.go index de21c5f..3e8516a 100644 --- a/pkg/rqlite/metrics.go +++ b/pkg/rqlite/metrics.go @@ -8,18 +8,18 @@ import ( func (c *ClusterDiscoveryService) GetMetrics() *ClusterMetrics { c.mu.RLock() defer c.mu.RUnlock() - + activeCount := 0 inactiveCount := 0 totalHealth := 0.0 currentLeader := "" - + now := time.Now() - + for nodeID, health := range c.peerHealth { if health.Status == "active" { activeCount++ - + // Calculate health score (0-100) based on last seen timeSinceLastSeen := now.Sub(health.LastSeen) healthScore := 100.0 @@ -34,22 +34,22 @@ func (c *ClusterDiscoveryService) GetMetrics() *ClusterMetrics { } else { inactiveCount++ } - - // Try to determine leader + + // Try to determine leader (highest log index is likely the leader) if peer, ok := c.knownPeers[nodeID]; ok { // We'd need to check the actual leader status from RQLite - // For now, bootstrap nodes are more likely to be leader - if peer.NodeType == "bootstrap" && currentLeader == "" { + // For now, use highest log index as heuristic + if currentLeader == "" || peer.RaftLogIndex > c.knownPeers[currentLeader].RaftLogIndex { currentLeader = nodeID } } } - + averageHealth := 0.0 if activeCount > 0 { averageHealth = totalHealth / float64(activeCount) } - + // Determine discovery status discoveryStatus := "healthy" if len(c.knownPeers) == 0 { @@ -59,7 +59,7 @@ func (c *ClusterDiscoveryService) GetMetrics() *ClusterMetrics { } else if averageHealth < 50 { discoveryStatus = "degraded" } - + return &ClusterMetrics{ ClusterSize: len(c.knownPeers), ActiveNodes: activeCount, @@ -71,4 +71,3 @@ func (c *ClusterDiscoveryService) GetMetrics() *ClusterMetrics { AveragePeerHealth: averageHealth, } } - diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index 9cfa881..c00fa55 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -25,7 +25,7 @@ type RQLiteManager struct { config *config.DatabaseConfig discoverConfig *config.DiscoveryConfig dataDir string - nodeType string // "bootstrap" or "node" + nodeType string // Node type identifier logger *zap.Logger cmd *exec.Cmd connection *gorqlite.Connection @@ -81,7 +81,7 @@ func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService) { r.discoveryService = service } -// SetNodeType sets the node type for this RQLite manager ("bootstrap" or "node") +// SetNodeType sets the node type for this RQLite manager func (r *RQLiteManager) SetNodeType(nodeType string) { if nodeType != "" { r.nodeType = nodeType @@ -120,7 +120,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error { // CRITICAL FIX: Ensure peers.json exists with minimum cluster size BEFORE starting RQLite // This prevents split-brain where each node starts as a single-node cluster // We NEVER start as a single-node cluster - we wait indefinitely until minimum cluster size is met - // This applies to ALL nodes (bootstrap AND regular nodes with join addresses) + // This applies to ALL nodes (with or without join addresses) if r.discoveryService != nil { r.logger.Info("Ensuring peers.json exists with minimum cluster size before RQLite startup", zap.String("policy", "will wait indefinitely - never start as single-node cluster"), @@ -289,23 +289,23 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) if nodeType == "" { nodeType = "node" } - + // Create logs directory logsDir := filepath.Join(filepath.Dir(r.dataDir), "logs") if err := os.MkdirAll(logsDir, 0755); err != nil { return fmt.Errorf("failed to create logs directory at %s: %w", logsDir, err) } - + // Open log file for RQLite output logPath := filepath.Join(logsDir, fmt.Sprintf("rqlite-%s.log", nodeType)) logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return fmt.Errorf("failed to open RQLite log file at %s: %w", logPath, err) } - + r.logger.Info("RQLite logs will be written to file", zap.String("path", logPath)) - + r.cmd.Stdout = logFile r.cmd.Stderr = logFile @@ -464,7 +464,7 @@ func (r *RQLiteManager) waitForReady(ctx context.Context) error { // All nodes may need time to open the store during recovery // Use consistent timeout for cluster consistency - maxAttempts := 180 // 180 seconds (3 minutes) for all nodes + maxAttempts := 180 // 180 seconds (3 minutes) for all nodes for i := 0; i < maxAttempts; i++ { select { @@ -517,7 +517,7 @@ func (r *RQLiteManager) waitForReady(ctx context.Context) error { return fmt.Errorf("RQLite did not become ready within timeout") } -// waitForLeadership waits for RQLite to establish leadership (for bootstrap nodes) +// GetConnection returns the RQLite connection // GetConnection returns the RQLite connection func (r *RQLiteManager) GetConnection() *gorqlite.Connection { return r.connection @@ -708,7 +708,6 @@ func (r *RQLiteManager) testJoinAddress(joinAddress string) error { return nil } - // exponentialBackoff calculates exponential backoff duration with jitter func (r *RQLiteManager) exponentialBackoff(attempt int, baseDelay time.Duration, maxDelay time.Duration) time.Duration { // Calculate exponential backoff: baseDelay * 2^attempt @@ -745,7 +744,7 @@ func (r *RQLiteManager) recoverCluster(ctx context.Context, peersJSONPath string } // Restart RQLite using launchProcess to ensure all join/backoff logic is applied - // This includes: join address handling, join retries, bootstrap-expect, etc. + // This includes: join address handling, join retries, expect configuration, etc. r.logger.Info("Restarting RQLite (will auto-recover using peers.json)") if err := r.launchProcess(ctx, rqliteDataDir); err != nil { return fmt.Errorf("failed to restart RQLite process: %w", err) @@ -864,7 +863,6 @@ func (r *RQLiteManager) clearRaftState(rqliteDataDir string) error { return nil } - // isInSplitBrainState detects if we're in a split-brain scenario where all nodes // are followers with no peers (each node thinks it's alone) func (r *RQLiteManager) isInSplitBrainState() bool { @@ -1182,9 +1180,9 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql } // CRITICAL FIX: Skip recovery if no peers were discovered (other than ourselves) - // Only ourselves in the cluster means this is a fresh bootstrap, not a recovery scenario + // Only ourselves in the cluster means this is a fresh cluster, not a recovery scenario if discoveredPeers <= 1 { - r.logger.Info("No peers discovered during pre-start discovery window - skipping recovery (fresh bootstrap)", + r.logger.Info("No peers discovered during pre-start discovery window - skipping recovery (fresh cluster)", zap.Int("discovered_peers", discoveredPeers)) return nil }