From 0b24c66d56ad01c6f718040eba0f55db3a63c59d Mon Sep 17 00:00:00 2001
From: anonpenguin23
Date: Sun, 9 Nov 2025 19:20:08 +0200
Subject: [PATCH] Fix IPFS Cluster startup in the development environment

---
 CHANGELOG.md                           |  15 ++
 Makefile                               |   2 +-
 pkg/environments/development/config.go | 146 ++++++-------
 pkg/environments/development/health.go |   6 +-
 pkg/environments/development/runner.go | 274 +++++++++++++++++++++++--
 5 files changed, 353 insertions(+), 90 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c56bb54..f0b549d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,21 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
 ### Deprecated
 
 ### Fixed
 
+## [0.60.1] - 2025-11-09
+
+### Added
+- Improved IPFS Cluster startup logic in the development environment to ensure proper peer discovery and configuration.
+
+### Changed
+- Refactored IPFS Cluster initialization in the development environment to use a multi-phase startup (bootstrap first, then followers) and to explicitly clean stale cluster state (pebble, peerstore) before initialization.
+
+### Deprecated
+
+### Removed
+
+### Fixed
+- Fixed an issue where IPFS Cluster nodes in the development environment could fail to join due to incorrect bootstrap configuration or stale state.
+
 ## [0.60.0] - 2025-11-09
 
 ### Added
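Note: the multi-phase startup described in the changelog entry above reduces to the control flow sketched below. This is a minimal, illustrative sketch: startNode is a hypothetical stand-in for the clean/init/daemon sequence in runner.go, not a real API in this repo.

    package main

    import (
        "context"
        "fmt"
    )

    type nodeInfo struct{ name, bootstrapAddr string }

    // startNode stands in for "clean stale state, init with shared secret, start daemon".
    func startNode(ctx context.Context, n nodeInfo) (string, error) {
        fmt.Printf("starting %s (join %q)\n", n.name, n.bootstrapAddr)
        return "/ip4/127.0.0.1/tcp/9096/p2p/<bootstrap-peer-id>", nil // placeholder multiaddr
    }

    func main() {
        ctx := context.Background()
        nodes := []nodeInfo{{name: "bootstrap"}, {name: "node2"}, {name: "node3"}}

        // Phase 1: bootstrap first; record the multiaddress followers will join.
        addr, _ := startNode(ctx, nodes[0])

        // Phase 2: followers join via the recorded address.
        for _, n := range nodes[1:] {
            n.bootstrapAddr = addr
            startNode(ctx, n)
        }

        // Phase 3 (elided): poll the bootstrap REST API until all peers appear.
    }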
diff --git a/Makefile b/Makefile
index 6ee06dc..7946d0f 100644
--- a/Makefile
+++ b/Makefile
@@ -21,7 +21,7 @@ test-e2e:
 
 .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
 
-VERSION := 0.60.0
+VERSION := 0.60.1
 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
 DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
 LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

diff --git a/pkg/environments/development/config.go b/pkg/environments/development/config.go
index a45375b..f1daa52 100644
--- a/pkg/environments/development/config.go
+++ b/pkg/environments/development/config.go
@@ -120,26 +120,26 @@ func (ce *ConfigEnsurer) ensureBootstrap() error {
     // Ensure bootstrap config - always regenerate to ensure template fixes are applied
     bootstrapConfigPath := filepath.Join(ce.debrosDir, "bootstrap.yaml")
 
-    data := templates.BootstrapConfigData{
-        NodeID:         "bootstrap",
-        P2PPort:        4001,
-        DataDir:        bootstrapDir,
-        RQLiteHTTPPort: 5001,
-        RQLiteRaftPort: 7001,
-        ClusterAPIPort: 9094,
-        IPFSAPIPort:    4501,
-    }
+    data := templates.BootstrapConfigData{
+        NodeID:         "bootstrap",
+        P2PPort:        4001,
+        DataDir:        bootstrapDir,
+        RQLiteHTTPPort: 5001,
+        RQLiteRaftPort: 7001,
+        ClusterAPIPort: 9094,
+        IPFSAPIPort:    4501,
+    }
 
-    config, err := templates.RenderBootstrapConfig(data)
-    if err != nil {
-        return fmt.Errorf("failed to render bootstrap config: %w", err)
-    }
+    config, err := templates.RenderBootstrapConfig(data)
+    if err != nil {
+        return fmt.Errorf("failed to render bootstrap config: %w", err)
+    }
 
-    if err := os.WriteFile(bootstrapConfigPath, []byte(config), 0644); err != nil {
-        return fmt.Errorf("failed to write bootstrap config: %w", err)
-    }
+    if err := os.WriteFile(bootstrapConfigPath, []byte(config), 0644); err != nil {
+        return fmt.Errorf("failed to write bootstrap config: %w", err)
+    }
 
-    fmt.Printf("✓ Generated bootstrap.yaml\n")
+    fmt.Printf("✓ Generated bootstrap.yaml\n")
 
     return nil
 }
@@ -171,32 +171,32 @@ func (ce *ConfigEnsurer) ensureNode2And3() error {
         configPath := filepath.Join(ce.debrosDir, fmt.Sprintf("%s.yaml", node.name))
 
         // Always regenerate to ensure template fixes are applied
-        if err := os.MkdirAll(nodeDir, 0755); err != nil {
-            return fmt.Errorf("failed to create %s directory: %w", node.name, err)
-        }
+        if err := os.MkdirAll(nodeDir, 0755); err != nil {
+            return fmt.Errorf("failed to create %s directory: %w", node.name, err)
+        }
 
-        data := templates.NodeConfigData{
-            NodeID:            node.name,
-            P2PPort:           node.p2pPort,
-            DataDir:           nodeDir,
-            RQLiteHTTPPort:    node.rqliteHTTPPort,
-            RQLiteRaftPort:    node.rqliteRaftPort,
-            RQLiteJoinAddress: "localhost:7001",
-            BootstrapPeers:    []string{bootstrapMultiaddr},
-            ClusterAPIPort:    node.clusterAPIPort,
-            IPFSAPIPort:       node.ipfsAPIPort,
-        }
+        data := templates.NodeConfigData{
+            NodeID:            node.name,
+            P2PPort:           node.p2pPort,
+            DataDir:           nodeDir,
+            RQLiteHTTPPort:    node.rqliteHTTPPort,
+            RQLiteRaftPort:    node.rqliteRaftPort,
+            RQLiteJoinAddress: "localhost:7001",
+            BootstrapPeers:    []string{bootstrapMultiaddr},
+            ClusterAPIPort:    node.clusterAPIPort,
+            IPFSAPIPort:       node.ipfsAPIPort,
+        }
 
-        config, err := templates.RenderNodeConfig(data)
-        if err != nil {
-            return fmt.Errorf("failed to render %s config: %w", node.name, err)
-        }
+        config, err := templates.RenderNodeConfig(data)
+        if err != nil {
+            return fmt.Errorf("failed to render %s config: %w", node.name, err)
+        }
 
-        if err := os.WriteFile(configPath, []byte(config), 0644); err != nil {
-            return fmt.Errorf("failed to write %s config: %w", node.name, err)
-        }
+        if err := os.WriteFile(configPath, []byte(config), 0644); err != nil {
+            return fmt.Errorf("failed to write %s config: %w", node.name, err)
+        }
 
-        fmt.Printf("✓ Generated %s.yaml\n", node.name)
+        fmt.Printf("✓ Generated %s.yaml\n", node.name)
     }
 
     return nil
@@ -207,32 +207,32 @@ func (ce *ConfigEnsurer) ensureGateway() error {
     configPath := filepath.Join(ce.debrosDir, "gateway.yaml")
 
     // Always regenerate to ensure template fixes are applied
-    // Get bootstrap multiaddr
-    bootstrapInfo, err := encryption.LoadIdentity(filepath.Join(ce.debrosDir, "bootstrap", "identity.key"))
-    if err != nil {
-        return fmt.Errorf("failed to load bootstrap identity: %w", err)
-    }
+    // Get bootstrap multiaddr
+    bootstrapInfo, err := encryption.LoadIdentity(filepath.Join(ce.debrosDir, "bootstrap", "identity.key"))
+    if err != nil {
+        return fmt.Errorf("failed to load bootstrap identity: %w", err)
+    }
 
-    bootstrapMultiaddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/4001/p2p/%s", bootstrapInfo.PeerID.String())
+    bootstrapMultiaddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/4001/p2p/%s", bootstrapInfo.PeerID.String())
 
-    data := templates.GatewayConfigData{
-        ListenPort:     6001,
-        BootstrapPeers: []string{bootstrapMultiaddr},
-        OlricServers:   []string{"127.0.0.1:3320"},
-        ClusterAPIPort: 9094,
-        IPFSAPIPort:    4501,
-    }
+    data := templates.GatewayConfigData{
+        ListenPort:     6001,
+        BootstrapPeers: []string{bootstrapMultiaddr},
+        OlricServers:   []string{"127.0.0.1:3320"},
+        ClusterAPIPort: 9094,
+        IPFSAPIPort:    4501,
+    }
 
-    config, err := templates.RenderGatewayConfig(data)
-    if err != nil {
-        return fmt.Errorf("failed to render gateway config: %w", err)
-    }
+    config, err := templates.RenderGatewayConfig(data)
+    if err != nil {
+        return fmt.Errorf("failed to render gateway config: %w", err)
+    }
 
-    if err := os.WriteFile(configPath, []byte(config), 0644); err != nil {
-        return fmt.Errorf("failed to write gateway config: %w", err)
-    }
+    if err := os.WriteFile(configPath, []byte(config), 0644); err != nil {
+        return fmt.Errorf("failed to write gateway config: %w", err)
+    }
 
-    fmt.Printf("✓ Generated gateway.yaml\n")
+    fmt.Printf("✓ Generated gateway.yaml\n")
 
     return nil
 }
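Note on the bootstrap multiaddress assembled in ensureGateway and ensureNode2And3 above: the /ip4/<host>/tcp/<port>/p2p/<peer-id> string must parse as a valid libp2p multiaddr or peers will refuse to dial it. A quick validation sketch, assuming the go-multiaddr package; the peer ID here is a truncated placeholder, so parsing is expected to fail until a real ID is substituted.

    package main

    import (
        "fmt"

        ma "github.com/multiformats/go-multiaddr"
    )

    func main() {
        // Same shape the config generator produces.
        addr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/p2p/%s", 4001, "12D3KooW...")

        if _, err := ma.NewMultiaddr(addr); err != nil {
            // Fires for the placeholder above; a full base58 peer ID parses cleanly.
            fmt.Println("invalid multiaddr:", err)
        }
    }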
@@ -241,22 +241,22 @@ func (ce *ConfigEnsurer) ensureOlric() error {
     configPath := filepath.Join(ce.debrosDir, "olric-config.yaml")
 
     // Always regenerate to ensure template fixes are applied
-    data := templates.OlricConfigData{
-        BindAddr:       "127.0.0.1",
-        HTTPPort:       3320,
-        MemberlistPort: 3322,
-    }
+    data := templates.OlricConfigData{
+        BindAddr:       "127.0.0.1",
+        HTTPPort:       3320,
+        MemberlistPort: 3322,
+    }
 
-    config, err := templates.RenderOlricConfig(data)
-    if err != nil {
-        return fmt.Errorf("failed to render olric config: %w", err)
-    }
+    config, err := templates.RenderOlricConfig(data)
+    if err != nil {
+        return fmt.Errorf("failed to render olric config: %w", err)
+    }
 
-    if err := os.WriteFile(configPath, []byte(config), 0644); err != nil {
-        return fmt.Errorf("failed to write olric config: %w", err)
-    }
+    if err := os.WriteFile(configPath, []byte(config), 0644); err != nil {
+        return fmt.Errorf("failed to write olric config: %w", err)
+    }
 
-    fmt.Printf("✓ Generated olric-config.yaml\n")
+    fmt.Printf("✓ Generated olric-config.yaml\n")
 
     return nil
 }

diff --git a/pkg/environments/development/health.go b/pkg/environments/development/health.go
index 3e5c930..e4aa274 100644
--- a/pkg/environments/development/health.go
+++ b/pkg/environments/development/health.go
@@ -37,8 +37,8 @@ func (pm *ProcessManager) IPFSHealthCheck(ctx context.Context, nodes []ipfsNodeI
     for _, line := range peerLines {
         if strings.TrimSpace(line) != "" {
             peerCount++
-        }
-    }
+        }
+    }
 
     if peerCount < 2 {
         result.Details += fmt.Sprintf("%s: only %d peers (want 2+); ", node.name, peerCount)
@@ -99,7 +99,7 @@ func (pm *ProcessManager) checkRQLiteNode(ctx context.Context, name string, http
     if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
         result.Details = fmt.Sprintf("decode error: %v", err)
         return result
-    }
+    }
 
     // Check the store.raft structure (RQLite 8 format)
     store, ok := status["store"].(map[string]interface{})
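Note on checkRQLiteNode above: RQLite 8 nests Raft details under store.raft in its /status payload, which is why the decode walks two map levels. A standalone sketch of that walk; port 5001 matches the dev bootstrap config in this patch, and the "state" field name is an assumption about RQLite's status output.

    package main

    import (
        "encoding/json"
        "fmt"
        "net/http"
    )

    func main() {
        resp, err := http.Get("http://127.0.0.1:5001/status")
        if err != nil {
            fmt.Println("status fetch failed:", err)
            return
        }
        defer resp.Body.Close()

        var status map[string]interface{}
        if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
            fmt.Println("decode error:", err)
            return
        }

        // RQLite 8 layout: raft details live under store.raft.
        if store, ok := status["store"].(map[string]interface{}); ok {
            if raft, ok := store["raft"].(map[string]interface{}); ok {
                fmt.Println("raft state:", raft["state"])
            }
        }
    }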
diff --git a/pkg/environments/development/runner.go b/pkg/environments/development/runner.go
index c63135c..8715247 100644
--- a/pkg/environments/development/runner.go
+++ b/pkg/environments/development/runner.go
@@ -418,14 +418,108 @@ func (pm *ProcessManager) startIPFSCluster(ctx context.Context) error {
         }
     }
 
-    for _, node := range nodes {
-        serviceJSON := filepath.Join(node.clusterPath, "service.json")
-        if _, err := os.Stat(serviceJSON); os.IsNotExist(err) {
-            os.MkdirAll(node.clusterPath, 0755)
-            fmt.Fprintf(pm.logWriter, "  Initializing IPFS Cluster (%s)...\n", node.name)
-            cmd := exec.CommandContext(ctx, "ipfs-cluster-service", "init", "--force")
-            cmd.Env = append(os.Environ(), fmt.Sprintf("IPFS_CLUSTER_PATH=%s", node.clusterPath))
-            cmd.Run()
+    // Read cluster secret to ensure all nodes use the same PSK
+    secretPath := filepath.Join(pm.debrosDir, "cluster-secret")
+    clusterSecret, err := os.ReadFile(secretPath)
+    if err != nil {
+        return fmt.Errorf("failed to read cluster secret: %w", err)
+    }
+    clusterSecretHex := strings.TrimSpace(string(clusterSecret))
+
+    // Phase 1: Initialize and start bootstrap IPFS Cluster, then read its identity
+    bootstrapMultiaddr := ""
+    {
+        node := nodes[0] // bootstrap
+
+        // Always clean stale cluster state to ensure fresh initialization with correct secret
+        if err := pm.cleanClusterState(node.clusterPath); err != nil {
+            fmt.Fprintf(pm.logWriter, "  Warning: failed to clean cluster state for %s: %v\n", node.name, err)
+        }
+
+        os.MkdirAll(node.clusterPath, 0755)
+        fmt.Fprintf(pm.logWriter, "  Initializing IPFS Cluster (%s)...\n", node.name)
+        cmd := exec.CommandContext(ctx, "ipfs-cluster-service", "init", "--force")
+        cmd.Env = append(os.Environ(),
+            fmt.Sprintf("IPFS_CLUSTER_PATH=%s", node.clusterPath),
+            fmt.Sprintf("CLUSTER_SECRET=%s", clusterSecretHex),
+        )
+        if output, err := cmd.CombinedOutput(); err != nil {
+            fmt.Fprintf(pm.logWriter, "  Warning: ipfs-cluster-service init failed: %v (output: %s)\n", err, string(output))
+        }
+
+        // Ensure correct ports in service.json BEFORE starting daemon
+        // This is critical: it sets the cluster listen port to clusterPort, not the default
+        if err := pm.ensureIPFSClusterPorts(node.clusterPath, node.restAPIPort, node.clusterPort); err != nil {
+            fmt.Fprintf(pm.logWriter, "  Warning: failed to update IPFS Cluster config for %s: %v\n", node.name, err)
+        }
+
+        // Verify the config was written correctly (debug: read it back)
+        serviceJSONPath := filepath.Join(node.clusterPath, "service.json")
+        if data, err := os.ReadFile(serviceJSONPath); err == nil {
+            var verifyConfig map[string]interface{}
+            if err := json.Unmarshal(data, &verifyConfig); err == nil {
+                if cluster, ok := verifyConfig["cluster"].(map[string]interface{}); ok {
+                    if listenAddrs, ok := cluster["listen_multiaddress"].([]interface{}); ok {
+                        fmt.Fprintf(pm.logWriter, "  Config verified: %s cluster listening on %v\n", node.name, listenAddrs)
+                    }
+                }
+            }
+        }
+
+        // Start bootstrap cluster service
+        pidPath := filepath.Join(pm.pidsDir, fmt.Sprintf("ipfs-cluster-%s.pid", node.name))
+        logPath := filepath.Join(pm.debrosDir, "logs", fmt.Sprintf("ipfs-cluster-%s.log", node.name))
+
+        cmd = exec.CommandContext(ctx, "ipfs-cluster-service", "daemon")
+        cmd.Env = append(os.Environ(), fmt.Sprintf("IPFS_CLUSTER_PATH=%s", node.clusterPath))
+        logFile, _ := os.Create(logPath)
+        cmd.Stdout = logFile
+        cmd.Stderr = logFile
+
+        if err := cmd.Start(); err != nil {
+            fmt.Fprintf(pm.logWriter, "  ⚠️  Failed to start ipfs-cluster-%s: %v\n", node.name, err)
+            return err
+        }
+
+        os.WriteFile(pidPath, []byte(fmt.Sprintf("%d", cmd.Process.Pid)), 0644)
+        fmt.Fprintf(pm.logWriter, "✓ IPFS Cluster (%s) started (PID: %d, API: %d)\n", node.name, cmd.Process.Pid, node.restAPIPort)
+
+        // Wait for bootstrap to be ready and read its identity
+        if err := pm.waitClusterReady(ctx, node.name, node.restAPIPort); err != nil {
+            fmt.Fprintf(pm.logWriter, "  Warning: IPFS Cluster %s did not become ready: %v\n", node.name, err)
+        }
+
+        // Add a brief delay to allow identity.json to be written
+        time.Sleep(500 * time.Millisecond)
+
+        // Read bootstrap peer ID for follower nodes to join
+        peerID, err := pm.waitForClusterPeerID(ctx, filepath.Join(node.clusterPath, "identity.json"))
+        if err != nil {
+            fmt.Fprintf(pm.logWriter, "  Warning: failed to read bootstrap peer ID: %v\n", err)
+        } else {
+            bootstrapMultiaddr = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/p2p/%s", node.clusterPort, peerID)
+            fmt.Fprintf(pm.logWriter, "  Bootstrap multiaddress: %s\n", bootstrapMultiaddr)
+        }
+    }
+
+    // Phase 2: Initialize and start follower IPFS Cluster nodes with bootstrap flag
+    for i := 1; i < len(nodes); i++ {
+        node := nodes[i]
+
+        // Always clean stale cluster state to ensure fresh initialization with correct secret
+        if err := pm.cleanClusterState(node.clusterPath); err != nil {
+            fmt.Fprintf(pm.logWriter, "  Warning: failed to clean cluster state for %s: %v\n", node.name, err)
+        }
+
+        os.MkdirAll(node.clusterPath, 0755)
+        fmt.Fprintf(pm.logWriter, "  Initializing IPFS Cluster (%s)...\n", node.name)
+        cmd := exec.CommandContext(ctx, "ipfs-cluster-service", "init", "--force")
+        cmd.Env = append(os.Environ(),
+            fmt.Sprintf("IPFS_CLUSTER_PATH=%s", node.clusterPath),
+            fmt.Sprintf("CLUSTER_SECRET=%s", clusterSecretHex),
+        )
+        if output, err := cmd.CombinedOutput(); err != nil {
+            fmt.Fprintf(pm.logWriter, "  Warning: ipfs-cluster-service init failed for %s: %v (output: %s)\n", node.name, err, string(output))
         }
 
         // Ensure correct ports in service.json BEFORE starting daemon
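Note on the CLUSTER_SECRET handling above: ipfs-cluster's secret is a 32-byte value, hex-encoded to 64 characters, and every peer must share it or the private-network handshake fails, which is exactly the stale-PSK symptom cleanClusterState guards against. A sketch of producing a cluster-secret file in that format; the file name mirrors what startIPFSCluster reads back, and the relative path is an assumption.

    package main

    import (
        "crypto/rand"
        "encoding/hex"
        "fmt"
        "os"
    )

    func main() {
        // 32 random bytes, hex-encoded: the PSK format ipfs-cluster-service
        // accepts via CLUSTER_SECRET during init.
        buf := make([]byte, 32)
        if _, err := rand.Read(buf); err != nil {
            panic(err)
        }
        secret := hex.EncodeToString(buf)

        // Assumed location relative to the dev data dir (debrosDir/cluster-secret).
        if err := os.WriteFile("cluster-secret", []byte(secret), 0600); err != nil {
            panic(err)
        }
        fmt.Println("wrote", len(secret), "hex chars") // 64
    }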
fmt.Sprintf("CLUSTER_SECRET=%s", clusterSecretHex), + ) + if output, err := cmd.CombinedOutput(); err != nil { + fmt.Fprintf(pm.logWriter, " Warning: ipfs-cluster-service init failed for %s: %v (output: %s)\n", node.name, err, string(output)) } // Ensure correct ports in service.json BEFORE starting daemon @@ -433,11 +527,29 @@ func (pm *ProcessManager) startIPFSCluster(ctx context.Context) error { fmt.Fprintf(pm.logWriter, " Warning: failed to update IPFS Cluster config for %s: %v\n", node.name, err) } - // Start cluster service + // Verify the config was written correctly (debug: read it back) + serviceJSONPath := filepath.Join(node.clusterPath, "service.json") + if data, err := os.ReadFile(serviceJSONPath); err == nil { + var verifyConfig map[string]interface{} + if err := json.Unmarshal(data, &verifyConfig); err == nil { + if cluster, ok := verifyConfig["cluster"].(map[string]interface{}); ok { + if listenAddrs, ok := cluster["listen_multiaddress"].([]interface{}); ok { + fmt.Fprintf(pm.logWriter, " Config verified: %s cluster listening on %v\n", node.name, listenAddrs) + } + } + } + } + + // Start follower cluster service with bootstrap flag pidPath := filepath.Join(pm.pidsDir, fmt.Sprintf("ipfs-cluster-%s.pid", node.name)) logPath := filepath.Join(pm.debrosDir, "logs", fmt.Sprintf("ipfs-cluster-%s.log", node.name)) - cmd := exec.CommandContext(ctx, "ipfs-cluster-service", "daemon") + args := []string{"daemon"} + if bootstrapMultiaddr != "" { + args = append(args, "--bootstrap", bootstrapMultiaddr) + } + + cmd = exec.CommandContext(ctx, "ipfs-cluster-service", args...) cmd.Env = append(os.Environ(), fmt.Sprintf("IPFS_CLUSTER_PATH=%s", node.clusterPath)) logFile, _ := os.Create(logPath) cmd.Stdout = logFile @@ -450,12 +562,145 @@ func (pm *ProcessManager) startIPFSCluster(ctx context.Context) error { os.WriteFile(pidPath, []byte(fmt.Sprintf("%d", cmd.Process.Pid)), 0644) fmt.Fprintf(pm.logWriter, "✓ IPFS Cluster (%s) started (PID: %d, API: %d)\n", node.name, cmd.Process.Pid, node.restAPIPort) + + // Wait for follower node to connect to the bootstrap peer + if err := pm.waitClusterReady(ctx, node.name, node.restAPIPort); err != nil { + fmt.Fprintf(pm.logWriter, " Warning: IPFS Cluster %s did not become ready: %v\n", node.name, err) + } + } + + // Phase 3: Wait for all cluster peers to discover each other + fmt.Fprintf(pm.logWriter, " Waiting for IPFS Cluster peers to form...\n") + if err := pm.waitClusterFormed(ctx, nodes[0].restAPIPort); err != nil { + fmt.Fprintf(pm.logWriter, " Warning: IPFS Cluster did not form fully: %v\n", err) } time.Sleep(1 * time.Second) return nil } +// waitForClusterPeerID polls the identity.json file until it appears and extracts the peer ID +func (pm *ProcessManager) waitForClusterPeerID(ctx context.Context, identityPath string) (string, error) { + maxRetries := 30 + retryInterval := 500 * time.Millisecond + + for attempt := 0; attempt < maxRetries; attempt++ { + data, err := os.ReadFile(identityPath) + if err == nil { + var identity map[string]interface{} + if err := json.Unmarshal(data, &identity); err == nil { + if id, ok := identity["id"].(string); ok { + return id, nil + } + } + } + + select { + case <-time.After(retryInterval): + continue + case <-ctx.Done(): + return "", ctx.Err() + } + } + + return "", fmt.Errorf("could not read cluster peer ID after %d seconds", (maxRetries * int(retryInterval.Milliseconds()) / 1000)) +} + +// waitClusterReady polls the cluster REST API until it's ready +func (pm *ProcessManager) waitClusterReady(ctx 
+
+// waitClusterReady polls the cluster REST API until it's ready
+func (pm *ProcessManager) waitClusterReady(ctx context.Context, name string, restAPIPort int) error {
+    maxRetries := 30
+    retryInterval := 500 * time.Millisecond
+
+    for attempt := 0; attempt < maxRetries; attempt++ {
+        httpURL := fmt.Sprintf("http://127.0.0.1:%d/peers", restAPIPort)
+        resp, err := http.Get(httpURL)
+        if err == nil && resp.StatusCode == 200 {
+            resp.Body.Close()
+            return nil
+        }
+        if resp != nil {
+            resp.Body.Close()
+        }
+
+        select {
+        case <-time.After(retryInterval):
+            continue
+        case <-ctx.Done():
+            return ctx.Err()
+        }
+    }
+
+    // retryInterval is sub-second, so int(retryInterval.Seconds()) would truncate to 0;
+    // compute elapsed seconds from milliseconds instead.
+    return fmt.Errorf("IPFS Cluster %s did not become ready after %d seconds", name, maxRetries*int(retryInterval.Milliseconds())/1000)
+}
+
+// waitClusterFormed waits for all cluster peers to be visible from the bootstrap node
+func (pm *ProcessManager) waitClusterFormed(ctx context.Context, bootstrapRestAPIPort int) error {
+    maxRetries := 30
+    retryInterval := 1 * time.Second
+    requiredPeers := 3 // bootstrap, node2, node3
+
+    for attempt := 0; attempt < maxRetries; attempt++ {
+        httpURL := fmt.Sprintf("http://127.0.0.1:%d/peers", bootstrapRestAPIPort)
+        resp, err := http.Get(httpURL)
+        if err == nil && resp.StatusCode == 200 {
+            // Close exactly once per response, whether or not the decode succeeds.
+            var peers []interface{}
+            decodeErr := json.NewDecoder(resp.Body).Decode(&peers)
+            resp.Body.Close()
+            if decodeErr == nil && len(peers) >= requiredPeers {
+                return nil // All peers have formed
+            }
+        } else if resp != nil {
+            resp.Body.Close()
+        }
+
+        select {
+        case <-time.After(retryInterval):
+            continue
+        case <-ctx.Done():
+            return ctx.Err()
+        }
+    }
+
+    return fmt.Errorf("IPFS Cluster did not form fully after %d seconds", maxRetries*int(retryInterval.Seconds()))
+}
+
+// cleanClusterState removes stale cluster state files to ensure fresh initialization
+// This prevents PSK (private network key) mismatches when cluster secret changes
+func (pm *ProcessManager) cleanClusterState(clusterPath string) error {
+    // Remove pebble datastore (contains persisted PSK state)
+    pebblePath := filepath.Join(clusterPath, "pebble")
+    if err := os.RemoveAll(pebblePath); err != nil && !os.IsNotExist(err) {
+        return fmt.Errorf("failed to remove pebble directory: %w", err)
+    }
+
+    // Remove peerstore (contains peer addresses and metadata)
+    peerstorePath := filepath.Join(clusterPath, "peerstore")
+    if err := os.Remove(peerstorePath); err != nil && !os.IsNotExist(err) {
+        return fmt.Errorf("failed to remove peerstore: %w", err)
+    }
+
+    // Remove service.json (will be regenerated with correct ports and secret)
+    serviceJSONPath := filepath.Join(clusterPath, "service.json")
+    if err := os.Remove(serviceJSONPath); err != nil && !os.IsNotExist(err) {
+        return fmt.Errorf("failed to remove service.json: %w", err)
+    }
+
+    // Remove cluster.lock if it exists (from previous run)
+    lockPath := filepath.Join(clusterPath, "cluster.lock")
+    if err := os.Remove(lockPath); err != nil && !os.IsNotExist(err) {
+        return fmt.Errorf("failed to remove cluster.lock: %w", err)
+    }
+
+    // Note: We keep identity.json as it's tied to the node's peer ID
+    // The secret will be updated via CLUSTER_SECRET env var during init
+
+    return nil
+}
+
 // ensureIPFSClusterPorts updates service.json with correct per-node ports and IPFS connector settings
 func (pm *ProcessManager) ensureIPFSClusterPorts(clusterPath string, restAPIPort int, clusterPort int) error {
     serviceJSONPath := filepath.Join(clusterPath, "service.json")
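Note on cleanClusterState above: the selective delete (pebble, peerstore, service.json, and cluster.lock removed; identity.json kept) is what keeps each node's peer ID stable across restarts while still discarding stale PSK state. A throwaway check of that contract against a temp directory rather than a live cluster path.

    package main

    import (
        "fmt"
        "os"
        "path/filepath"
    )

    func main() {
        dir, _ := os.MkdirTemp("", "cluster")
        defer os.RemoveAll(dir)

        // Simulate a stale cluster directory.
        os.MkdirAll(filepath.Join(dir, "pebble"), 0755)
        for _, f := range []string{"peerstore", "service.json", "cluster.lock", "identity.json"} {
            os.WriteFile(filepath.Join(dir, f), []byte("x"), 0644)
        }

        // Mirror cleanClusterState's deletions (identity.json deliberately untouched).
        os.RemoveAll(filepath.Join(dir, "pebble"))
        for _, f := range []string{"peerstore", "service.json", "cluster.lock"} {
            os.Remove(filepath.Join(dir, f))
        }

        _, err := os.Stat(filepath.Join(dir, "identity.json"))
        fmt.Println("identity.json preserved:", err == nil) // true
    }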
@@ -502,10 +747,13 @@ func (pm *ProcessManager) ensureIPFSClusterPorts(clusterPath string, restAPIPort
         }
     }
 
-    // Update cluster listen multiaddress
+    // Update cluster listen multiaddress to match the correct port
+    // Replace all old listen addresses with new ones for the correct port
     if cluster, ok := config["cluster"].(map[string]interface{}); ok {
-        listenAddrs := make([]string, 0)
-        listenAddrs = append(listenAddrs, fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", clusterPort))
+        listenAddrs := []string{
+            fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", clusterPort),
+            fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort),
+        }
         cluster["listen_multiaddress"] = listenAddrs
     }
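Note on the listen_multiaddress change above: after ensureIPFSClusterPorts runs, the cluster section of service.json should carry both the wildcard and loopback addresses on the per-node port, so followers can dial the bootstrap over loopback in the dev setup. A sketch of the resulting fragment; port 9096 is illustrative, not a value taken from this patch.

    package main

    import (
        "encoding/json"
        "fmt"
    )

    func main() {
        clusterPort := 9096 // illustrative per-node cluster port

        // Mirrors the listen_multiaddress slice built in ensureIPFSClusterPorts.
        cfg := map[string]interface{}{
            "cluster": map[string]interface{}{
                "listen_multiaddress": []string{
                    fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", clusterPort),
                    fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort),
                },
            },
        }

        out, _ := json.MarshalIndent(cfg, "", "  ")
        fmt.Println(string(out))
    }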