Mirror of https://github.com/DeBrosOfficial/network.git
Synced 2025-12-12 22:58:49 +00:00

feat: enhance IPFS and Cluster integration in setup

- Added automatic setup for IPFS and IPFS Cluster during the network setup process.
- Implemented initialization of IPFS repositories and Cluster configurations for each node.
- Enhanced the Makefile to start the IPFS and Cluster daemons with improved logging.
- Introduced a new documentation guide for IPFS Cluster setup, detailing configuration and verification steps.
- Updated the changelog to reflect the new features and improvements.

This commit is contained in:
parent 69d7ccf4c7
commit d00290d278

Makefile (119 changed lines)
@@ -117,45 +117,61 @@ dev: build
ipfs-cluster-service --version >/dev/null 2>&1 && openssl rand -hex 32 > $$CLUSTER_SECRET || echo "0000000000000000000000000000000000000000000000000000000000000000" > $$CLUSTER_SECRET; \
fi; \
SECRET=$$(cat $$CLUSTER_SECRET); \
SWARM_KEY=$$HOME/.debros/swarm.key; \
if [ ! -f $$SWARM_KEY ]; then \
echo " Generating private swarm key..."; \
KEY_HEX=$$(openssl rand -hex 32 | tr '[:lower:]' '[:upper:]'); \
printf "/key/swarm/psk/1.0.0/\n/base16/\n%s\n" "$$KEY_HEX" > $$SWARM_KEY; \
chmod 600 $$SWARM_KEY; \
fi; \
echo " Setting up bootstrap node (IPFS: 5001, Cluster: 9094)..."; \
if [ ! -d $$HOME/.debros/bootstrap/ipfs/repo ]; then \
echo " Initializing IPFS..."; \
mkdir -p $$HOME/.debros/bootstrap/ipfs; \
IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs init --profile=server 2>&1 | grep -v "generating" | grep -v "peer identity" || true; \
IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs config --json Addresses.API '["/ip4/localhost/tcp/5001"]' 2>&1 | grep -v "generating" || true; \
IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/localhost/tcp/8080"]' 2>&1 | grep -v "generating" || true; \
cp $$SWARM_KEY $$HOME/.debros/bootstrap/ipfs/repo/swarm.key; \
IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs config --json Addresses.API '["/ip4/127.0.0.1/tcp/5001"]' 2>&1 | grep -v "generating" || true; \
IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/127.0.0.1/tcp/8080"]' 2>&1 | grep -v "generating" || true; \
IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo ipfs config --json Addresses.Swarm '["/ip4/0.0.0.0/tcp/4101","/ip6/::/tcp/4101"]' 2>&1 | grep -v "generating" || true; \
else \
if [ ! -f $$HOME/.debros/bootstrap/ipfs/repo/swarm.key ]; then \
cp $$SWARM_KEY $$HOME/.debros/bootstrap/ipfs/repo/swarm.key; \
fi; \
fi; \
echo " Initializing IPFS Cluster..."; \
echo " Creating IPFS Cluster directories (config will be managed by Go code)..."; \
mkdir -p $$HOME/.debros/bootstrap/ipfs-cluster; \
env IPFS_CLUSTER_PATH=$$HOME/.debros/bootstrap/ipfs-cluster ipfs-cluster-service init --force >/dev/null 2>&1 || true; \
jq '.cluster.peername = "bootstrap" | .cluster.secret = "'$$SECRET'" | .cluster.listen_multiaddress = ["/ip4/0.0.0.0/tcp/9096"] | .consensus.crdt.cluster_name = "debros-cluster" | .consensus.crdt.trusted_peers = ["*"] | .api.restapi.http_listen_multiaddress = "/ip4/0.0.0.0/tcp/9094" | .api.ipfsproxy.listen_multiaddress = "/ip4/127.0.0.1/tcp/9095" | .api.pinsvcapi.http_listen_multiaddress = "/ip4/127.0.0.1/tcp/9097" | .ipfs_connector.ipfshttp.node_multiaddress = "/ip4/127.0.0.1/tcp/5001"' $$HOME/.debros/bootstrap/ipfs-cluster/service.json > $$HOME/.debros/bootstrap/ipfs-cluster/service.json.tmp && mv $$HOME/.debros/bootstrap/ipfs-cluster/service.json.tmp $$HOME/.debros/bootstrap/ipfs-cluster/service.json; \
echo " Setting up node2 (IPFS: 5002, Cluster: 9104)..."; \
if [ ! -d $$HOME/.debros/node2/ipfs/repo ]; then \
echo " Initializing IPFS..."; \
mkdir -p $$HOME/.debros/node2/ipfs; \
IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs init --profile=server 2>&1 | grep -v "generating" | grep -v "peer identity" || true; \
IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs config --json Addresses.API '["/ip4/localhost/tcp/5002"]' 2>&1 | grep -v "generating" || true; \
IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/localhost/tcp/8081"]' 2>&1 | grep -v "generating" || true; \
cp $$SWARM_KEY $$HOME/.debros/node2/ipfs/repo/swarm.key; \
IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs config --json Addresses.API '["/ip4/127.0.0.1/tcp/5002"]' 2>&1 | grep -v "generating" || true; \
IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/127.0.0.1/tcp/8081"]' 2>&1 | grep -v "generating" || true; \
IPFS_PATH=$$HOME/.debros/node2/ipfs/repo ipfs config --json Addresses.Swarm '["/ip4/0.0.0.0/tcp/4102","/ip6/::/tcp/4102"]' 2>&1 | grep -v "generating" || true; \
else \
if [ ! -f $$HOME/.debros/node2/ipfs/repo/swarm.key ]; then \
cp $$SWARM_KEY $$HOME/.debros/node2/ipfs/repo/swarm.key; \
fi; \
fi; \
echo " Initializing IPFS Cluster..."; \
echo " Creating IPFS Cluster directories (config will be managed by Go code)..."; \
mkdir -p $$HOME/.debros/node2/ipfs-cluster; \
env IPFS_CLUSTER_PATH=$$HOME/.debros/node2/ipfs-cluster ipfs-cluster-service init --force >/dev/null 2>&1 || true; \
jq '.cluster.peername = "node2" | .cluster.secret = "'$$SECRET'" | .cluster.listen_multiaddress = ["/ip4/0.0.0.0/tcp/9106"] | .consensus.crdt.cluster_name = "debros-cluster" | .consensus.crdt.trusted_peers = ["*"] | .api.restapi.http_listen_multiaddress = "/ip4/0.0.0.0/tcp/9104" | .api.ipfsproxy.listen_multiaddress = "/ip4/127.0.0.1/tcp/9105" | .api.pinsvcapi.http_listen_multiaddress = "/ip4/127.0.0.1/tcp/9107" | .ipfs_connector.ipfshttp.node_multiaddress = "/ip4/127.0.0.1/tcp/5002"' $$HOME/.debros/node2/ipfs-cluster/service.json > $$HOME/.debros/node2/ipfs-cluster/service.json.tmp && mv $$HOME/.debros/node2/ipfs-cluster/service.json.tmp $$HOME/.debros/node2/ipfs-cluster/service.json; \
echo " Setting up node3 (IPFS: 5003, Cluster: 9114)..."; \
if [ ! -d $$HOME/.debros/node3/ipfs/repo ]; then \
echo " Initializing IPFS..."; \
mkdir -p $$HOME/.debros/node3/ipfs; \
IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs init --profile=server 2>&1 | grep -v "generating" | grep -v "peer identity" || true; \
IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs config --json Addresses.API '["/ip4/localhost/tcp/5003"]' 2>&1 | grep -v "generating" || true; \
IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/localhost/tcp/8082"]' 2>&1 | grep -v "generating" || true; \
cp $$SWARM_KEY $$HOME/.debros/node3/ipfs/repo/swarm.key; \
IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs config --json Addresses.API '["/ip4/127.0.0.1/tcp/5003"]' 2>&1 | grep -v "generating" || true; \
IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs config --json Addresses.Gateway '["/ip4/127.0.0.1/tcp/8082"]' 2>&1 | grep -v "generating" || true; \
IPFS_PATH=$$HOME/.debros/node3/ipfs/repo ipfs config --json Addresses.Swarm '["/ip4/0.0.0.0/tcp/4103","/ip6/::/tcp/4103"]' 2>&1 | grep -v "generating" || true; \
else \
if [ ! -f $$HOME/.debros/node3/ipfs/repo/swarm.key ]; then \
cp $$SWARM_KEY $$HOME/.debros/node3/ipfs/repo/swarm.key; \
fi; \
fi; \
echo " Initializing IPFS Cluster..."; \
echo " Creating IPFS Cluster directories (config will be managed by Go code)..."; \
mkdir -p $$HOME/.debros/node3/ipfs-cluster; \
env IPFS_CLUSTER_PATH=$$HOME/.debros/node3/ipfs-cluster ipfs-cluster-service init --force >/dev/null 2>&1 || true; \
jq '.cluster.peername = "node3" | .cluster.secret = "'$$SECRET'" | .cluster.listen_multiaddress = ["/ip4/0.0.0.0/tcp/9116"] | .consensus.crdt.cluster_name = "debros-cluster" | .consensus.crdt.trusted_peers = ["*"] | .api.restapi.http_listen_multiaddress = "/ip4/0.0.0.0/tcp/9114" | .api.ipfsproxy.listen_multiaddress = "/ip4/127.0.0.1/tcp/9115" | .api.pinsvcapi.http_listen_multiaddress = "/ip4/127.0.0.1/tcp/9117" | .ipfs_connector.ipfshttp.node_multiaddress = "/ip4/127.0.0.1/tcp/5003"' $$HOME/.debros/node3/ipfs-cluster/service.json > $$HOME/.debros/node3/ipfs-cluster/service.json.tmp && mv $$HOME/.debros/node3/ipfs-cluster/service.json.tmp $$HOME/.debros/node3/ipfs-cluster/service.json; \
echo "Starting IPFS daemons..."; \
if [ ! -f .dev/pids/ipfs-bootstrap.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-bootstrap.pid) 2>/dev/null; then \
IPFS_PATH=$$HOME/.debros/bootstrap/ipfs/repo nohup ipfs daemon --enable-pubsub-experiment > $$HOME/.debros/logs/ipfs-bootstrap.log 2>&1 & echo $$! > .dev/pids/ipfs-bootstrap.pid; \
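For reference, the per-node port scheme the recipe above assigns can be summarized in a short Go sketch (the type and names are illustrative only; the Makefile is the source of truth):

package main

import "fmt"

// devPorts mirrors the dev target's port layout above (illustrative).
type devPorts struct {
    ipfsAPI, gateway, swarm            int // kubo daemon
    clusterREST, proxy, listen, pinSvc int // ipfs-cluster-service
}

func main() {
    nodes := map[string]devPorts{
        "bootstrap": {5001, 8080, 4101, 9094, 9095, 9096, 9097},
        "node2":     {5002, 8081, 4102, 9104, 9105, 9106, 9107},
        "node3":     {5003, 8082, 4103, 9114, 9115, 9116, 9117},
    }
    for name, p := range nodes {
        fmt.Printf("%s: IPFS API %d, gateway %d, swarm %d, cluster REST %d\n",
            name, p.ipfsAPI, p.gateway, p.swarm, p.clusterREST)
    }
}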
@@ -178,29 +194,6 @@ dev: build
else \
echo " ✓ Node3 IPFS already running"; \
fi; \
\
echo "Starting IPFS Cluster peers..."; \
if [ ! -f .dev/pids/ipfs-cluster-bootstrap.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-bootstrap.pid) 2>/dev/null; then \
env IPFS_CLUSTER_PATH=$$HOME/.debros/bootstrap/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-bootstrap.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-bootstrap.pid; \
echo " Bootstrap Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-bootstrap.pid), API: 9094)"; \
sleep 3; \
else \
echo " ✓ Bootstrap Cluster already running"; \
fi; \
if [ ! -f .dev/pids/ipfs-cluster-node2.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-node2.pid) 2>/dev/null; then \
env IPFS_CLUSTER_PATH=$$HOME/.debros/node2/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-node2.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-node2.pid; \
echo " Node2 Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-node2.pid), API: 9104)"; \
sleep 3; \
else \
echo " ✓ Node2 Cluster already running"; \
fi; \
if [ ! -f .dev/pids/ipfs-cluster-node3.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-node3.pid) 2>/dev/null; then \
env IPFS_CLUSTER_PATH=$$HOME/.debros/node3/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-node3.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-node3.pid; \
echo " Node3 Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-node3.pid), API: 9114)"; \
sleep 3; \
else \
echo " ✓ Node3 Cluster already running"; \
fi; \
else \
echo " ⚠️ ipfs or ipfs-cluster-service not found - skipping IPFS setup"; \
echo " Install with: https://docs.ipfs.tech/install/ and https://ipfscluster.io/documentation/guides/install/"; \
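The PID-file guard used throughout these recipes (read the .pid file, probe with `kill -0`) has a direct Go analogue; a minimal sketch, assuming a PID file written by the same recipes (the helper name is hypothetical):

package main

import (
    "fmt"
    "os"
    "strconv"
    "strings"
    "syscall"
)

// pidAlive mirrors the Makefile's `kill -0 $$(cat file.pid)` liveness check.
func pidAlive(pidFile string) bool {
    data, err := os.ReadFile(pidFile)
    if err != nil {
        return false
    }
    pid, err := strconv.Atoi(strings.TrimSpace(string(data)))
    if err != nil {
        return false
    }
    // Signal 0 checks existence/permission without delivering a signal.
    return syscall.Kill(pid, 0) == nil
}

func main() {
    fmt.Println(pidAlive(".dev/pids/ipfs-cluster-bootstrap.pid"))
}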
@@ -208,12 +201,58 @@ dev: build
@sleep 2
@echo "Starting bootstrap node..."
@nohup ./bin/node --config bootstrap.yaml > $$HOME/.debros/logs/bootstrap.log 2>&1 & echo $$! > .dev/pids/bootstrap.pid
@sleep 2
@sleep 3
@echo "Starting node2..."
@nohup ./bin/node --config node2.yaml > $$HOME/.debros/logs/node2.log 2>&1 & echo $$! > .dev/pids/node2.pid
@sleep 1
@sleep 2
@echo "Starting node3..."
@nohup ./bin/node --config node3.yaml > $$HOME/.debros/logs/node3.log 2>&1 & echo $$! > .dev/pids/node3.pid
@sleep 3
@echo "Starting IPFS Cluster daemons (after Go nodes have configured them)..."
@if command -v ipfs-cluster-service >/dev/null 2>&1; then \
if [ ! -f .dev/pids/ipfs-cluster-bootstrap.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-bootstrap.pid) 2>/dev/null; then \
if [ -f $$HOME/.debros/bootstrap/ipfs-cluster/service.json ]; then \
env IPFS_CLUSTER_PATH=$$HOME/.debros/bootstrap/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-bootstrap.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-bootstrap.pid; \
echo " Bootstrap Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-bootstrap.pid), API: 9094)"; \
echo " Waiting for bootstrap cluster to be ready..."; \
for i in $$(seq 1 30); do \
if curl -s http://localhost:9094/peers >/dev/null 2>&1; then \
break; \
fi; \
sleep 1; \
done; \
sleep 2; \
else \
echo " ⚠️ Bootstrap cluster config not ready yet"; \
fi; \
else \
echo " ✓ Bootstrap Cluster already running"; \
fi; \
if [ ! -f .dev/pids/ipfs-cluster-node2.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-node2.pid) 2>/dev/null; then \
if [ -f $$HOME/.debros/node2/ipfs-cluster/service.json ]; then \
env IPFS_CLUSTER_PATH=$$HOME/.debros/node2/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-node2.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-node2.pid; \
echo " Node2 Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-node2.pid), API: 9104)"; \
sleep 3; \
else \
echo " ⚠️ Node2 cluster config not ready yet"; \
fi; \
else \
echo " ✓ Node2 Cluster already running"; \
fi; \
if [ ! -f .dev/pids/ipfs-cluster-node3.pid ] || ! kill -0 $$(cat .dev/pids/ipfs-cluster-node3.pid) 2>/dev/null; then \
if [ -f $$HOME/.debros/node3/ipfs-cluster/service.json ]; then \
env IPFS_CLUSTER_PATH=$$HOME/.debros/node3/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster-node3.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster-node3.pid; \
echo " Node3 Cluster started (PID: $$(cat .dev/pids/ipfs-cluster-node3.pid), API: 9114)"; \
sleep 3; \
else \
echo " ⚠️ Node3 cluster config not ready yet"; \
fi; \
else \
echo " ✓ Node3 Cluster already running"; \
fi; \
else \
echo " ⚠️ ipfs-cluster-service not found - skipping cluster daemon startup"; \
fi
@sleep 1
@echo "Starting Olric cache server..."
@if command -v olric-server >/dev/null 2>&1; then \
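The 30-iteration curl loop above translates directly to Go; a minimal sketch, assuming the bootstrap REST API on the default port 9094 (the function name is illustrative):

package main

import (
    "fmt"
    "net/http"
    "time"
)

// waitForCluster polls the cluster REST API's /peers endpoint until it
// answers, like the shell loop above (30 attempts, one second apart).
func waitForCluster(apiURL string) error {
    client := &http.Client{Timeout: time.Second}
    for i := 0; i < 30; i++ {
        resp, err := client.Get(apiURL + "/peers")
        if err == nil {
            resp.Body.Close()
            return nil
        }
        time.Sleep(time.Second)
    }
    return fmt.Errorf("cluster at %s not ready after 30s", apiURL)
}

func main() {
    if err := waitForCluster("http://localhost:9094"); err != nil {
        fmt.Println(err)
    }
}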
@@ -1871,6 +1871,60 @@ func getOrGenerateClusterSecret() (string, error) {
    return secret, nil
}

// getOrGenerateSwarmKey gets or generates a shared IPFS swarm key.
// Returns the swarm key content as bytes (formatted for IPFS).
func getOrGenerateSwarmKey() ([]byte, error) {
    secretPath := "/home/debros/.debros/swarm.key"

    // Try to read existing key
    if data, err := os.ReadFile(secretPath); err == nil {
        // Validate it's a proper swarm key format
        content := string(data)
        if strings.Contains(content, "/key/swarm/psk/1.0.0/") {
            return data, nil
        }
    }

    // Generate new key (32 bytes)
    keyBytes := make([]byte, 32)
    if _, err := rand.Read(keyBytes); err != nil {
        return nil, fmt.Errorf("failed to generate swarm key: %w", err)
    }

    // Format as IPFS swarm key file
    keyHex := strings.ToUpper(hex.EncodeToString(keyBytes))
    content := fmt.Sprintf("/key/swarm/psk/1.0.0/\n/base16/\n%s\n", keyHex)

    // Save key
    if err := os.WriteFile(secretPath, []byte(content), 0600); err != nil {
        return nil, fmt.Errorf("failed to save swarm key: %w", err)
    }
    exec.Command("chown", "debros:debros", secretPath).Run()

    fmt.Printf(" ✓ Generated private swarm key\n")
    return []byte(content), nil
}

// ensureSwarmKey ensures the swarm key exists in the IPFS repo
func ensureSwarmKey(repoPath string, swarmKey []byte) error {
    swarmKeyPath := filepath.Join(repoPath, "swarm.key")

    // Check if swarm key already exists
    if _, err := os.Stat(swarmKeyPath); err == nil {
        // Verify it matches (optional: could compare content)
        return nil
    }

    // Create swarm key file in repo
    if err := os.WriteFile(swarmKeyPath, swarmKey, 0600); err != nil {
        return fmt.Errorf("failed to write swarm key to repo: %w", err)
    }

    // Fix ownership
    exec.Command("chown", "debros:debros", swarmKeyPath).Run()
    return nil
}

// initializeIPFSForNode initializes IPFS and IPFS Cluster for a node
func initializeIPFSForNode(nodeID, vpsIP string, isBootstrap bool) error {
    fmt.Printf(" Initializing IPFS and Cluster for node %s...\n", nodeID)
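Both this Go path and the Makefile's printf emit the same three-line swarm.key format, which is what makes the private network shared across nodes. For reference, the file has this shape (hex value elided as a placeholder):

/key/swarm/psk/1.0.0/
/base16/
<64 uppercase hex characters>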
@@ -1881,6 +1935,12 @@ func initializeIPFSForNode(nodeID, vpsIP string, isBootstrap bool) error {
        return fmt.Errorf("failed to get cluster secret: %w", err)
    }

    // Get or generate swarm key for private network
    swarmKey, err := getOrGenerateSwarmKey()
    if err != nil {
        return fmt.Errorf("failed to get swarm key: %w", err)
    }

    // Determine data directories
    var ipfsDataDir, clusterDataDir string
    if nodeID == "bootstrap" {
@@ -1906,11 +1966,22 @@ func initializeIPFSForNode(nodeID, vpsIP string, isBootstrap bool) error {
        return fmt.Errorf("failed to initialize IPFS: %v\n%s", err, string(output))
    }

    // Ensure swarm key is in place (creates private network)
    if err := ensureSwarmKey(ipfsRepoPath, swarmKey); err != nil {
        return fmt.Errorf("failed to set swarm key: %w", err)
    }

    // Configure IPFS API and Gateway addresses
    exec.Command("sudo", "-u", "debros", "ipfs", "config", "--json", "Addresses.API", `["/ip4/localhost/tcp/5001"]`, "--repo-dir="+ipfsRepoPath).Run()
    exec.Command("sudo", "-u", "debros", "ipfs", "config", "--json", "Addresses.Gateway", `["/ip4/localhost/tcp/8080"]`, "--repo-dir="+ipfsRepoPath).Run()
    exec.Command("sudo", "-u", "debros", "ipfs", "config", "--json", "Addresses.Swarm", `["/ip4/0.0.0.0/tcp/4001","/ip6/::/tcp/4001"]`, "--repo-dir="+ipfsRepoPath).Run()
    fmt.Printf(" ✓ IPFS initialized\n")
    fmt.Printf(" ✓ IPFS initialized with private swarm key\n")
} else {
    // Repo exists, but ensure swarm key is present
    if err := ensureSwarmKey(ipfsRepoPath, swarmKey); err != nil {
        return fmt.Errorf("failed to set swarm key: %w", err)
    }
    fmt.Printf(" ✓ IPFS repository already exists, swarm key ensured\n")
}

// Initialize IPFS Cluster if not already initialized
@@ -2084,6 +2155,7 @@ User=debros
Group=debros
Environment=HOME=/home/debros
ExecStartPre=/bin/bash -c 'if [ -f /home/debros/.debros/node.yaml ]; then export IPFS_PATH=/home/debros/.debros/node/ipfs/repo; elif [ -f /home/debros/.debros/bootstrap.yaml ]; then export IPFS_PATH=/home/debros/.debros/bootstrap/ipfs/repo; else export IPFS_PATH=/home/debros/.debros/bootstrap/ipfs/repo; fi'
ExecStartPre=/bin/bash -c 'if [ -f /home/debros/.debros/swarm.key ] && [ ! -f ${IPFS_PATH}/swarm.key ]; then cp /home/debros/.debros/swarm.key ${IPFS_PATH}/swarm.key && chmod 600 ${IPFS_PATH}/swarm.key; fi'
ExecStart=/usr/bin/ipfs daemon --enable-pubsub-experiment --repo-dir=${IPFS_PATH}
Restart=always
RestartSec=5
pkg/ipfs/cluster.go (new file, 717 lines)
@@ -0,0 +1,717 @@
package ipfs

import (
    "crypto/rand"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "os"
    "os/exec"
    "path/filepath"
    "strings"
    "time"

    "go.uber.org/zap"

    "github.com/DeBrosOfficial/network/pkg/config"
)

// ClusterConfigManager manages IPFS Cluster configuration files
type ClusterConfigManager struct {
    cfg         *config.Config
    logger      *zap.Logger
    clusterPath string
    secret      string
}

// ClusterServiceConfig represents the structure of service.json
type ClusterServiceConfig struct {
    Cluster struct {
        Peername           string   `json:"peername"`
        Secret             string   `json:"secret"`
        LeaveOnShutdown    bool     `json:"leave_on_shutdown"`
        ListenMultiaddress []string `json:"listen_multiaddress"`
        PeerAddresses      []string `json:"peer_addresses"`
        // ... other fields kept from template
    } `json:"cluster"`
    Consensus struct {
        CRDT struct {
            ClusterName  string   `json:"cluster_name"`
            TrustedPeers []string `json:"trusted_peers"`
            Batching     struct {
                MaxBatchSize int    `json:"max_batch_size"`
                MaxBatchAge  string `json:"max_batch_age"`
            } `json:"batching"`
            RepairInterval string `json:"repair_interval"`
        } `json:"crdt"`
    } `json:"consensus"`
    API struct {
        IPFSProxy struct {
            ListenMultiaddress string `json:"listen_multiaddress"`
            NodeMultiaddress   string `json:"node_multiaddress"`
        } `json:"ipfsproxy"`
        PinSvcAPI struct {
            HTTPListenMultiaddress string `json:"http_listen_multiaddress"`
        } `json:"pinsvcapi"`
        RestAPI struct {
            HTTPListenMultiaddress string `json:"http_listen_multiaddress"`
        } `json:"restapi"`
    } `json:"api"`
    IPFSConnector struct {
        IPFSHTTP struct {
            NodeMultiaddress string `json:"node_multiaddress"`
        } `json:"ipfshttp"`
    } `json:"ipfs_connector"`
    // Keep rest of fields as raw JSON to preserve structure
    Raw map[string]interface{} `json:"-"`
}

// NewClusterConfigManager creates a new IPFS Cluster config manager
func NewClusterConfigManager(cfg *config.Config, logger *zap.Logger) (*ClusterConfigManager, error) {
    // Expand data directory path
    dataDir := cfg.Node.DataDir
    if strings.HasPrefix(dataDir, "~") {
        home, err := os.UserHomeDir()
        if err != nil {
            return nil, fmt.Errorf("failed to determine home directory: %w", err)
        }
        dataDir = filepath.Join(home, dataDir[1:])
    }

    // Determine cluster path based on data directory structure.
    // Check if dataDir contains specific node names (e.g., ~/.debros/bootstrap, ~/.debros/node2)
    clusterPath := filepath.Join(dataDir, "ipfs-cluster")
    if strings.Contains(dataDir, "bootstrap") {
        // Check if bootstrap is a direct child
        if filepath.Base(filepath.Dir(dataDir)) == "bootstrap" || filepath.Base(dataDir) == "bootstrap" {
            clusterPath = filepath.Join(dataDir, "ipfs-cluster")
        } else {
            clusterPath = filepath.Join(dataDir, "bootstrap", "ipfs-cluster")
        }
    } else if strings.Contains(dataDir, "node2") {
        if filepath.Base(filepath.Dir(dataDir)) == "node2" || filepath.Base(dataDir) == "node2" {
            clusterPath = filepath.Join(dataDir, "ipfs-cluster")
        } else {
            clusterPath = filepath.Join(dataDir, "node2", "ipfs-cluster")
        }
    } else if strings.Contains(dataDir, "node3") {
        if filepath.Base(filepath.Dir(dataDir)) == "node3" || filepath.Base(dataDir) == "node3" {
            clusterPath = filepath.Join(dataDir, "ipfs-cluster")
        } else {
            clusterPath = filepath.Join(dataDir, "node3", "ipfs-cluster")
        }
    }

    // Load or generate cluster secret
    secretPath := filepath.Join(dataDir, "..", "cluster-secret")
    if strings.Contains(dataDir, ".debros") {
        // Try to find cluster-secret in ~/.debros
        home, err := os.UserHomeDir()
        if err == nil {
            secretPath = filepath.Join(home, ".debros", "cluster-secret")
        }
    }

    secret, err := loadOrGenerateClusterSecret(secretPath)
    if err != nil {
        return nil, fmt.Errorf("failed to load/generate cluster secret: %w", err)
    }

    return &ClusterConfigManager{
        cfg:         cfg,
        logger:      logger,
        clusterPath: clusterPath,
        secret:      secret,
    }, nil
}
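A worked example of the directory derivation above, assuming dataDir expands under ~/.debros (the mappings follow the branching in NewClusterConfigManager; they are annotations, not code from the commit):

// dataDir = ~/.debros/bootstrap -> clusterPath = ~/.debros/bootstrap/ipfs-cluster
// dataDir = ~/.debros/node2     -> clusterPath = ~/.debros/node2/ipfs-cluster
// dataDir = ~/.debros           -> clusterPath = ~/.debros/ipfs-cluster
// In all ~/.debros cases the shared secret is read from ~/.debros/cluster-secret.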
// EnsureConfig ensures the IPFS Cluster service.json exists and is properly configured
func (cm *ClusterConfigManager) EnsureConfig() error {
    if cm.cfg.Database.IPFS.ClusterAPIURL == "" {
        cm.logger.Debug("IPFS Cluster API URL not configured, skipping cluster config")
        return nil
    }

    serviceJSONPath := filepath.Join(cm.clusterPath, "service.json")

    // Parse ports from URLs
    clusterPort, restAPIPort, err := parseClusterPorts(cm.cfg.Database.IPFS.ClusterAPIURL)
    if err != nil {
        return fmt.Errorf("failed to parse cluster API URL: %w", err)
    }

    ipfsPort, err := parseIPFSPort(cm.cfg.Database.IPFS.APIURL)
    if err != nil {
        return fmt.Errorf("failed to parse IPFS API URL: %w", err)
    }

    // Determine node name
    nodeName := cm.cfg.Node.Type
    if nodeName == "node" {
        // Try to extract from data dir or ID
        if strings.Contains(cm.cfg.Node.DataDir, "node2") || strings.Contains(cm.cfg.Node.ID, "node2") {
            nodeName = "node2"
        } else if strings.Contains(cm.cfg.Node.DataDir, "node3") || strings.Contains(cm.cfg.Node.ID, "node3") {
            nodeName = "node3"
        } else {
            nodeName = "node"
        }
    }

    // Calculate ports based on pattern
    proxyPort := clusterPort - 1
    pinSvcPort := clusterPort + 1
    clusterListenPort := clusterPort + 2

    // If config doesn't exist, initialize it with ipfs-cluster-service init.
    // This ensures we have all required sections (datastore, informer, etc.)
    if _, err := os.Stat(serviceJSONPath); os.IsNotExist(err) {
        cm.logger.Info("Initializing cluster config with ipfs-cluster-service init")
        initCmd := exec.Command("ipfs-cluster-service", "init", "--force")
        initCmd.Env = append(os.Environ(), "IPFS_CLUSTER_PATH="+cm.clusterPath)
        if err := initCmd.Run(); err != nil {
            cm.logger.Warn("Failed to initialize cluster config with ipfs-cluster-service init, will create minimal template", zap.Error(err))
        }
    }

    // Load existing config or create new
    cfg, err := cm.loadOrCreateConfig(serviceJSONPath)
    if err != nil {
        return fmt.Errorf("failed to load/create config: %w", err)
    }

    // Update configuration
    cfg.Cluster.Peername = nodeName
    cfg.Cluster.Secret = cm.secret
    cfg.Cluster.ListenMultiaddress = []string{fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", clusterListenPort)}
    cfg.Consensus.CRDT.ClusterName = "debros-cluster"
    cfg.Consensus.CRDT.TrustedPeers = []string{"*"}

    // API endpoints
    cfg.API.RestAPI.HTTPListenMultiaddress = fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", restAPIPort)
    cfg.API.IPFSProxy.ListenMultiaddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", proxyPort)
    cfg.API.IPFSProxy.NodeMultiaddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", ipfsPort) // FIX: Correct path!
    cfg.API.PinSvcAPI.HTTPListenMultiaddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", pinSvcPort)

    // IPFS connector (also needs to be set)
    cfg.IPFSConnector.IPFSHTTP.NodeMultiaddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", ipfsPort)

    // Save configuration
    if err := cm.saveConfig(serviceJSONPath, cfg); err != nil {
        return fmt.Errorf("failed to save config: %w", err)
    }

    cm.logger.Info("IPFS Cluster configuration ensured",
        zap.String("path", serviceJSONPath),
        zap.String("node_name", nodeName),
        zap.Int("ipfs_port", ipfsPort),
        zap.Int("cluster_port", clusterPort),
        zap.Int("rest_api_port", restAPIPort))

    return nil
}
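A worked example of what this function writes, following its arithmetic with the default dev URLs (annotations only, not code from the commit):

// ClusterAPIURL "http://localhost:9094" -> restAPIPort 9094, clusterPort 9096
// APIURL        "http://localhost:5001" -> ipfsPort 5001
// api.restapi.http_listen_multiaddress      = "/ip4/0.0.0.0/tcp/9094"
// api.ipfsproxy.listen_multiaddress         = "/ip4/127.0.0.1/tcp/9095"  (clusterPort - 1)
// api.pinsvcapi.http_listen_multiaddress    = "/ip4/127.0.0.1/tcp/9097"  (clusterPort + 1)
// cluster.listen_multiaddress               = ["/ip4/0.0.0.0/tcp/9098"]  (clusterPort + 2)
// ipfs_connector.ipfshttp.node_multiaddress = "/ip4/127.0.0.1/tcp/5001"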
// UpdateBootstrapPeers updates peer_addresses and peerstore with bootstrap peer information
func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) error {
    if cm.cfg.Database.IPFS.ClusterAPIURL == "" {
        return nil // IPFS not configured
    }

    // Skip if this is the bootstrap node itself
    if cm.cfg.Node.Type == "bootstrap" {
        return nil
    }

    // Query bootstrap cluster API to get peer ID
    peerID, err := getBootstrapPeerID(bootstrapAPIURL)
    if err != nil {
        return fmt.Errorf("failed to get bootstrap peer ID: %w", err)
    }

    if peerID == "" {
        cm.logger.Warn("Bootstrap peer ID not available yet")
        return nil
    }

    // Extract bootstrap cluster port from URL
    _, clusterPort, err := parseClusterPorts(bootstrapAPIURL)
    if err != nil {
        return fmt.Errorf("failed to parse bootstrap cluster API URL: %w", err)
    }

    // Bootstrap listens on clusterPort + 2 (same pattern)
    bootstrapClusterPort := clusterPort + 2
    bootstrapPeerAddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/p2p/%s", bootstrapClusterPort, peerID)

    // Load current config
    serviceJSONPath := filepath.Join(cm.clusterPath, "service.json")
    cfg, err := cm.loadOrCreateConfig(serviceJSONPath)
    if err != nil {
        return fmt.Errorf("failed to load config: %w", err)
    }

    // Update peer_addresses
    cfg.Cluster.PeerAddresses = []string{bootstrapPeerAddr}

    // Save config
    if err := cm.saveConfig(serviceJSONPath, cfg); err != nil {
        return fmt.Errorf("failed to save config: %w", err)
    }

    // Write to peerstore file
    peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
    if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil {
        return fmt.Errorf("failed to write peerstore: %w", err)
    }

    cm.logger.Info("Updated bootstrap peer configuration",
        zap.String("bootstrap_peer_addr", bootstrapPeerAddr),
        zap.String("peerstore_path", peerstorePath))

    return nil
}
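The peerstore file written here holds one multiaddress per line. With the default "http://localhost:9094" bootstrap URL, the second return of parseClusterPorts is 9094, so the computed port is 9096 and the single line looks like this (peer ID is an illustrative placeholder):

/ip4/127.0.0.1/tcp/9096/p2p/12D3KooW<...>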
// loadOrCreateConfig loads existing service.json or creates a template
func (cm *ClusterConfigManager) loadOrCreateConfig(path string) (*ClusterServiceConfig, error) {
    // Try to load existing config
    if data, err := os.ReadFile(path); err == nil {
        var cfg ClusterServiceConfig
        if err := json.Unmarshal(data, &cfg); err == nil {
            // Also unmarshal into raw map to preserve all fields
            var raw map[string]interface{}
            if err := json.Unmarshal(data, &raw); err == nil {
                cfg.Raw = raw
            }
            return &cfg, nil
        }
    }

    // Create new config from template
    return cm.createTemplateConfig(), nil
}

// createTemplateConfig creates a template configuration matching the structure
func (cm *ClusterConfigManager) createTemplateConfig() *ClusterServiceConfig {
    cfg := &ClusterServiceConfig{}
    cfg.Cluster.LeaveOnShutdown = false
    cfg.Cluster.PeerAddresses = []string{}
    cfg.Consensus.CRDT.TrustedPeers = []string{"*"}
    cfg.Consensus.CRDT.Batching.MaxBatchSize = 0
    cfg.Consensus.CRDT.Batching.MaxBatchAge = "0s"
    cfg.Consensus.CRDT.RepairInterval = "1h0m0s"
    cfg.Raw = make(map[string]interface{})
    return cfg
}
// saveConfig saves the configuration, preserving all existing fields
func (cm *ClusterConfigManager) saveConfig(path string, cfg *ClusterServiceConfig) error {
    // Create directory if needed
    if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
        return fmt.Errorf("failed to create cluster directory: %w", err)
    }

    // Load existing config if it exists to preserve all fields
    var final map[string]interface{}
    if data, err := os.ReadFile(path); err == nil {
        if err := json.Unmarshal(data, &final); err != nil {
            // If parsing fails, start fresh
            final = make(map[string]interface{})
        }
    } else {
        final = make(map[string]interface{})
    }

    // Deep merge: update nested structures while preserving other fields
    updateNestedMap(final, "cluster", map[string]interface{}{
        "peername":            cfg.Cluster.Peername,
        "secret":              cfg.Cluster.Secret,
        "leave_on_shutdown":   cfg.Cluster.LeaveOnShutdown,
        "listen_multiaddress": cfg.Cluster.ListenMultiaddress,
        "peer_addresses":      cfg.Cluster.PeerAddresses,
    })

    updateNestedMap(final, "consensus", map[string]interface{}{
        "crdt": map[string]interface{}{
            "cluster_name":  cfg.Consensus.CRDT.ClusterName,
            "trusted_peers": cfg.Consensus.CRDT.TrustedPeers,
            "batching": map[string]interface{}{
                "max_batch_size": cfg.Consensus.CRDT.Batching.MaxBatchSize,
                "max_batch_age":  cfg.Consensus.CRDT.Batching.MaxBatchAge,
            },
            "repair_interval": cfg.Consensus.CRDT.RepairInterval,
        },
    })

    // Update API section, preserving other fields
    updateNestedMap(final, "api", map[string]interface{}{
        "ipfsproxy": map[string]interface{}{
            "listen_multiaddress": cfg.API.IPFSProxy.ListenMultiaddress,
            "node_multiaddress":   cfg.API.IPFSProxy.NodeMultiaddress, // FIX: Correct path!
        },
        "pinsvcapi": map[string]interface{}{
            "http_listen_multiaddress": cfg.API.PinSvcAPI.HTTPListenMultiaddress,
        },
        "restapi": map[string]interface{}{
            "http_listen_multiaddress": cfg.API.RestAPI.HTTPListenMultiaddress,
        },
    })

    // Update IPFS connector section
    updateNestedMap(final, "ipfs_connector", map[string]interface{}{
        "ipfshttp": map[string]interface{}{
            "node_multiaddress":         cfg.IPFSConnector.IPFSHTTP.NodeMultiaddress,
            "connect_swarms_delay":      "30s",
            "ipfs_request_timeout":      "5m0s",
            "pin_timeout":               "2m0s",
            "unpin_timeout":             "3h0m0s",
            "repogc_timeout":            "24h0m0s",
            "informer_trigger_interval": 0,
        },
    })

    // Ensure all required sections exist with defaults if missing
    ensureRequiredSection(final, "datastore", map[string]interface{}{
        "pebble": map[string]interface{}{
            "pebble_options": map[string]interface{}{
                "cache_size_bytes": 1073741824,
                "bytes_per_sync":   1048576,
                "disable_wal":      false,
            },
        },
    })

    ensureRequiredSection(final, "informer", map[string]interface{}{
        "disk": map[string]interface{}{
            "metric_ttl":  "30s",
            "metric_type": "freespace",
        },
        "pinqueue": map[string]interface{}{
            "metric_ttl":         "30s",
            "weight_bucket_size": 100000,
        },
        "tags": map[string]interface{}{
            "metric_ttl": "30s",
            "tags": map[string]interface{}{
                "group": "default",
            },
        },
    })

    ensureRequiredSection(final, "monitor", map[string]interface{}{
        "pubsubmon": map[string]interface{}{
            "check_interval": "15s",
        },
    })

    ensureRequiredSection(final, "pin_tracker", map[string]interface{}{
        "stateless": map[string]interface{}{
            "concurrent_pins":          10,
            "priority_pin_max_age":     "24h0m0s",
            "priority_pin_max_retries": 5,
        },
    })

    ensureRequiredSection(final, "allocator", map[string]interface{}{
        "balanced": map[string]interface{}{
            "allocate_by": []interface{}{"tag:group", "freespace"},
        },
    })

    // Write JSON
    data, err := json.MarshalIndent(final, "", " ")
    if err != nil {
        return fmt.Errorf("failed to marshal config: %w", err)
    }

    if err := os.WriteFile(path, data, 0644); err != nil {
        return fmt.Errorf("failed to write config: %w", err)
    }

    return nil
}
// updateNestedMap updates a nested map structure, merging values
func updateNestedMap(parent map[string]interface{}, key string, updates map[string]interface{}) {
    existing, ok := parent[key].(map[string]interface{})
    if !ok {
        parent[key] = updates
        return
    }

    // Merge updates into existing
    for k, v := range updates {
        if vm, ok := v.(map[string]interface{}); ok {
            // Recursively merge nested maps
            if _, ok := existing[k].(map[string]interface{}); !ok {
                existing[k] = vm
            } else {
                updateNestedMap(existing, k, vm)
            }
        } else {
            existing[k] = v
        }
    }
    parent[key] = existing
}

// ensureRequiredSection ensures a section exists in the config, creating it with defaults if missing
func ensureRequiredSection(parent map[string]interface{}, key string, defaults map[string]interface{}) {
    if _, exists := parent[key]; !exists {
        parent[key] = defaults
        return
    }
    // If section exists, merge defaults to ensure all required subsections exist
    existing, ok := parent[key].(map[string]interface{})
    if ok {
        updateNestedMap(parent, key, defaults)
        parent[key] = existing
    }
}
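A minimal sketch of the merge semantics above, assuming updateNestedMap from this file is in scope (the values are hypothetical):

func exampleMerge() {
    // Existing keys not named in the update survive; named keys are overwritten.
    final := map[string]interface{}{
        "cluster": map[string]interface{}{
            "peername":          "old-name",
            "disable_repinning": true, // preserved
        },
    }
    updateNestedMap(final, "cluster", map[string]interface{}{
        "peername": "node2", // overwritten
    })
    out, _ := json.Marshal(final)
    fmt.Println(string(out)) // {"cluster":{"disable_repinning":true,"peername":"node2"}}
}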
// parseClusterPorts extracts cluster port and REST API port from ClusterAPIURL
func parseClusterPorts(clusterAPIURL string) (clusterPort, restAPIPort int, err error) {
    u, err := url.Parse(clusterAPIURL)
    if err != nil {
        return 0, 0, err
    }

    portStr := u.Port()
    if portStr == "" {
        // Default port based on scheme
        if u.Scheme == "http" {
            portStr = "9094"
        } else if u.Scheme == "https" {
            portStr = "443"
        } else {
            return 0, 0, fmt.Errorf("unknown scheme: %s", u.Scheme)
        }
    }

    _, err = fmt.Sscanf(portStr, "%d", &restAPIPort)
    if err != nil {
        return 0, 0, fmt.Errorf("invalid port: %s", portStr)
    }

    // Cluster listen port is typically REST API port + 2
    clusterPort = restAPIPort + 2

    return clusterPort, restAPIPort, nil
}

// parseIPFSPort extracts IPFS API port from APIURL
func parseIPFSPort(apiURL string) (int, error) {
    if apiURL == "" {
        return 5001, nil // Default
    }

    u, err := url.Parse(apiURL)
    if err != nil {
        return 0, err
    }

    portStr := u.Port()
    if portStr == "" {
        if u.Scheme == "http" {
            return 5001, nil // Default HTTP port
        }
        return 0, fmt.Errorf("unknown scheme: %s", u.Scheme)
    }

    var port int
    _, err = fmt.Sscanf(portStr, "%d", &port)
    if err != nil {
        return 0, fmt.Errorf("invalid port: %s", portStr)
    }

    return port, nil
}
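For quick reference, these parsers behave as follows on typical inputs (annotations derived from the code above):

// parseClusterPorts("http://localhost:9094") -> clusterPort 9096, restAPIPort 9094
// parseClusterPorts("http://example.com")    -> port defaults to 9094 -> (9096, 9094)
// parseIPFSPort("http://localhost:5002")     -> 5002
// parseIPFSPort("")                          -> 5001 (default)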
// getBootstrapPeerID queries the bootstrap cluster API to get the peer ID
func getBootstrapPeerID(apiURL string) (string, error) {
    // Simple HTTP client to query /peers endpoint
    client := &standardHTTPClient{}
    peersResp, err := client.Get(fmt.Sprintf("%s/peers", apiURL))
    if err != nil {
        return "", err
    }

    var peersData struct {
        ID string `json:"id"`
    }
    if err := json.Unmarshal(peersResp, &peersData); err != nil {
        return "", err
    }

    return peersData.ID, nil
}
// loadOrGenerateClusterSecret loads cluster secret or generates a new one
func loadOrGenerateClusterSecret(path string) (string, error) {
    // Try to load existing secret
    if data, err := os.ReadFile(path); err == nil {
        return strings.TrimSpace(string(data)), nil
    }

    // Generate new secret (32 bytes hex = 64 hex chars)
    secret := generateRandomSecret(64)

    // Save secret
    if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
        return "", err
    }
    if err := os.WriteFile(path, []byte(secret), 0600); err != nil {
        return "", err
    }

    return secret, nil
}

// generateRandomSecret generates a random hex string
func generateRandomSecret(length int) string {
    bytes := make([]byte, length/2)
    if _, err := rand.Read(bytes); err != nil {
        // Fallback to simple generation if crypto/rand fails
        for i := range bytes {
            bytes[i] = byte(os.Getpid() + i)
        }
    }
    return hex.EncodeToString(bytes)
}
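This loader and the Makefile's openssl rand -hex 32 path produce the same on-disk artifact: a cluster-secret file containing 64 lowercase hex characters (trailing whitespace is trimmed on read). An illustrative value, not a real secret:

0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef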
// standardHTTPClient implements HTTP client using net/http
type standardHTTPClient struct{}

func (c *standardHTTPClient) Get(url string) ([]byte, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
    }

    data, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    return data, nil
}
// FixIPFSConfigAddresses fixes localhost addresses in IPFS config to use 127.0.0.1.
// This is necessary because IPFS doesn't accept "localhost" as a valid IP address in multiaddrs.
// This function always ensures the config is correct, regardless of current state.
func (cm *ClusterConfigManager) FixIPFSConfigAddresses() error {
    if cm.cfg.Database.IPFS.APIURL == "" {
        return nil // IPFS not configured
    }

    // Determine IPFS repo path from config
    dataDir := cm.cfg.Node.DataDir
    if strings.HasPrefix(dataDir, "~") {
        home, err := os.UserHomeDir()
        if err != nil {
            return fmt.Errorf("failed to determine home directory: %w", err)
        }
        dataDir = filepath.Join(home, dataDir[1:])
    }

    // Try to find IPFS repo path.
    // Check common locations: dataDir/ipfs/repo, or dataDir/bootstrap/ipfs/repo, etc.
    possiblePaths := []string{
        filepath.Join(dataDir, "ipfs", "repo"),
        filepath.Join(dataDir, "bootstrap", "ipfs", "repo"),
        filepath.Join(dataDir, "node2", "ipfs", "repo"),
        filepath.Join(dataDir, "node3", "ipfs", "repo"),
        filepath.Join(filepath.Dir(dataDir), "bootstrap", "ipfs", "repo"),
        filepath.Join(filepath.Dir(dataDir), "node2", "ipfs", "repo"),
        filepath.Join(filepath.Dir(dataDir), "node3", "ipfs", "repo"),
    }

    var ipfsRepoPath string
    for _, path := range possiblePaths {
        if _, err := os.Stat(filepath.Join(path, "config")); err == nil {
            ipfsRepoPath = path
            break
        }
    }

    if ipfsRepoPath == "" {
        cm.logger.Debug("IPFS repo not found, skipping config fix")
        return nil // Not an error if repo doesn't exist yet
    }

    // Parse IPFS API port from config
    ipfsPort, err := parseIPFSPort(cm.cfg.Database.IPFS.APIURL)
    if err != nil {
        return fmt.Errorf("failed to parse IPFS API URL: %w", err)
    }

    // Determine gateway port (typically API port + 3079, or 8080 for bootstrap, 8081 for node2, etc.)
    gatewayPort := 8080
    if strings.Contains(dataDir, "node2") {
        gatewayPort = 8081
    } else if strings.Contains(dataDir, "node3") {
        gatewayPort = 8082
    } else if ipfsPort == 5002 {
        gatewayPort = 8081
    } else if ipfsPort == 5003 {
        gatewayPort = 8082
    }

    // Always ensure API address is correct (don't just check, always set it)
    correctAPIAddr := fmt.Sprintf(`["/ip4/127.0.0.1/tcp/%d"]`, ipfsPort)
    cm.logger.Info("Ensuring IPFS API address is correct",
        zap.String("repo", ipfsRepoPath),
        zap.Int("port", ipfsPort),
        zap.String("correct_address", correctAPIAddr))

    fixCmd := exec.Command("ipfs", "config", "--json", "Addresses.API", correctAPIAddr)
    fixCmd.Env = append(os.Environ(), "IPFS_PATH="+ipfsRepoPath)
    if err := fixCmd.Run(); err != nil {
        cm.logger.Warn("Failed to fix IPFS API address", zap.Error(err))
        return fmt.Errorf("failed to set IPFS API address: %w", err)
    }

    // Always ensure Gateway address is correct
    correctGatewayAddr := fmt.Sprintf(`["/ip4/127.0.0.1/tcp/%d"]`, gatewayPort)
    cm.logger.Info("Ensuring IPFS Gateway address is correct",
        zap.String("repo", ipfsRepoPath),
        zap.Int("port", gatewayPort),
        zap.String("correct_address", correctGatewayAddr))

    fixCmd = exec.Command("ipfs", "config", "--json", "Addresses.Gateway", correctGatewayAddr)
    fixCmd.Env = append(os.Environ(), "IPFS_PATH="+ipfsRepoPath)
    if err := fixCmd.Run(); err != nil {
        cm.logger.Warn("Failed to fix IPFS Gateway address", zap.Error(err))
        return fmt.Errorf("failed to set IPFS Gateway address: %w", err)
    }

    // Check if IPFS daemon is running - if so, it may need to be restarted for changes to take effect.
    // We can't restart it from here (it's managed by Makefile/systemd), but we can warn.
    if cm.isIPFSRunning(ipfsPort) {
        cm.logger.Warn("IPFS daemon appears to be running - it may need to be restarted for config changes to take effect",
            zap.Int("port", ipfsPort),
            zap.String("repo", ipfsRepoPath))
    }

    return nil
}

// isIPFSRunning checks if IPFS daemon is running by attempting to connect to the API
func (cm *ClusterConfigManager) isIPFSRunning(port int) bool {
    client := &http.Client{
        Timeout: 1 * time.Second,
    }
    resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/api/v0/id", port))
    if err != nil {
        return false
    }
    resp.Body.Close()
    return resp.StatusCode == 200
}
@@ -22,6 +22,7 @@ import (
    "github.com/DeBrosOfficial/network/pkg/config"
    "github.com/DeBrosOfficial/network/pkg/discovery"
    "github.com/DeBrosOfficial/network/pkg/encryption"
    "github.com/DeBrosOfficial/network/pkg/ipfs"
    "github.com/DeBrosOfficial/network/pkg/logging"
    "github.com/DeBrosOfficial/network/pkg/pubsub"
    database "github.com/DeBrosOfficial/network/pkg/rqlite"
@@ -45,6 +46,9 @@ type Node struct {

    // Discovery
    discoveryManager *discovery.Manager

    // IPFS Cluster config manager
    clusterConfigManager *ipfs.ClusterConfigManager
}

// NewNode creates a new network node
@@ -631,6 +635,14 @@ func (n *Node) Start(ctx context.Context) error {
        return fmt.Errorf("failed to start LibP2P: %w", err)
    }

    // Initialize IPFS Cluster configuration if enabled
    if n.config.Database.IPFS.ClusterAPIURL != "" {
        if err := n.startIPFSClusterConfig(); err != nil {
            n.logger.ComponentWarn(logging.ComponentNode, "Failed to initialize IPFS Cluster config", zap.Error(err))
            // Don't fail node startup if cluster config fails
        }
    }

    // Start RQLite with cluster discovery
    if err := n.startRQLite(ctx); err != nil {
        return fmt.Errorf("failed to start RQLite: %w", err)
@@ -651,3 +663,41 @@ func (n *Node) Start(ctx context.Context) error {

    return nil
}

// startIPFSClusterConfig initializes and ensures IPFS Cluster configuration
func (n *Node) startIPFSClusterConfig() error {
    n.logger.ComponentInfo(logging.ComponentNode, "Initializing IPFS Cluster configuration")

    // Create config manager
    cm, err := ipfs.NewClusterConfigManager(n.config, n.logger.Logger)
    if err != nil {
        return fmt.Errorf("failed to create cluster config manager: %w", err)
    }
    n.clusterConfigManager = cm

    // Fix IPFS config addresses (localhost -> 127.0.0.1) before ensuring cluster config
    if err := cm.FixIPFSConfigAddresses(); err != nil {
        n.logger.ComponentWarn(logging.ComponentNode, "Failed to fix IPFS config addresses", zap.Error(err))
        // Don't fail startup if config fix fails - cluster config will handle it
    }

    // Ensure configuration exists and is correct
    if err := cm.EnsureConfig(); err != nil {
        return fmt.Errorf("failed to ensure cluster config: %w", err)
    }

    // If this is not the bootstrap node, try to update bootstrap peer info
    if n.config.Node.Type != "bootstrap" && len(n.config.Discovery.BootstrapPeers) > 0 {
        // Try to find bootstrap cluster API URL from config.
        // For now, we'll discover it from the first bootstrap peer.
        // In a real scenario, you might want to configure this explicitly.
        bootstrapClusterAPI := "http://localhost:9094" // Default bootstrap cluster API
        if err := cm.UpdateBootstrapPeers(bootstrapClusterAPI); err != nil {
            n.logger.ComponentWarn(logging.ComponentNode, "Failed to update bootstrap peers, will retry later", zap.Error(err))
            // Don't fail - peers can connect later via mDNS or manual config
        }
    }

    n.logger.ComponentInfo(logging.ComponentNode, "IPFS Cluster configuration initialized")
    return nil
}