Mirror of https://github.com/DeBrosOfficial/network.git (synced 2025-12-12 22:58:49 +00:00)

Commit 0b24c66d56 — "works"
Parent: f991d55676
CHANGELOG.md (15 changed lines)

@@ -13,6 +13,21 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semantic Versioning]
 ### Deprecated

 ### Fixed

+## [0.60.1] - 2025-11-09
+
+### Added
+
+- Improved IPFS Cluster startup logic in development environment to ensure proper peer discovery and configuration.
+
+### Changed
+
+- Refactored IPFS Cluster initialization in the development environment to use a multi-phase startup (bootstrap first, then followers) and explicitly clean stale cluster state (pebble, peerstore) before initialization.
+
+### Deprecated
+
+### Removed
+
+### Fixed
+
+- Fixed an issue where IPFS Cluster nodes in the development environment might fail to join due to incorrect bootstrap configuration or stale state.
+
 ## [0.60.0] - 2025-11-09

 ### Added
Makefile (2 changed lines)

@@ -21,7 +21,7 @@ test-e2e:

 .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill

-VERSION := 0.60.0
+VERSION := 0.60.1
 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
 DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
 LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
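The bump itself is one line; the version string reaches the binary through the `-X` linker flags in `LDFLAGS`. A minimal sketch of the receiving side, assuming the variables live in the `main` package as the flag names `main.version`, `main.commit`, and `main.date` indicate (default values and the print format are illustrative, not taken from the repository):

```go
package main

import "fmt"

// Overridden at build time, e.g.:
//   go build -ldflags "-X 'main.version=0.60.1' -X 'main.commit=abc1234' -X 'main.date=2025-11-09T00:00:00Z'"
var (
	version = "dev"
	commit  = "unknown"
	date    = "unknown"
)

func main() {
	fmt.Printf("network %s (commit %s, built %s)\n", version, commit, date)
}
```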
@@ -120,26 +120,26 @@ func (ce *ConfigEnsurer) ensureBootstrap() error {
 	// Ensure bootstrap config - always regenerate to ensure template fixes are applied
 	bootstrapConfigPath := filepath.Join(ce.debrosDir, "bootstrap.yaml")
 	data := templates.BootstrapConfigData{
 		NodeID:         "bootstrap",
 		P2PPort:        4001,
 		DataDir:        bootstrapDir,
 		RQLiteHTTPPort: 5001,
 		RQLiteRaftPort: 7001,
 		ClusterAPIPort: 9094,
 		IPFSAPIPort:    4501,
 	}

 	config, err := templates.RenderBootstrapConfig(data)
 	if err != nil {
 		return fmt.Errorf("failed to render bootstrap config: %w", err)
 	}

 	if err := os.WriteFile(bootstrapConfigPath, []byte(config), 0644); err != nil {
 		return fmt.Errorf("failed to write bootstrap config: %w", err)
 	}

 	fmt.Printf("✓ Generated bootstrap.yaml\n")

 	return nil
 }
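`ensureBootstrap` always re-renders `bootstrap.yaml` from `templates.BootstrapConfigData`, so template fixes take effect on every run. The repository's actual template is not part of this diff; the sketch below only illustrates the shape a renderer like `templates.RenderBootstrapConfig` could take with `text/template` — the YAML keys and template body are assumptions.

```go
package templates

import (
	"bytes"
	"text/template"
)

// BootstrapConfigData mirrors the fields used by ensureBootstrap in the diff above.
type BootstrapConfigData struct {
	NodeID         string
	P2PPort        int
	DataDir        string
	RQLiteHTTPPort int
	RQLiteRaftPort int
	ClusterAPIPort int
	IPFSAPIPort    int
}

// Illustrative template only; the real bootstrap.yaml template in the repo
// almost certainly contains more settings and different key names.
const bootstrapTmpl = `node_id: {{.NodeID}}
p2p_port: {{.P2PPort}}
data_dir: {{.DataDir}}
rqlite_http_port: {{.RQLiteHTTPPort}}
rqlite_raft_port: {{.RQLiteRaftPort}}
cluster_api_port: {{.ClusterAPIPort}}
ipfs_api_port: {{.IPFSAPIPort}}
`

// RenderBootstrapConfig renders the YAML text that gets written to bootstrap.yaml.
func RenderBootstrapConfig(data BootstrapConfigData) (string, error) {
	t, err := template.New("bootstrap").Parse(bootstrapTmpl)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := t.Execute(&buf, data); err != nil {
		return "", err
	}
	return buf.String(), nil
}
```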
@@ -171,32 +171,32 @@ func (ce *ConfigEnsurer) ensureNode2And3() error {
 		configPath := filepath.Join(ce.debrosDir, fmt.Sprintf("%s.yaml", node.name))

 		// Always regenerate to ensure template fixes are applied
 		if err := os.MkdirAll(nodeDir, 0755); err != nil {
 			return fmt.Errorf("failed to create %s directory: %w", node.name, err)
 		}

 		data := templates.NodeConfigData{
 			NodeID:            node.name,
 			P2PPort:           node.p2pPort,
 			DataDir:           nodeDir,
 			RQLiteHTTPPort:    node.rqliteHTTPPort,
 			RQLiteRaftPort:    node.rqliteRaftPort,
 			RQLiteJoinAddress: "localhost:7001",
 			BootstrapPeers:    []string{bootstrapMultiaddr},
 			ClusterAPIPort:    node.clusterAPIPort,
 			IPFSAPIPort:       node.ipfsAPIPort,
 		}

 		config, err := templates.RenderNodeConfig(data)
 		if err != nil {
 			return fmt.Errorf("failed to render %s config: %w", node.name, err)
 		}

 		if err := os.WriteFile(configPath, []byte(config), 0644); err != nil {
 			return fmt.Errorf("failed to write %s config: %w", node.name, err)
 		}

 		fmt.Printf("✓ Generated %s.yaml\n", node.name)
 	}

 	return nil
@@ -207,32 +207,32 @@ func (ce *ConfigEnsurer) ensureGateway() error {
 	configPath := filepath.Join(ce.debrosDir, "gateway.yaml")

 	// Always regenerate to ensure template fixes are applied
 	// Get bootstrap multiaddr
 	bootstrapInfo, err := encryption.LoadIdentity(filepath.Join(ce.debrosDir, "bootstrap", "identity.key"))
 	if err != nil {
 		return fmt.Errorf("failed to load bootstrap identity: %w", err)
 	}

 	bootstrapMultiaddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/4001/p2p/%s", bootstrapInfo.PeerID.String())

 	data := templates.GatewayConfigData{
 		ListenPort:     6001,
 		BootstrapPeers: []string{bootstrapMultiaddr},
 		OlricServers:   []string{"127.0.0.1:3320"},
 		ClusterAPIPort: 9094,
 		IPFSAPIPort:    4501,
 	}

 	config, err := templates.RenderGatewayConfig(data)
 	if err != nil {
 		return fmt.Errorf("failed to render gateway config: %w", err)
 	}

 	if err := os.WriteFile(configPath, []byte(config), 0644); err != nil {
 		return fmt.Errorf("failed to write gateway config: %w", err)
 	}

 	fmt.Printf("✓ Generated gateway.yaml\n")
 	return nil
 }
@@ -241,22 +241,22 @@ func (ce *ConfigEnsurer) ensureOlric() error {
 	configPath := filepath.Join(ce.debrosDir, "olric-config.yaml")

 	// Always regenerate to ensure template fixes are applied
 	data := templates.OlricConfigData{
 		BindAddr:       "127.0.0.1",
 		HTTPPort:       3320,
 		MemberlistPort: 3322,
 	}

 	config, err := templates.RenderOlricConfig(data)
 	if err != nil {
 		return fmt.Errorf("failed to render olric config: %w", err)
 	}

 	if err := os.WriteFile(configPath, []byte(config), 0644); err != nil {
 		return fmt.Errorf("failed to write olric config: %w", err)
 	}

 	fmt.Printf("✓ Generated olric-config.yaml\n")
 	return nil
 }
@@ -37,8 +37,8 @@ func (pm *ProcessManager) IPFSHealthCheck(ctx context.Context, nodes []ipfsNodeI
 		for _, line := range peerLines {
 			if strings.TrimSpace(line) != "" {
 				peerCount++
 			}
 		}

 		if peerCount < 2 {
 			result.Details += fmt.Sprintf("%s: only %d peers (want 2+); ", node.name, peerCount)
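The health check wants each IPFS node to see at least two peers and counts the non-blank entries in `peerLines`. The diff does not show where `peerLines` comes from; the sketch below assumes it is the line-per-multiaddress output of `ipfs swarm peers` (the real code may query the IPFS HTTP API instead), and the helper name and `IPFS_PATH` handling are illustrative.

```go
package health

import (
	"context"
	"os"
	"os/exec"
	"strings"
)

// countSwarmPeers runs `ipfs swarm peers` for one node's repo and counts
// non-blank output lines, mirroring the peerCount loop in IPFSHealthCheck.
// Assumption: one multiaddress per line on stdout.
func countSwarmPeers(ctx context.Context, ipfsPath string) (int, error) {
	cmd := exec.CommandContext(ctx, "ipfs", "swarm", "peers")
	cmd.Env = append(os.Environ(), "IPFS_PATH="+ipfsPath)
	out, err := cmd.Output()
	if err != nil {
		return 0, err
	}
	peerCount := 0
	for _, line := range strings.Split(string(out), "\n") {
		if strings.TrimSpace(line) != "" {
			peerCount++
		}
	}
	return peerCount, nil
}
```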
@@ -99,7 +99,7 @@ func (pm *ProcessManager) checkRQLiteNode(ctx context.Context, name string, http
 	if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
 		result.Details = fmt.Sprintf("decode error: %v", err)
 		return result
 	}

 	// Check the store.raft structure (RQLite 8 format)
 	store, ok := status["store"].(map[string]interface{})
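`checkRQLiteNode` decodes `/status` into a generic map and then walks into `store` (the rqlite 8 layout). A small sketch of that navigation; only the `store` key is confirmed by the hunk, the `raft` and `state` keys below are assumptions based on the `store.raft` comment.

```go
package health

// raftState pulls store.raft.state out of a decoded /status payload.
// "store" is confirmed by the diff above; "raft" and "state" are assumed
// from the surrounding comment about the RQLite 8 format.
func raftState(status map[string]interface{}) (string, bool) {
	store, ok := status["store"].(map[string]interface{})
	if !ok {
		return "", false
	}
	raft, ok := store["raft"].(map[string]interface{})
	if !ok {
		return "", false
	}
	state, ok := raft["state"].(string)
	return state, ok
}
```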
@@ -418,14 +418,108 @@ func (pm *ProcessManager) startIPFSCluster(ctx context.Context) error {
 		}
 	}

-	for _, node := range nodes {
-		serviceJSON := filepath.Join(node.clusterPath, "service.json")
-		if _, err := os.Stat(serviceJSON); os.IsNotExist(err) {
-			os.MkdirAll(node.clusterPath, 0755)
-			fmt.Fprintf(pm.logWriter, " Initializing IPFS Cluster (%s)...\n", node.name)
-			cmd := exec.CommandContext(ctx, "ipfs-cluster-service", "init", "--force")
-			cmd.Env = append(os.Environ(), fmt.Sprintf("IPFS_CLUSTER_PATH=%s", node.clusterPath))
-			cmd.Run()
+	// Read cluster secret to ensure all nodes use the same PSK
+	secretPath := filepath.Join(pm.debrosDir, "cluster-secret")
+	clusterSecret, err := os.ReadFile(secretPath)
+	if err != nil {
+		return fmt.Errorf("failed to read cluster secret: %w", err)
+	}
+	clusterSecretHex := strings.TrimSpace(string(clusterSecret))
+
+	// Phase 1: Initialize and start bootstrap IPFS Cluster, then read its identity
+	bootstrapMultiaddr := ""
+	{
+		node := nodes[0] // bootstrap
+
+		// Always clean stale cluster state to ensure fresh initialization with correct secret
+		if err := pm.cleanClusterState(node.clusterPath); err != nil {
+			fmt.Fprintf(pm.logWriter, " Warning: failed to clean cluster state for %s: %v\n", node.name, err)
+		}
+
+		os.MkdirAll(node.clusterPath, 0755)
+		fmt.Fprintf(pm.logWriter, " Initializing IPFS Cluster (%s)...\n", node.name)
+		cmd := exec.CommandContext(ctx, "ipfs-cluster-service", "init", "--force")
+		cmd.Env = append(os.Environ(),
+			fmt.Sprintf("IPFS_CLUSTER_PATH=%s", node.clusterPath),
+			fmt.Sprintf("CLUSTER_SECRET=%s", clusterSecretHex),
+		)
+		if output, err := cmd.CombinedOutput(); err != nil {
+			fmt.Fprintf(pm.logWriter, " Warning: ipfs-cluster-service init failed: %v (output: %s)\n", err, string(output))
+		}
+
+		// Ensure correct ports in service.json BEFORE starting daemon
+		// This is critical: it sets the cluster listen port to clusterPort, not the default
+		if err := pm.ensureIPFSClusterPorts(node.clusterPath, node.restAPIPort, node.clusterPort); err != nil {
+			fmt.Fprintf(pm.logWriter, " Warning: failed to update IPFS Cluster config for %s: %v\n", node.name, err)
+		}
+
+		// Verify the config was written correctly (debug: read it back)
+		serviceJSONPath := filepath.Join(node.clusterPath, "service.json")
+		if data, err := os.ReadFile(serviceJSONPath); err == nil {
+			var verifyConfig map[string]interface{}
+			if err := json.Unmarshal(data, &verifyConfig); err == nil {
+				if cluster, ok := verifyConfig["cluster"].(map[string]interface{}); ok {
+					if listenAddrs, ok := cluster["listen_multiaddress"].([]interface{}); ok {
+						fmt.Fprintf(pm.logWriter, " Config verified: %s cluster listening on %v\n", node.name, listenAddrs)
+					}
+				}
+			}
+		}
+
+		// Start bootstrap cluster service
+		pidPath := filepath.Join(pm.pidsDir, fmt.Sprintf("ipfs-cluster-%s.pid", node.name))
+		logPath := filepath.Join(pm.debrosDir, "logs", fmt.Sprintf("ipfs-cluster-%s.log", node.name))

+		cmd = exec.CommandContext(ctx, "ipfs-cluster-service", "daemon")
+		cmd.Env = append(os.Environ(), fmt.Sprintf("IPFS_CLUSTER_PATH=%s", node.clusterPath))
+		logFile, _ := os.Create(logPath)
+		cmd.Stdout = logFile
+		cmd.Stderr = logFile
+
+		if err := cmd.Start(); err != nil {
+			fmt.Fprintf(pm.logWriter, " ⚠️ Failed to start ipfs-cluster-%s: %v\n", node.name, err)
+			return err
+		}
+
+		os.WriteFile(pidPath, []byte(fmt.Sprintf("%d", cmd.Process.Pid)), 0644)
+		fmt.Fprintf(pm.logWriter, "✓ IPFS Cluster (%s) started (PID: %d, API: %d)\n", node.name, cmd.Process.Pid, node.restAPIPort)
+
+		// Wait for bootstrap to be ready and read its identity
+		if err := pm.waitClusterReady(ctx, node.name, node.restAPIPort); err != nil {
+			fmt.Fprintf(pm.logWriter, " Warning: IPFS Cluster %s did not become ready: %v\n", node.name, err)
+		}
+
+		// Add a brief delay to allow identity.json to be written
+		time.Sleep(500 * time.Millisecond)
+
+		// Read bootstrap peer ID for follower nodes to join
+		peerID, err := pm.waitForClusterPeerID(ctx, filepath.Join(node.clusterPath, "identity.json"))
+		if err != nil {
+			fmt.Fprintf(pm.logWriter, " Warning: failed to read bootstrap peer ID: %v\n", err)
+		} else {
+			bootstrapMultiaddr = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/p2p/%s", node.clusterPort, peerID)
+			fmt.Fprintf(pm.logWriter, " Bootstrap multiaddress: %s\n", bootstrapMultiaddr)
+		}
+	}
+
+	// Phase 2: Initialize and start follower IPFS Cluster nodes with bootstrap flag
+	for i := 1; i < len(nodes); i++ {
+		node := nodes[i]
+
+		// Always clean stale cluster state to ensure fresh initialization with correct secret
+		if err := pm.cleanClusterState(node.clusterPath); err != nil {
+			fmt.Fprintf(pm.logWriter, " Warning: failed to clean cluster state for %s: %v\n", node.name, err)
+		}
+
+		os.MkdirAll(node.clusterPath, 0755)
+		fmt.Fprintf(pm.logWriter, " Initializing IPFS Cluster (%s)...\n", node.name)
+		cmd := exec.CommandContext(ctx, "ipfs-cluster-service", "init", "--force")
+		cmd.Env = append(os.Environ(),
+			fmt.Sprintf("IPFS_CLUSTER_PATH=%s", node.clusterPath),
+			fmt.Sprintf("CLUSTER_SECRET=%s", clusterSecretHex),
+		)
+		if output, err := cmd.CombinedOutput(); err != nil {
+			fmt.Fprintf(pm.logWriter, " Warning: ipfs-cluster-service init failed for %s: %v (output: %s)\n", node.name, err, string(output))
 		}

 		// Ensure correct ports in service.json BEFORE starting daemon
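Both phases read `<debrosDir>/cluster-secret` and hand it to `ipfs-cluster-service init` through the `CLUSTER_SECRET` environment variable so every node shares the same private-network key. How that file is created is not shown in this diff; the sketch below is only one conventional way to produce the 32-byte hex secret ipfs-cluster expects (the helper name and the `~/.debros` path are assumptions).

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"log"
	"os"
	"path/filepath"
)

// writeClusterSecret writes a fresh 64-hex-character (32-byte) secret to
// <debrosDir>/cluster-secret, the file that startIPFSCluster reads.
func writeClusterSecret(debrosDir string) error {
	buf := make([]byte, 32)
	if _, err := rand.Read(buf); err != nil {
		return err
	}
	path := filepath.Join(debrosDir, "cluster-secret")
	return os.WriteFile(path, []byte(hex.EncodeToString(buf)+"\n"), 0600)
}

func main() {
	// "~/.debros" is a placeholder for the actual debrosDir used by the repo.
	home, _ := os.UserHomeDir()
	if err := writeClusterSecret(filepath.Join(home, ".debros")); err != nil {
		log.Fatal(err)
	}
}
```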
@@ -433,11 +527,29 @@ func (pm *ProcessManager) startIPFSCluster(ctx context.Context) error {
 			fmt.Fprintf(pm.logWriter, " Warning: failed to update IPFS Cluster config for %s: %v\n", node.name, err)
 		}

-		// Start cluster service
+		// Verify the config was written correctly (debug: read it back)
+		serviceJSONPath := filepath.Join(node.clusterPath, "service.json")
+		if data, err := os.ReadFile(serviceJSONPath); err == nil {
+			var verifyConfig map[string]interface{}
+			if err := json.Unmarshal(data, &verifyConfig); err == nil {
+				if cluster, ok := verifyConfig["cluster"].(map[string]interface{}); ok {
+					if listenAddrs, ok := cluster["listen_multiaddress"].([]interface{}); ok {
+						fmt.Fprintf(pm.logWriter, " Config verified: %s cluster listening on %v\n", node.name, listenAddrs)
+					}
+				}
+			}
+		}
+
+		// Start follower cluster service with bootstrap flag
 		pidPath := filepath.Join(pm.pidsDir, fmt.Sprintf("ipfs-cluster-%s.pid", node.name))
 		logPath := filepath.Join(pm.debrosDir, "logs", fmt.Sprintf("ipfs-cluster-%s.log", node.name))

-		cmd := exec.CommandContext(ctx, "ipfs-cluster-service", "daemon")
+		args := []string{"daemon"}
+		if bootstrapMultiaddr != "" {
+			args = append(args, "--bootstrap", bootstrapMultiaddr)
+		}
+
+		cmd = exec.CommandContext(ctx, "ipfs-cluster-service", args...)
 		cmd.Env = append(os.Environ(), fmt.Sprintf("IPFS_CLUSTER_PATH=%s", node.clusterPath))
 		logFile, _ := os.Create(logPath)
 		cmd.Stdout = logFile
@@ -450,12 +562,145 @@ func (pm *ProcessManager) startIPFSCluster(ctx context.Context) error {

 		os.WriteFile(pidPath, []byte(fmt.Sprintf("%d", cmd.Process.Pid)), 0644)
 		fmt.Fprintf(pm.logWriter, "✓ IPFS Cluster (%s) started (PID: %d, API: %d)\n", node.name, cmd.Process.Pid, node.restAPIPort)

+		// Wait for follower node to connect to the bootstrap peer
+		if err := pm.waitClusterReady(ctx, node.name, node.restAPIPort); err != nil {
+			fmt.Fprintf(pm.logWriter, " Warning: IPFS Cluster %s did not become ready: %v\n", node.name, err)
+		}
+	}
+
+	// Phase 3: Wait for all cluster peers to discover each other
+	fmt.Fprintf(pm.logWriter, " Waiting for IPFS Cluster peers to form...\n")
+	if err := pm.waitClusterFormed(ctx, nodes[0].restAPIPort); err != nil {
+		fmt.Fprintf(pm.logWriter, " Warning: IPFS Cluster did not form fully: %v\n", err)
 	}

 	time.Sleep(1 * time.Second)
 	return nil
 }

+// waitForClusterPeerID polls the identity.json file until it appears and extracts the peer ID
+func (pm *ProcessManager) waitForClusterPeerID(ctx context.Context, identityPath string) (string, error) {
+	maxRetries := 30
+	retryInterval := 500 * time.Millisecond
+
+	for attempt := 0; attempt < maxRetries; attempt++ {
+		data, err := os.ReadFile(identityPath)
+		if err == nil {
+			var identity map[string]interface{}
+			if err := json.Unmarshal(data, &identity); err == nil {
+				if id, ok := identity["id"].(string); ok {
+					return id, nil
+				}
+			}
+		}
+
+		select {
+		case <-time.After(retryInterval):
+			continue
+		case <-ctx.Done():
+			return "", ctx.Err()
+		}
+	}
+
+	return "", fmt.Errorf("could not read cluster peer ID after %d seconds", (maxRetries * int(retryInterval.Milliseconds()) / 1000))
+}
+
+// waitClusterReady polls the cluster REST API until it's ready
+func (pm *ProcessManager) waitClusterReady(ctx context.Context, name string, restAPIPort int) error {
+	maxRetries := 30
+	retryInterval := 500 * time.Millisecond
+
+	for attempt := 0; attempt < maxRetries; attempt++ {
+		httpURL := fmt.Sprintf("http://127.0.0.1:%d/peers", restAPIPort)
+		resp, err := http.Get(httpURL)
+		if err == nil && resp.StatusCode == 200 {
+			resp.Body.Close()
+			return nil
+		}
+		if resp != nil {
+			resp.Body.Close()
+		}
+
+		select {
+		case <-time.After(retryInterval):
+			continue
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+
+	return fmt.Errorf("IPFS Cluster %s did not become ready after %d seconds", name, (maxRetries * int(retryInterval.Seconds())))
+}
+
+// waitClusterFormed waits for all cluster peers to be visible from the bootstrap node
+func (pm *ProcessManager) waitClusterFormed(ctx context.Context, bootstrapRestAPIPort int) error {
+	maxRetries := 30
+	retryInterval := 1 * time.Second
+	requiredPeers := 3 // bootstrap, node2, node3
+
+	for attempt := 0; attempt < maxRetries; attempt++ {
+		httpURL := fmt.Sprintf("http://127.0.0.1:%d/peers", bootstrapRestAPIPort)
+		resp, err := http.Get(httpURL)
+		if err == nil && resp.StatusCode == 200 {
+			var peers []interface{}
+			if err := json.NewDecoder(resp.Body).Decode(&peers); err == nil {
+				resp.Body.Close()
+				if len(peers) >= requiredPeers {
+					return nil // All peers have formed
+				}
+			} else {
+				resp.Body.Close()
+			}
+		}
+		if resp != nil {
+			resp.Body.Close()
+		}
+
+		select {
+		case <-time.After(retryInterval):
+			continue
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+
+	return fmt.Errorf("IPFS Cluster did not form fully after %d seconds", (maxRetries * int(retryInterval.Seconds())))
+}
+
+// cleanClusterState removes stale cluster state files to ensure fresh initialization
+// This prevents PSK (private network key) mismatches when cluster secret changes
+func (pm *ProcessManager) cleanClusterState(clusterPath string) error {
+	// Remove pebble datastore (contains persisted PSK state)
+	pebblePath := filepath.Join(clusterPath, "pebble")
+	if err := os.RemoveAll(pebblePath); err != nil && !os.IsNotExist(err) {
+		return fmt.Errorf("failed to remove pebble directory: %w", err)
+	}
+
+	// Remove peerstore (contains peer addresses and metadata)
+	peerstorePath := filepath.Join(clusterPath, "peerstore")
+	if err := os.Remove(peerstorePath); err != nil && !os.IsNotExist(err) {
+		return fmt.Errorf("failed to remove peerstore: %w", err)
+	}
+
+	// Remove service.json (will be regenerated with correct ports and secret)
+	serviceJSONPath := filepath.Join(clusterPath, "service.json")
+	if err := os.Remove(serviceJSONPath); err != nil && !os.IsNotExist(err) {
+		return fmt.Errorf("failed to remove service.json: %w", err)
+	}
+
+	// Remove cluster.lock if it exists (from previous run)
+	lockPath := filepath.Join(clusterPath, "cluster.lock")
+	if err := os.Remove(lockPath); err != nil && !os.IsNotExist(err) {
+		return fmt.Errorf("failed to remove cluster.lock: %w", err)
+	}
+
+	// Note: We keep identity.json as it's tied to the node's peer ID
+	// The secret will be updated via CLUSTER_SECRET env var during init
+
+	return nil
+}
+
 // ensureIPFSClusterPorts updates service.json with correct per-node ports and IPFS connector settings
 func (pm *ProcessManager) ensureIPFSClusterPorts(clusterPath string, restAPIPort int, clusterPort int) error {
 	serviceJSONPath := filepath.Join(clusterPath, "service.json")
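`waitForClusterPeerID` only needs the `id` field from the cluster's `identity.json`. A minimal sketch of that file and the decode the helper performs; the `private_key` field and the port value are assumptions used purely for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Illustrative identity.json contents; only "id" matters to
	// waitForClusterPeerID. Both values are placeholders.
	raw := []byte(`{"id": "12D3KooWExamplePeerID", "private_key": "CAESQ..."}`)

	var identity map[string]interface{}
	if err := json.Unmarshal(raw, &identity); err != nil {
		panic(err)
	}
	if id, ok := identity["id"].(string); ok {
		fmt.Println("cluster peer ID:", id)
		// 9096 is only a placeholder for the per-node clusterPort.
		fmt.Printf("bootstrap multiaddr: /ip4/127.0.0.1/tcp/%d/p2p/%s\n", 9096, id)
	}
}
```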
@@ -502,10 +747,13 @@ func (pm *ProcessManager) ensureIPFSClusterPorts(clusterPath string, restAPIPort
 		}
 	}

-	// Update cluster listen multiaddress
+	// Update cluster listen multiaddress to match the correct port
+	// Replace all old listen addresses with new ones for the correct port
 	if cluster, ok := config["cluster"].(map[string]interface{}); ok {
-		listenAddrs := make([]string, 0)
-		listenAddrs = append(listenAddrs, fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", clusterPort))
+		listenAddrs := []string{
+			fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", clusterPort),
+			fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort),
+		}
 		cluster["listen_multiaddress"] = listenAddrs
 	}
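The hunk ends before `ensureIPFSClusterPorts` persists its edits; presumably the modified map is re-marshalled and written back over `service.json`. A hedged sketch of that final step (the function shape, package name, and indentation settings are assumptions, not the repository's code):

```go
package procmgr

import (
	"encoding/json"
	"fmt"
	"os"
)

// writeServiceJSON re-serializes the edited config map and overwrites
// service.json — the write-back that must follow the listen_multiaddress
// update shown above.
func writeServiceJSON(serviceJSONPath string, config map[string]interface{}) error {
	out, err := json.MarshalIndent(config, "", "  ")
	if err != nil {
		return fmt.Errorf("failed to marshal service.json: %w", err)
	}
	return os.WriteFile(serviceJSONPath, out, 0644)
}
```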