diff --git a/Makefile b/Makefile index 355cce4..27e3b85 100644 --- a/Makefile +++ b/Makefile @@ -117,45 +117,61 @@ dev: build ipfs-cluster-service --version >/dev/null 2>&1 && openssl rand -hex 32 > $$CLUSTER_SECRET || echo "0000000000000000000000000000000000000000000000000000000000000000" > $$CLUSTER_SECRET; \ fi; \ SECRET=$$(cat $$CLUSTER_SECRET); \ + SWARM_KEY=$$HOME/.debros/swarm.key; \ + if [ ! -f $$SWARM_KEY ]; then \ + echo " Generating private swarm key..."; \ + KEY_HEX=$$(openssl rand -hex 32 | tr '[:lower:]' '[:upper:]'); \ + printf "/key/swarm/psk/1.0.0/\n/base16/\n%s\n" "$$KEY_HEX" > $$SWARM_KEY; \ + chmod 600 $$SWARM_KEY; \ + fi; \ echo " Setting up bootstrap node (IPFS: 5001, Cluster: 9094)..."; \ if [ ! -d $$HOME/.debros/bootstrap/ipfs/repo ]; then \ echo " Initializing IPFS..."; \ mkdir -p $$HOME/.debros/bootstrap/ipfs; \ IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs init --profile=server 2>&1 | grep -v "generating" | grep -v "peer identity" || true; \ - IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs config --json Addresses.API '["/ip4/localhost/tcp/5001"]' 2>&1 | grep -v "generating" || true; \ - IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/localhost/tcp/8080"]' 2>&1 | grep -v "generating" || true; \ + cp $$SWARM_KEY $$HOME/.debros/bootstrap/ipfs/repo/swarm.key; \ + IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs config --json Addresses.API '["/ip4/127.0.0.1/tcp/5001"]' 2>&1 | grep -v "generating" || true; \ + IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/127.0.0.1/tcp/8080"]' 2>&1 | grep -v "generating" || true; \ IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs config --json Addresses.Swarm '["/ip4/0.0.0.0/tcp/4101","/ip6/::/tcp/4101"]' 2>&1 | grep -v "generating" || true; \ + else \ + if [ ! -f $$HOME/.debros/bootstrap/ipfs/repo/swarm.key ]; then \ + cp $$SWARM_KEY $$HOME/.debros/bootstrap/ipfs/repo/swarm.key; \ + fi; \ fi; \ - echo " Initializing IPFS Cluster..."; \ + echo " Creating IPFS Cluster directories (config will be managed by Go code)..."; \ mkdir -p $$HOME/.debros/bootstrap/ipfs-cluster; \ - env IPFS_CLUSTER_PATH=$$HOME/.debros/bootstrap/ipfs-cluster ipfs-cluster-service init --force >/dev/null 2>&1 || true; \ - jq '.cluster.peername = "bootstrap" | .cluster.secret = "'$$SECRET'" | .cluster.listen_multiaddress = ["/ip4/0.0.0.0/tcp/9096"] | .consensus.crdt.cluster_name = "debros-cluster" | .consensus.crdt.trusted_peers = ["*"] | .api.restapi.http_listen_multiaddress = "/ip4/0.0.0.0/tcp/9094" | .api.ipfsproxy.listen_multiaddress = "/ip4/127.0.0.1/tcp/9095" | .api.pinsvcapi.http_listen_multiaddress = "/ip4/127.0.0.1/tcp/9097" | .ipfs_connector.ipfshttp.node_multiaddress = "/ip4/127.0.0.1/tcp/5001"' $$HOME/.debros/bootstrap/ipfs-cluster/service.json > $$HOME/.debros/bootstrap/ipfs-cluster/service.json.tmp && mv $$HOME/.debros/bootstrap/ipfs-cluster/service.json.tmp $$HOME/.debros/bootstrap/ipfs-cluster/service.json; \ echo " Setting up node2 (IPFS: 5002, Cluster: 9104)..."; \ if [ ! -d $$HOME/.debros/node2/ipfs/repo ]; then \ echo " Initializing IPFS..."; \ mkdir -p $$HOME/.debros/node2/ipfs; \ IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs init --profile=server 2>&1 | grep -v "generating" | grep -v "peer identity" || true; \ - IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs config --json Addresses.API '["/ip4/localhost/tcp/5002"]' 2>&1 | grep -v "generating" || true; \ - IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/localhost/tcp/8081"]' 2>&1 | grep -v "generating" || true; \ + cp $$SWARM_KEY $$HOME/.debros/node2/ipfs/repo/swarm.key; \ + IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs config --json Addresses.API '["/ip4/127.0.0.1/tcp/5002"]' 2>&1 | grep -v "generating" || true; \ + IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/127.0.0.1/tcp/8081"]' 2>&1 | grep -v "generating" || true; \ IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs config --json Addresses.Swarm '["/ip4/0.0.0.0/tcp/4102","/ip6/::/tcp/4102"]' 2>&1 | grep -v "generating" || true; \ + else \ + if [ ! -f $$HOME/.debros/node2/ipfs/repo/swarm.key ]; then \ + cp $$SWARM_KEY $$HOME/.debros/node2/ipfs/repo/swarm.key; \ + fi; \ fi; \ - echo " Initializing IPFS Cluster..."; \ + echo " Creating IPFS Cluster directories (config will be managed by Go code)..."; \ mkdir -p $$HOME/.debros/node2/ipfs-cluster; \ - env IPFS_CLUSTER_PATH=$$HOME/.debros/node2/ipfs-cluster ipfs-cluster-service init --force >/dev/null 2>&1 || true; \ - jq '.cluster.peername = "node2" | .cluster.secret = "'$$SECRET'" | .cluster.listen_multiaddress = ["/ip4/0.0.0.0/tcp/9106"] | .consensus.crdt.cluster_name = "debros-cluster" | .consensus.crdt.trusted_peers = ["*"] | .api.restapi.http_listen_multiaddress = "/ip4/0.0.0.0/tcp/9104" | .api.ipfsproxy.listen_multiaddress = "/ip4/127.0.0.1/tcp/9105" | .api.pinsvcapi.http_listen_multiaddress = "/ip4/127.0.0.1/tcp/9107" | .ipfs_connector.ipfshttp.node_multiaddress = "/ip4/127.0.0.1/tcp/5002"' $$HOME/.debros/node2/ipfs-cluster/service.json > $$HOME/.debros/node2/ipfs-cluster/service.json.tmp && mv $$HOME/.debros/node2/ipfs-cluster/service.json.tmp $$HOME/.debros/node2/ipfs-cluster/service.json; \ echo " Setting up node3 (IPFS: 5003, Cluster: 9114)..."; \ if [ ! -d $$HOME/.debros/node3/ipfs/repo ]; then \ echo " Initializing IPFS..."; \ mkdir -p $$HOME/.debros/node3/ipfs; \ IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs init --profile=server 2>&1 | grep -v "generating" | grep -v "peer identity" || true; \ - IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs config --json Addresses.API '["/ip4/localhost/tcp/5003"]' 2>&1 | grep -v "generating" || true; \ - IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/localhost/tcp/8082"]' 2>&1 | grep -v "generating" || true; \ + cp $$SWARM_KEY $$HOME/.debros/node3/ipfs/repo/swarm.key; \ + IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs config --json Addresses.API '["/ip4/127.0.0.1/tcp/5003"]' 2>&1 | grep -v "generating" || true; \ + IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/127.0.0.1/tcp/8082"]' 2>&1 | grep -v "generating" || true; \ IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs config --json Addresses.Swarm '["/ip4/0.0.0.0/tcp/4103","/ip6/::/tcp/4103"]' 2>&1 | grep -v "generating" || true; \ + else \ + if [ ! -f $$HOME/.debros/node3/ipfs/repo/swarm.key ]; then \ + cp $$SWARM_KEY $$HOME/.debros/node3/ipfs/repo/swarm.key; \ + fi; \ fi; \ - echo " Initializing IPFS Cluster..."; \ + echo " Creating IPFS Cluster directories (config will be managed by Go code)..."; \ mkdir -p $$HOME/.debros/node3/ipfs-cluster; \ - env IPFS_CLUSTER_PATH=$$HOME/.debros/node3/ipfs-cluster ipfs-cluster-service init --force >/dev/null 2>&1 || true; \ - jq '.cluster.peername = "node3" | .cluster.secret = "'$$SECRET'" | .cluster.listen_multiaddress = ["/ip4/0.0.0.0/tcp/9116"] | .consensus.crdt.cluster_name = "debros-cluster" | .consensus.crdt.trusted_peers = ["*"] | .api.restapi.http_listen_multiaddress = "/ip4/0.0.0.0/tcp/9114" | .api.ipfsproxy.listen_multiaddress = "/ip4/127.0.0.1/tcp/9115" | .api.pinsvcapi.http_listen_multiaddress = "/ip4/127.0.0.1/tcp/9117" | .ipfs_connector.ipfshttp.node_multiaddress = "/ip4/127.0.0.1/tcp/5003"' $$HOME/.debros/node3/ipfs-cluster/service.json > $$HOME/.debros/node3/ipfs-cluster/service.json.tmp && mv $$HOME/.debros/node3/ipfs-cluster/service.json.tmp $$HOME/.debros/node3/ipfs-cluster/service.json; \ echo "Starting IPFS daemons..."; \ if [ ! -f .dev/pids/ipfs-bootstrap.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-bootstrap.pid) 2>/dev/null; then \ IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo nohup ipfs daemon --enable-pubsub-experiment > $$HOME/.debros/logs/ipfs-bootstrap.log 2>&1 & echo $$! > .dev/pids/ipfs-bootstrap.pid; \ @@ -178,29 +194,6 @@ dev: build else \ echo " ✓ Node3 IPFS already running"; \ fi; \ - \ - echo "Starting IPFS Cluster peers..."; \ - if [ ! -f .dev/pids/ipfs-cluster-bootstrap.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-bootstrap.pid) 2>/dev/null; then \ - env IPFS_CLUSTER_PATH=$$HOME/.debros/bootstrap/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-bootstrap.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-bootstrap.pid; \ - echo " Bootstrap Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-bootstrap.pid), API: 9094)"; \ - sleep 3; \ - else \ - echo " ✓ Bootstrap Cluster already running"; \ - fi; \ - if [ ! -f .dev/pids/ipfs-cluster-node2.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-node2.pid) 2>/dev/null; then \ - env IPFS_CLUSTER_PATH=$$HOME/.debros/node2/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-node2.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-node2.pid; \ - echo " Node2 Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-node2.pid), API: 9104)"; \ - sleep 3; \ - else \ - echo " ✓ Node2 Cluster already running"; \ - fi; \ - if [ ! -f .dev/pids/ipfs-cluster-node3.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-node3.pid) 2>/dev/null; then \ - env IPFS_CLUSTER_PATH=$$HOME/.debros/node3/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-node3.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-node3.pid; \ - echo " Node3 Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-node3.pid), API: 9114)"; \ - sleep 3; \ - else \ - echo " ✓ Node3 Cluster already running"; \ - fi; \ else \ echo " ⚠️ ipfs or ipfs-cluster-service not found - skipping IPFS setup"; \ echo " Install with: https://docs.ipfs.tech/install/ and https://ipfscluster.io/documentation/guides/install/"; \ @@ -208,12 +201,58 @@ dev: build @sleep 2 @echo "Starting bootstrap node..." @nohup ./bin/node --config bootstrap.yaml > $$HOME/.debros/logs/bootstrap.log 2>&1 & echo $$! > .dev/pids/bootstrap.pid - @sleep 2 + @sleep 3 @echo "Starting node2..." @nohup ./bin/node --config node2.yaml > $$HOME/.debros/logs/node2.log 2>&1 & echo $$! > .dev/pids/node2.pid - @sleep 1 + @sleep 2 @echo "Starting node3..." @nohup ./bin/node --config node3.yaml > $$HOME/.debros/logs/node3.log 2>&1 & echo $$! > .dev/pids/node3.pid + @sleep 3 + @echo "Starting IPFS Cluster daemons (after Go nodes have configured them)..." + @if command -v ipfs-cluster-service >/dev/null 2>&1; then \ + if [ ! -f .dev/pids/ipfs-cluster-bootstrap.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-bootstrap.pid) 2>/dev/null; then \ + if [ -f $$HOME/.debros/bootstrap/ipfs-cluster/service.json ]; then \ + env IPFS_CLUSTER_PATH=$$HOME/.debros/bootstrap/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-bootstrap.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-bootstrap.pid; \ + echo " Bootstrap Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-bootstrap.pid), API: 9094)"; \ + echo " Waiting for bootstrap cluster to be ready..."; \ + for i in $$(seq 1 30); do \ + if curl -s http://localhost:9094/peers >/dev/null 2>&1; then \ + break; \ + fi; \ + sleep 1; \ + done; \ + sleep 2; \ + else \ + echo " ⚠️ Bootstrap cluster config not ready yet"; \ + fi; \ + else \ + echo " ✓ Bootstrap Cluster already running"; \ + fi; \ + if [ ! -f .dev/pids/ipfs-cluster-node2.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-node2.pid) 2>/dev/null; then \ + if [ -f $$HOME/.debros/node2/ipfs-cluster/service.json ]; then \ + env IPFS_CLUSTER_PATH=$$HOME/.debros/node2/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-node2.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-node2.pid; \ + echo " Node2 Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-node2.pid), API: 9104)"; \ + sleep 3; \ + else \ + echo " ⚠️ Node2 cluster config not ready yet"; \ + fi; \ + else \ + echo " ✓ Node2 Cluster already running"; \ + fi; \ + if [ ! -f .dev/pids/ipfs-cluster-node3.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-node3.pid) 2>/dev/null; then \ + if [ -f $$HOME/.debros/node3/ipfs-cluster/service.json ]; then \ + env IPFS_CLUSTER_PATH=$$HOME/.debros/node3/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-node3.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-node3.pid; \ + echo " Node3 Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-node3.pid), API: 9114)"; \ + sleep 3; \ + else \ + echo " ⚠️ Node3 cluster config not ready yet"; \ + fi; \ + else \ + echo " ✓ Node3 Cluster already running"; \ + fi; \ + else \ + echo " ⚠️ ipfs-cluster-service not found - skipping cluster daemon startup"; \ + fi @sleep 1 @echo "Starting Olric cache server..." @if command -v olric-server >/dev/null 2>&1; then \ diff --git a/pkg/cli/setup.go b/pkg/cli/setup.go index 18dd3b5..a103b27 100644 --- a/pkg/cli/setup.go +++ b/pkg/cli/setup.go @@ -1871,6 +1871,60 @@ func getOrGenerateClusterSecret() (string, error) { return secret, nil } +// getOrGenerateSwarmKey gets or generates a shared IPFS swarm key +// Returns the swarm key content as bytes (formatted for IPFS) +func getOrGenerateSwarmKey() ([]byte, error) { + secretPath := "/home/debros/.debros/swarm.key" + + // Try to read existing key + if data, err := os.ReadFile(secretPath); err == nil { + // Validate it's a proper swarm key format + content := string(data) + if strings.Contains(content, "/key/swarm/psk/1.0.0/") { + return data, nil + } + } + + // Generate new key (32 bytes) + keyBytes := make([]byte, 32) + if _, err := rand.Read(keyBytes); err != nil { + return nil, fmt.Errorf("failed to generate swarm key: %w", err) + } + + // Format as IPFS swarm key file + keyHex := strings.ToUpper(hex.EncodeToString(keyBytes)) + content := fmt.Sprintf("/key/swarm/psk/1.0.0/\n/base16/\n%s\n", keyHex) + + // Save key + if err := os.WriteFile(secretPath, []byte(content), 0600); err != nil { + return nil, fmt.Errorf("failed to save swarm key: %w", err) + } + exec.Command("chown", "debros:debros", secretPath).Run() + + fmt.Printf(" ✓ Generated private swarm key\n") + return []byte(content), nil +} + +// ensureSwarmKey ensures the swarm key exists in the IPFS repo +func ensureSwarmKey(repoPath string, swarmKey []byte) error { + swarmKeyPath := filepath.Join(repoPath, "swarm.key") + + // Check if swarm key already exists + if _, err := os.Stat(swarmKeyPath); err == nil { + // Verify it matches (optional: could compare content) + return nil + } + + // Create swarm key file in repo + if err := os.WriteFile(swarmKeyPath, swarmKey, 0600); err != nil { + return fmt.Errorf("failed to write swarm key to repo: %w", err) + } + + // Fix ownership + exec.Command("chown", "debros:debros", swarmKeyPath).Run() + return nil +} + // initializeIPFSForNode initializes IPFS and IPFS Cluster for a node func initializeIPFSForNode(nodeID, vpsIP string, isBootstrap bool) error { fmt.Printf(" Initializing IPFS and Cluster for node %s...\n", nodeID) @@ -1881,6 +1935,12 @@ func initializeIPFSForNode(nodeID, vpsIP string, isBootstrap bool) error { return fmt.Errorf("failed to get cluster secret: %w", err) } + // Get or generate swarm key for private network + swarmKey, err := getOrGenerateSwarmKey() + if err != nil { + return fmt.Errorf("failed to get swarm key: %w", err) + } + // Determine data directories var ipfsDataDir, clusterDataDir string if nodeID == "bootstrap" { @@ -1906,11 +1966,22 @@ func initializeIPFSForNode(nodeID, vpsIP string, isBootstrap bool) error { return fmt.Errorf("failed to initialize IPFS: %v\n%s", err, string(output)) } + // Ensure swarm key is in place (creates private network) + if err := ensureSwarmKey(ipfsRepoPath, swarmKey); err != nil { + return fmt.Errorf("failed to set swarm key: %w", err) + } + // Configure IPFS API and Gateway addresses exec.Command("sudo", "-u", "debros", "ipfs", "config", "--json", "Addresses.API", `["/ip4/localhost/tcp/5001"]`, "--repo-dir="+ipfsRepoPath).Run() exec.Command("sudo", "-u", "debros", "ipfs", "config", "--json", "Addresses.Gateway", `["/ip4/localhost/tcp/8080"]`, "--repo-dir="+ipfsRepoPath).Run() exec.Command("sudo", "-u", "debros", "ipfs", "config", "--json", "Addresses.Swarm", `["/ip4/0.0.0.0/tcp/4001","/ip6/::/tcp/4001"]`, "--repo-dir="+ipfsRepoPath).Run() - fmt.Printf(" ✓ IPFS initialized\n") + fmt.Printf(" ✓ IPFS initialized with private swarm key\n") + } else { + // Repo exists, but ensure swarm key is present + if err := ensureSwarmKey(ipfsRepoPath, swarmKey); err != nil { + return fmt.Errorf("failed to set swarm key: %w", err) + } + fmt.Printf(" ✓ IPFS repository already exists, swarm key ensured\n") } // Initialize IPFS Cluster if not already initialized @@ -2084,6 +2155,7 @@ User=debros Group=debros Environment=HOME=/home/debros ExecStartPre=/bin/bash -c 'if [ -f /home/debros/.debros/node.yaml ]; then export IPFS_PATH=/home/debros/.debros/node/ipfs/repo; elif [ -f /home/debros/.debros/bootstrap.yaml ]; then export IPFS_PATH=/home/debros/.debros/bootstrap/ipfs/repo; else export IPFS_PATH=/home/debros/.debros/bootstrap/ipfs/repo; fi' +ExecStartPre=/bin/bash -c 'if [ -f /home/debros/.debros/swarm.key ] && [ ! -f ${IPFS_PATH}/swarm.key ]; then cp /home/debros/.debros/swarm.key ${IPFS_PATH}/swarm.key && chmod 600 ${IPFS_PATH}/swarm.key; fi' ExecStart=/usr/bin/ipfs daemon --enable-pubsub-experiment --repo-dir=${IPFS_PATH} Restart=always RestartSec=5 diff --git a/pkg/ipfs/cluster.go b/pkg/ipfs/cluster.go new file mode 100644 index 0000000..0ab5e58 --- /dev/null +++ b/pkg/ipfs/cluster.go @@ -0,0 +1,717 @@ +package ipfs + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "go.uber.org/zap" + + "github.com/DeBrosOfficial/network/pkg/config" +) + +// ClusterConfigManager manages IPFS Cluster configuration files +type ClusterConfigManager struct { + cfg *config.Config + logger *zap.Logger + clusterPath string + secret string +} + +// ClusterServiceConfig represents the structure of service.json +type ClusterServiceConfig struct { + Cluster struct { + Peername string `json:"peername"` + Secret string `json:"secret"` + LeaveOnShutdown bool `json:"leave_on_shutdown"` + ListenMultiaddress []string `json:"listen_multiaddress"` + PeerAddresses []string `json:"peer_addresses"` + // ... other fields kept from template + } `json:"cluster"` + Consensus struct { + CRDT struct { + ClusterName string `json:"cluster_name"` + TrustedPeers []string `json:"trusted_peers"` + Batching struct { + MaxBatchSize int `json:"max_batch_size"` + MaxBatchAge string `json:"max_batch_age"` + } `json:"batching"` + RepairInterval string `json:"repair_interval"` + } `json:"crdt"` + } `json:"consensus"` + API struct { + IPFSProxy struct { + ListenMultiaddress string `json:"listen_multiaddress"` + NodeMultiaddress string `json:"node_multiaddress"` + } `json:"ipfsproxy"` + PinSvcAPI struct { + HTTPListenMultiaddress string `json:"http_listen_multiaddress"` + } `json:"pinsvcapi"` + RestAPI struct { + HTTPListenMultiaddress string `json:"http_listen_multiaddress"` + } `json:"restapi"` + } `json:"api"` + IPFSConnector struct { + IPFSHTTP struct { + NodeMultiaddress string `json:"node_multiaddress"` + } `json:"ipfshttp"` + } `json:"ipfs_connector"` + // Keep rest of fields as raw JSON to preserve structure + Raw map[string]interface{} `json:"-"` +} + +// NewClusterConfigManager creates a new IPFS Cluster config manager +func NewClusterConfigManager(cfg *config.Config, logger *zap.Logger) (*ClusterConfigManager, error) { + // Expand data directory path + dataDir := cfg.Node.DataDir + if strings.HasPrefix(dataDir, "~") { + home, err := os.UserHomeDir() + if err != nil { + return nil, fmt.Errorf("failed to determine home directory: %w", err) + } + dataDir = filepath.Join(home, dataDir[1:]) + } + + // Determine cluster path based on data directory structure + // Check if dataDir contains specific node names (e.g., ~/.debros/bootstrap, ~/.debros/node2) + clusterPath := filepath.Join(dataDir, "ipfs-cluster") + if strings.Contains(dataDir, "bootstrap") { + // Check if bootstrap is a direct child + if filepath.Base(filepath.Dir(dataDir)) == "bootstrap" || filepath.Base(dataDir) == "bootstrap" { + clusterPath = filepath.Join(dataDir, "ipfs-cluster") + } else { + clusterPath = filepath.Join(dataDir, "bootstrap", "ipfs-cluster") + } + } else if strings.Contains(dataDir, "node2") { + if filepath.Base(filepath.Dir(dataDir)) == "node2" || filepath.Base(dataDir) == "node2" { + clusterPath = filepath.Join(dataDir, "ipfs-cluster") + } else { + clusterPath = filepath.Join(dataDir, "node2", "ipfs-cluster") + } + } else if strings.Contains(dataDir, "node3") { + if filepath.Base(filepath.Dir(dataDir)) == "node3" || filepath.Base(dataDir) == "node3" { + clusterPath = filepath.Join(dataDir, "ipfs-cluster") + } else { + clusterPath = filepath.Join(dataDir, "node3", "ipfs-cluster") + } + } + + // Load or generate cluster secret + secretPath := filepath.Join(dataDir, "..", "cluster-secret") + if strings.Contains(dataDir, ".debros") { + // Try to find cluster-secret in ~/.debros + home, err := os.UserHomeDir() + if err == nil { + secretPath = filepath.Join(home, ".debros", "cluster-secret") + } + } + + secret, err := loadOrGenerateClusterSecret(secretPath) + if err != nil { + return nil, fmt.Errorf("failed to load/generate cluster secret: %w", err) + } + + return &ClusterConfigManager{ + cfg: cfg, + logger: logger, + clusterPath: clusterPath, + secret: secret, + }, nil +} + +// EnsureConfig ensures the IPFS Cluster service.json exists and is properly configured +func (cm *ClusterConfigManager) EnsureConfig() error { + if cm.cfg.Database.IPFS.ClusterAPIURL == "" { + cm.logger.Debug("IPFS Cluster API URL not configured, skipping cluster config") + return nil + } + + serviceJSONPath := filepath.Join(cm.clusterPath, "service.json") + + // Parse ports from URLs + clusterPort, restAPIPort, err := parseClusterPorts(cm.cfg.Database.IPFS.ClusterAPIURL) + if err != nil { + return fmt.Errorf("failed to parse cluster API URL: %w", err) + } + + ipfsPort, err := parseIPFSPort(cm.cfg.Database.IPFS.APIURL) + if err != nil { + return fmt.Errorf("failed to parse IPFS API URL: %w", err) + } + + // Determine node name + nodeName := cm.cfg.Node.Type + if nodeName == "node" { + // Try to extract from data dir or ID + if strings.Contains(cm.cfg.Node.DataDir, "node2") || strings.Contains(cm.cfg.Node.ID, "node2") { + nodeName = "node2" + } else if strings.Contains(cm.cfg.Node.DataDir, "node3") || strings.Contains(cm.cfg.Node.ID, "node3") { + nodeName = "node3" + } else { + nodeName = "node" + } + } + + // Calculate ports based on pattern + proxyPort := clusterPort - 1 + pinSvcPort := clusterPort + 1 + clusterListenPort := clusterPort + 2 + + // If config doesn't exist, initialize it with ipfs-cluster-service init + // This ensures we have all required sections (datastore, informer, etc.) + if _, err := os.Stat(serviceJSONPath); os.IsNotExist(err) { + cm.logger.Info("Initializing cluster config with ipfs-cluster-service init") + initCmd := exec.Command("ipfs-cluster-service", "init", "--force") + initCmd.Env = append(os.Environ(), "IPFS_CLUSTER_PATH="+cm.clusterPath) + if err := initCmd.Run(); err != nil { + cm.logger.Warn("Failed to initialize cluster config with ipfs-cluster-service init, will create minimal template", zap.Error(err)) + } + } + + // Load existing config or create new + cfg, err := cm.loadOrCreateConfig(serviceJSONPath) + if err != nil { + return fmt.Errorf("failed to load/create config: %w", err) + } + + // Update configuration + cfg.Cluster.Peername = nodeName + cfg.Cluster.Secret = cm.secret + cfg.Cluster.ListenMultiaddress = []string{fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", clusterListenPort)} + cfg.Consensus.CRDT.ClusterName = "debros-cluster" + cfg.Consensus.CRDT.TrustedPeers = []string{"*"} + + // API endpoints + cfg.API.RestAPI.HTTPListenMultiaddress = fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", restAPIPort) + cfg.API.IPFSProxy.ListenMultiaddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", proxyPort) + cfg.API.IPFSProxy.NodeMultiaddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", ipfsPort) // FIX: Correct path! + cfg.API.PinSvcAPI.HTTPListenMultiaddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", pinSvcPort) + + // IPFS connector (also needs to be set) + cfg.IPFSConnector.IPFSHTTP.NodeMultiaddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", ipfsPort) + + // Save configuration + if err := cm.saveConfig(serviceJSONPath, cfg); err != nil { + return fmt.Errorf("failed to save config: %w", err) + } + + cm.logger.Info("IPFS Cluster configuration ensured", + zap.String("path", serviceJSONPath), + zap.String("node_name", nodeName), + zap.Int("ipfs_port", ipfsPort), + zap.Int("cluster_port", clusterPort), + zap.Int("rest_api_port", restAPIPort)) + + return nil +} + +// UpdateBootstrapPeers updates peer_addresses and peerstore with bootstrap peer information +func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) error { + if cm.cfg.Database.IPFS.ClusterAPIURL == "" { + return nil // IPFS not configured + } + + // Skip if this is the bootstrap node itself + if cm.cfg.Node.Type == "bootstrap" { + return nil + } + + // Query bootstrap cluster API to get peer ID + peerID, err := getBootstrapPeerID(bootstrapAPIURL) + if err != nil { + return fmt.Errorf("failed to get bootstrap peer ID: %w", err) + } + + if peerID == "" { + cm.logger.Warn("Bootstrap peer ID not available yet") + return nil + } + + // Extract bootstrap cluster port from URL + _, clusterPort, err := parseClusterPorts(bootstrapAPIURL) + if err != nil { + return fmt.Errorf("failed to parse bootstrap cluster API URL: %w", err) + } + + // Bootstrap listens on clusterPort + 2 (same pattern) + bootstrapClusterPort := clusterPort + 2 + bootstrapPeerAddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/p2p/%s", bootstrapClusterPort, peerID) + + // Load current config + serviceJSONPath := filepath.Join(cm.clusterPath, "service.json") + cfg, err := cm.loadOrCreateConfig(serviceJSONPath) + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + + // Update peer_addresses + cfg.Cluster.PeerAddresses = []string{bootstrapPeerAddr} + + // Save config + if err := cm.saveConfig(serviceJSONPath, cfg); err != nil { + return fmt.Errorf("failed to save config: %w", err) + } + + // Write to peerstore file + peerstorePath := filepath.Join(cm.clusterPath, "peerstore") + if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil { + return fmt.Errorf("failed to write peerstore: %w", err) + } + + cm.logger.Info("Updated bootstrap peer configuration", + zap.String("bootstrap_peer_addr", bootstrapPeerAddr), + zap.String("peerstore_path", peerstorePath)) + + return nil +} + +// loadOrCreateConfig loads existing service.json or creates a template +func (cm *ClusterConfigManager) loadOrCreateConfig(path string) (*ClusterServiceConfig, error) { + // Try to load existing config + if data, err := os.ReadFile(path); err == nil { + var cfg ClusterServiceConfig + if err := json.Unmarshal(data, &cfg); err == nil { + // Also unmarshal into raw map to preserve all fields + var raw map[string]interface{} + if err := json.Unmarshal(data, &raw); err == nil { + cfg.Raw = raw + } + return &cfg, nil + } + } + + // Create new config from template + return cm.createTemplateConfig(), nil +} + +// createTemplateConfig creates a template configuration matching the structure +func (cm *ClusterConfigManager) createTemplateConfig() *ClusterServiceConfig { + cfg := &ClusterServiceConfig{} + cfg.Cluster.LeaveOnShutdown = false + cfg.Cluster.PeerAddresses = []string{} + cfg.Consensus.CRDT.TrustedPeers = []string{"*"} + cfg.Consensus.CRDT.Batching.MaxBatchSize = 0 + cfg.Consensus.CRDT.Batching.MaxBatchAge = "0s" + cfg.Consensus.CRDT.RepairInterval = "1h0m0s" + cfg.Raw = make(map[string]interface{}) + return cfg +} + +// saveConfig saves the configuration, preserving all existing fields +func (cm *ClusterConfigManager) saveConfig(path string, cfg *ClusterServiceConfig) error { + // Create directory if needed + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return fmt.Errorf("failed to create cluster directory: %w", err) + } + + // Load existing config if it exists to preserve all fields + var final map[string]interface{} + if data, err := os.ReadFile(path); err == nil { + if err := json.Unmarshal(data, &final); err != nil { + // If parsing fails, start fresh + final = make(map[string]interface{}) + } + } else { + final = make(map[string]interface{}) + } + + // Deep merge: update nested structures while preserving other fields + updateNestedMap(final, "cluster", map[string]interface{}{ + "peername": cfg.Cluster.Peername, + "secret": cfg.Cluster.Secret, + "leave_on_shutdown": cfg.Cluster.LeaveOnShutdown, + "listen_multiaddress": cfg.Cluster.ListenMultiaddress, + "peer_addresses": cfg.Cluster.PeerAddresses, + }) + + updateNestedMap(final, "consensus", map[string]interface{}{ + "crdt": map[string]interface{}{ + "cluster_name": cfg.Consensus.CRDT.ClusterName, + "trusted_peers": cfg.Consensus.CRDT.TrustedPeers, + "batching": map[string]interface{}{ + "max_batch_size": cfg.Consensus.CRDT.Batching.MaxBatchSize, + "max_batch_age": cfg.Consensus.CRDT.Batching.MaxBatchAge, + }, + "repair_interval": cfg.Consensus.CRDT.RepairInterval, + }, + }) + + // Update API section, preserving other fields + updateNestedMap(final, "api", map[string]interface{}{ + "ipfsproxy": map[string]interface{}{ + "listen_multiaddress": cfg.API.IPFSProxy.ListenMultiaddress, + "node_multiaddress": cfg.API.IPFSProxy.NodeMultiaddress, // FIX: Correct path! + }, + "pinsvcapi": map[string]interface{}{ + "http_listen_multiaddress": cfg.API.PinSvcAPI.HTTPListenMultiaddress, + }, + "restapi": map[string]interface{}{ + "http_listen_multiaddress": cfg.API.RestAPI.HTTPListenMultiaddress, + }, + }) + + // Update IPFS connector section + updateNestedMap(final, "ipfs_connector", map[string]interface{}{ + "ipfshttp": map[string]interface{}{ + "node_multiaddress": cfg.IPFSConnector.IPFSHTTP.NodeMultiaddress, + "connect_swarms_delay": "30s", + "ipfs_request_timeout": "5m0s", + "pin_timeout": "2m0s", + "unpin_timeout": "3h0m0s", + "repogc_timeout": "24h0m0s", + "informer_trigger_interval": 0, + }, + }) + + // Ensure all required sections exist with defaults if missing + ensureRequiredSection(final, "datastore", map[string]interface{}{ + "pebble": map[string]interface{}{ + "pebble_options": map[string]interface{}{ + "cache_size_bytes": 1073741824, + "bytes_per_sync": 1048576, + "disable_wal": false, + }, + }, + }) + + ensureRequiredSection(final, "informer", map[string]interface{}{ + "disk": map[string]interface{}{ + "metric_ttl": "30s", + "metric_type": "freespace", + }, + "pinqueue": map[string]interface{}{ + "metric_ttl": "30s", + "weight_bucket_size": 100000, + }, + "tags": map[string]interface{}{ + "metric_ttl": "30s", + "tags": map[string]interface{}{ + "group": "default", + }, + }, + }) + + ensureRequiredSection(final, "monitor", map[string]interface{}{ + "pubsubmon": map[string]interface{}{ + "check_interval": "15s", + }, + }) + + ensureRequiredSection(final, "pin_tracker", map[string]interface{}{ + "stateless": map[string]interface{}{ + "concurrent_pins": 10, + "priority_pin_max_age": "24h0m0s", + "priority_pin_max_retries": 5, + }, + }) + + ensureRequiredSection(final, "allocator", map[string]interface{}{ + "balanced": map[string]interface{}{ + "allocate_by": []interface{}{"tag:group", "freespace"}, + }, + }) + + // Write JSON + data, err := json.MarshalIndent(final, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal config: %w", err) + } + + if err := os.WriteFile(path, data, 0644); err != nil { + return fmt.Errorf("failed to write config: %w", err) + } + + return nil +} + +// updateNestedMap updates a nested map structure, merging values +func updateNestedMap(parent map[string]interface{}, key string, updates map[string]interface{}) { + existing, ok := parent[key].(map[string]interface{}) + if !ok { + parent[key] = updates + return + } + + // Merge updates into existing + for k, v := range updates { + if vm, ok := v.(map[string]interface{}); ok { + // Recursively merge nested maps + if _, ok := existing[k].(map[string]interface{}); !ok { + existing[k] = vm + } else { + updateNestedMap(existing, k, vm) + } + } else { + existing[k] = v + } + } + parent[key] = existing +} + +// ensureRequiredSection ensures a section exists in the config, creating it with defaults if missing +func ensureRequiredSection(parent map[string]interface{}, key string, defaults map[string]interface{}) { + if _, exists := parent[key]; !exists { + parent[key] = defaults + return + } + // If section exists, merge defaults to ensure all required subsections exist + existing, ok := parent[key].(map[string]interface{}) + if ok { + updateNestedMap(parent, key, defaults) + parent[key] = existing + } +} + +// parseClusterPorts extracts cluster port and REST API port from ClusterAPIURL +func parseClusterPorts(clusterAPIURL string) (clusterPort, restAPIPort int, err error) { + u, err := url.Parse(clusterAPIURL) + if err != nil { + return 0, 0, err + } + + portStr := u.Port() + if portStr == "" { + // Default port based on scheme + if u.Scheme == "http" { + portStr = "9094" + } else if u.Scheme == "https" { + portStr = "443" + } else { + return 0, 0, fmt.Errorf("unknown scheme: %s", u.Scheme) + } + } + + _, err = fmt.Sscanf(portStr, "%d", &restAPIPort) + if err != nil { + return 0, 0, fmt.Errorf("invalid port: %s", portStr) + } + + // Cluster listen port is typically REST API port + 2 + clusterPort = restAPIPort + 2 + + return clusterPort, restAPIPort, nil +} + +// parseIPFSPort extracts IPFS API port from APIURL +func parseIPFSPort(apiURL string) (int, error) { + if apiURL == "" { + return 5001, nil // Default + } + + u, err := url.Parse(apiURL) + if err != nil { + return 0, err + } + + portStr := u.Port() + if portStr == "" { + if u.Scheme == "http" { + return 5001, nil // Default HTTP port + } + return 0, fmt.Errorf("unknown scheme: %s", u.Scheme) + } + + var port int + _, err = fmt.Sscanf(portStr, "%d", &port) + if err != nil { + return 0, fmt.Errorf("invalid port: %s", portStr) + } + + return port, nil +} + +// getBootstrapPeerID queries the bootstrap cluster API to get the peer ID +func getBootstrapPeerID(apiURL string) (string, error) { + // Simple HTTP client to query /peers endpoint + client := &standardHTTPClient{} + peersResp, err := client.Get(fmt.Sprintf("%s/peers", apiURL)) + if err != nil { + return "", err + } + + var peersData struct { + ID string `json:"id"` + } + if err := json.Unmarshal(peersResp, &peersData); err != nil { + return "", err + } + + return peersData.ID, nil +} + +// loadOrGenerateClusterSecret loads cluster secret or generates a new one +func loadOrGenerateClusterSecret(path string) (string, error) { + // Try to load existing secret + if data, err := os.ReadFile(path); err == nil { + return strings.TrimSpace(string(data)), nil + } + + // Generate new secret (32 bytes hex = 64 hex chars) + secret := generateRandomSecret(64) + + // Save secret + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return "", err + } + if err := os.WriteFile(path, []byte(secret), 0600); err != nil { + return "", err + } + + return secret, nil +} + +// generateRandomSecret generates a random hex string +func generateRandomSecret(length int) string { + bytes := make([]byte, length/2) + if _, err := rand.Read(bytes); err != nil { + // Fallback to simple generation if crypto/rand fails + for i := range bytes { + bytes[i] = byte(os.Getpid() + i) + } + } + return hex.EncodeToString(bytes) +} + +// standardHTTPClient implements HTTP client using net/http +type standardHTTPClient struct{} + +func (c *standardHTTPClient) Get(url string) ([]byte, error) { + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status) + } + + data, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + return data, nil +} + +// FixIPFSConfigAddresses fixes localhost addresses in IPFS config to use 127.0.0.1 +// This is necessary because IPFS doesn't accept "localhost" as a valid IP address in multiaddrs +// This function always ensures the config is correct, regardless of current state +func (cm *ClusterConfigManager) FixIPFSConfigAddresses() error { + if cm.cfg.Database.IPFS.APIURL == "" { + return nil // IPFS not configured + } + + // Determine IPFS repo path from config + dataDir := cm.cfg.Node.DataDir + if strings.HasPrefix(dataDir, "~") { + home, err := os.UserHomeDir() + if err != nil { + return fmt.Errorf("failed to determine home directory: %w", err) + } + dataDir = filepath.Join(home, dataDir[1:]) + } + + // Try to find IPFS repo path + // Check common locations: dataDir/ipfs/repo, or dataDir/bootstrap/ipfs/repo, etc. + possiblePaths := []string{ + filepath.Join(dataDir, "ipfs", "repo"), + filepath.Join(dataDir, "bootstrap", "ipfs", "repo"), + filepath.Join(dataDir, "node2", "ipfs", "repo"), + filepath.Join(dataDir, "node3", "ipfs", "repo"), + filepath.Join(filepath.Dir(dataDir), "bootstrap", "ipfs", "repo"), + filepath.Join(filepath.Dir(dataDir), "node2", "ipfs", "repo"), + filepath.Join(filepath.Dir(dataDir), "node3", "ipfs", "repo"), + } + + var ipfsRepoPath string + for _, path := range possiblePaths { + if _, err := os.Stat(filepath.Join(path, "config")); err == nil { + ipfsRepoPath = path + break + } + } + + if ipfsRepoPath == "" { + cm.logger.Debug("IPFS repo not found, skipping config fix") + return nil // Not an error if repo doesn't exist yet + } + + // Parse IPFS API port from config + ipfsPort, err := parseIPFSPort(cm.cfg.Database.IPFS.APIURL) + if err != nil { + return fmt.Errorf("failed to parse IPFS API URL: %w", err) + } + + // Determine gateway port (typically API port + 3079, or 8080 for bootstrap, 8081 for node2, etc.) + gatewayPort := 8080 + if strings.Contains(dataDir, "node2") { + gatewayPort = 8081 + } else if strings.Contains(dataDir, "node3") { + gatewayPort = 8082 + } else if ipfsPort == 5002 { + gatewayPort = 8081 + } else if ipfsPort == 5003 { + gatewayPort = 8082 + } + + // Always ensure API address is correct (don't just check, always set it) + correctAPIAddr := fmt.Sprintf(`["/ip4/127.0.0.1/tcp/%d"]`, ipfsPort) + cm.logger.Info("Ensuring IPFS API address is correct", + zap.String("repo", ipfsRepoPath), + zap.Int("port", ipfsPort), + zap.String("correct_address", correctAPIAddr)) + + fixCmd := exec.Command("ipfs", "config", "--json", "Addresses.API", correctAPIAddr) + fixCmd.Env = append(os.Environ(), "IPFS_PATH="+ipfsRepoPath) + if err := fixCmd.Run(); err != nil { + cm.logger.Warn("Failed to fix IPFS API address", zap.Error(err)) + return fmt.Errorf("failed to set IPFS API address: %w", err) + } + + // Always ensure Gateway address is correct + correctGatewayAddr := fmt.Sprintf(`["/ip4/127.0.0.1/tcp/%d"]`, gatewayPort) + cm.logger.Info("Ensuring IPFS Gateway address is correct", + zap.String("repo", ipfsRepoPath), + zap.Int("port", gatewayPort), + zap.String("correct_address", correctGatewayAddr)) + + fixCmd = exec.Command("ipfs", "config", "--json", "Addresses.Gateway", correctGatewayAddr) + fixCmd.Env = append(os.Environ(), "IPFS_PATH="+ipfsRepoPath) + if err := fixCmd.Run(); err != nil { + cm.logger.Warn("Failed to fix IPFS Gateway address", zap.Error(err)) + return fmt.Errorf("failed to set IPFS Gateway address: %w", err) + } + + // Check if IPFS daemon is running - if so, it may need to be restarted for changes to take effect + // We can't restart it from here (it's managed by Makefile/systemd), but we can warn + if cm.isIPFSRunning(ipfsPort) { + cm.logger.Warn("IPFS daemon appears to be running - it may need to be restarted for config changes to take effect", + zap.Int("port", ipfsPort), + zap.String("repo", ipfsRepoPath)) + } + + return nil +} + +// isIPFSRunning checks if IPFS daemon is running by attempting to connect to the API +func (cm *ClusterConfigManager) isIPFSRunning(port int) bool { + client := &http.Client{ + Timeout: 1 * time.Second, + } + resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/api/v0/id", port)) + if err != nil { + return false + } + resp.Body.Close() + return resp.StatusCode == 200 +} diff --git a/pkg/node/node.go b/pkg/node/node.go index 1687b73..5a4ec9b 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -22,6 +22,7 @@ import ( "github.com/DeBrosOfficial/network/pkg/config" "github.com/DeBrosOfficial/network/pkg/discovery" "github.com/DeBrosOfficial/network/pkg/encryption" + "github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/pubsub" database "github.com/DeBrosOfficial/network/pkg/rqlite" @@ -45,6 +46,9 @@ type Node struct { // Discovery discoveryManager *discovery.Manager + + // IPFS Cluster config manager + clusterConfigManager *ipfs.ClusterConfigManager } // NewNode creates a new network node @@ -631,6 +635,14 @@ func (n *Node) Start(ctx context.Context) error { return fmt.Errorf("failed to start LibP2P: %w", err) } + // Initialize IPFS Cluster configuration if enabled + if n.config.Database.IPFS.ClusterAPIURL != "" { + if err := n.startIPFSClusterConfig(); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to initialize IPFS Cluster config", zap.Error(err)) + // Don't fail node startup if cluster config fails + } + } + // Start RQLite with cluster discovery if err := n.startRQLite(ctx); err != nil { return fmt.Errorf("failed to start RQLite: %w", err) @@ -651,3 +663,41 @@ func (n *Node) Start(ctx context.Context) error { return nil } + +// startIPFSClusterConfig initializes and ensures IPFS Cluster configuration +func (n *Node) startIPFSClusterConfig() error { + n.logger.ComponentInfo(logging.ComponentNode, "Initializing IPFS Cluster configuration") + + // Create config manager + cm, err := ipfs.NewClusterConfigManager(n.config, n.logger.Logger) + if err != nil { + return fmt.Errorf("failed to create cluster config manager: %w", err) + } + n.clusterConfigManager = cm + + // Fix IPFS config addresses (localhost -> 127.0.0.1) before ensuring cluster config + if err := cm.FixIPFSConfigAddresses(); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to fix IPFS config addresses", zap.Error(err)) + // Don't fail startup if config fix fails - cluster config will handle it + } + + // Ensure configuration exists and is correct + if err := cm.EnsureConfig(); err != nil { + return fmt.Errorf("failed to ensure cluster config: %w", err) + } + + // If this is not the bootstrap node, try to update bootstrap peer info + if n.config.Node.Type != "bootstrap" && len(n.config.Discovery.BootstrapPeers) > 0 { + // Try to find bootstrap cluster API URL from config + // For now, we'll discover it from the first bootstrap peer + // In a real scenario, you might want to configure this explicitly + bootstrapClusterAPI := "http://localhost:9094" // Default bootstrap cluster API + if err := cm.UpdateBootstrapPeers(bootstrapClusterAPI); err != nil { + n.logger.ComponentWarn(logging.ComponentNode, "Failed to update bootstrap peers, will retry later", zap.Error(err)) + // Don't fail - peers can connect later via mDNS or manual config + } + } + + n.logger.ComponentInfo(logging.ComponentNode, "IPFS Cluster configuration initialized") + return nil +}