Mirror of https://github.com/DeBrosOfficial/network.git (synced 2025-12-11 07:38:49 +00:00)
feat: enhance production service initialization and logging
- Updated the `Phase2cInitializeServices` function to accept bootstrap peers and VPS IP, improving service configuration for non-bootstrap nodes.
- Refactored the `handleProdInstall` and `handleProdUpgrade` functions to ensure proper initialization of services with the new parameters.
- Improved logging to provide clearer feedback during service initialization and configuration, enhancing user experience and troubleshooting capabilities.
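Editor's note: a minimal, self-contained sketch of the new call shape described above. The function below is a stand-in with the same signature as `setup.Phase2cInitializeServices` after this commit; the node type, multiaddress, and IP values are placeholders, not taken from any real deployment.

package main

import (
	"fmt"
	"os"
)

// Stand-in with the same shape as setup.Phase2cInitializeServices after this commit;
// the real implementation lives in the project's production setup package.
func phase2cInitializeServices(nodeType string, bootstrapPeers []string, vpsIP string) error {
	fmt.Printf("init services: type=%s bootstrap_peers=%d vps_ip=%s\n", nodeType, len(bootstrapPeers), vpsIP)
	return nil
}

func main() {
	nodeType := "node"                                                               // placeholder
	bootstrapPeers := []string{"/ip4/203.0.113.10/tcp/4001/p2p/<bootstrap-peer-id>"} // placeholder multiaddr
	vpsIP := "203.0.113.20"                                                          // placeholder VPS IP

	if err := phase2cInitializeServices(nodeType, bootstrapPeers, vpsIP); err != nil {
		fmt.Fprintf(os.Stderr, "Service initialization failed: %v\n", err)
		os.Exit(1)
	}
}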
Parent: 47ffe817b4
Commit: 358de8a8ad

CHANGELOG.md (18 lines changed)
@@ -13,6 +13,24 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant

### Deprecated

### Fixed

## [0.69.11] - 2025-11-13

### Added

- Added a new comprehensive shell script (`scripts/test-cluster-health.sh`) for checking the health and replication status of RQLite, IPFS, and IPFS Cluster across production environments.

### Changed

- Improved RQLite cluster discovery logic to ensure `peers.json` is correctly generated and includes the local node, which is crucial for reliable cluster recovery.
- Refactored logging across discovery and RQLite components for cleaner, more concise output, especially for routine operations.
- Updated the installation and upgrade process to correctly configure IPFS Cluster bootstrap peers using the node's public IP, improving cluster formation reliability.

### Deprecated

### Removed

### Fixed

- Fixed an issue where RQLite recovery operations (like clearing Raft state) did not correctly force the regeneration of `peers.json`, preventing successful cluster rejoin.
- Corrected the port calculation logic for IPFS Cluster to ensure the correct LibP2P listen port (9098) is used for bootstrap peer addressing.

## [0.69.10] - 2025-11-13

### Added
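Editor's note: the port fix listed under 0.69.11 above follows the arithmetic that appears later in this diff, with the cluster REST API port as the base. A minimal runnable sketch, assuming the default REST API port of 9094:

package main

import "fmt"

// Minimal sketch of the port scheme this commit settles on, using the cluster
// REST API port as the base (9094 is assumed here as the default).
func main() {
	restAPIPort := 9094
	proxyPort := restAPIPort + 1         // 9095 (IPFS proxy API)
	pinSvcPort := restAPIPort + 3        // 9097 (pinning service API)
	clusterListenPort := restAPIPort + 4 // 9098 (cluster LibP2P listen port used in bootstrap multiaddrs)
	fmt.Println(proxyPort, pinSvcPort, clusterListenPort) // 9095 9097 9098
}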
Makefile (2 lines changed)
@@ -19,7 +19,7 @@ test-e2e:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
VERSION := 0.69.10
VERSION := 0.69.11
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
@@ -264,7 +264,7 @@ func handleProdInstall(args []string) {
// Phase 2c: Initialize services (after secrets are in place)
fmt.Printf("\nPhase 2c: Initializing services...\n")
if err := setup.Phase2cInitializeServices(nodeType); err != nil {
if err := setup.Phase2cInitializeServices(nodeType, bootstrapPeers, *vpsIP); err != nil {
fmt.Fprintf(os.Stderr, "❌ Service initialization failed: %v\n", err)
os.Exit(1)
}

@@ -447,13 +447,6 @@ func handleProdUpgrade(args []string) {
nodeType = "bootstrap" // Default for upgrade if nothing exists
}
// Phase 2c: Ensure services are properly initialized (fixes existing repos)
fmt.Printf("\nPhase 2c: Ensuring services are properly initialized...\n")
if err := setup.Phase2cInitializeServices(nodeType); err != nil {
fmt.Fprintf(os.Stderr, "❌ Service initialization failed: %v\n", err)
os.Exit(1)
}
// Phase 3: Ensure secrets exist (preserves existing secrets)
fmt.Printf("\n🔐 Phase 3: Ensuring secrets...\n")
if err := setup.Phase3GenerateSecrets(nodeType == "bootstrap"); err != nil {

@@ -584,6 +577,14 @@ func handleProdUpgrade(args []string) {
fmt.Printf("   - Bootstrap join address: %s\n", bootstrapJoin)
}
// Phase 2c: Ensure services are properly initialized (fixes existing repos)
// Now that we have bootstrap peers and VPS IP, we can properly configure IPFS Cluster
fmt.Printf("\nPhase 2c: Ensuring services are properly initialized...\n")
if err := setup.Phase2cInitializeServices(nodeType, bootstrapPeers, vpsIP); err != nil {
fmt.Fprintf(os.Stderr, "❌ Service initialization failed: %v\n", err)
os.Exit(1)
}
if err := setup.Phase4GenerateConfigs(nodeType == "bootstrap", bootstrapPeers, vpsIP, enableHTTPS, domain, bootstrapJoin); err != nil {
fmt.Fprintf(os.Stderr, "⚠️  Config generation warning: %v\n", err)
fmt.Fprintf(os.Stderr, "   Existing configs preserved\n")
@@ -140,19 +140,9 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) {
}
}
// Log if addresses were filtered out
if filteredCount > 0 {
d.logger.Debug("Filtered out non-libp2p addresses",
zap.String("peer_id", pid.String()[:8]+"..."),
zap.Int("filtered_count", filteredCount),
zap.Int("valid_count", len(filteredAddrs)))
}
// If no addresses remain after filtering, skip this peer
// (Filtering is routine - no need to log every occurrence)
if len(filteredAddrs) == 0 {
d.logger.Debug("No valid addresses after filtering",
zap.String("peer_id", pid.String()[:8]+"..."),
zap.Int("original_count", len(addrs)))
continue
}

@@ -186,9 +176,7 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) {
return
}
d.logger.Debug("Sent peer exchange response",
zap.Int("peer_count", len(resp.Peers)),
zap.Bool("has_rqlite_metadata", resp.RQLiteMetadata != nil))
// Response sent - routine operation, no need to log
}
// Start begins periodic peer discovery

@@ -231,9 +219,6 @@ func (d *Manager) discoverPeers(ctx context.Context, config Config) {
connectedPeers := d.host.Network().Peers()
initialCount := len(connectedPeers)
d.logger.Debug("Starting peer discovery",
zap.Int("current_peers", initialCount))
newConnections := 0
// Strategy 1: Try to connect to peers learned from the host's peerstore

@@ -246,11 +231,12 @@ func (d *Manager) discoverPeers(ctx context.Context, config Config) {
finalPeerCount := len(d.host.Network().Peers())
// Summary log: only log if there were changes or new connections
if newConnections > 0 || finalPeerCount != initialCount {
d.logger.Debug("Peer discovery completed",
zap.Int("new_connections", newConnections),
zap.Int("initial_peers", initialCount),
zap.Int("final_peers", finalPeerCount))
d.logger.Debug("Discovery summary",
zap.Int("connected", finalPeerCount),
zap.Int("new", newConnections),
zap.Int("was", initialCount))
}
}

@@ -265,7 +251,10 @@ func (d *Manager) discoverViaPeerstore(ctx context.Context, maxConnections int)
// Iterate over peerstore known peers
peers := d.host.Peerstore().Peers()
d.logger.Debug("Peerstore contains peers", zap.Int("count", len(peers)))
// Only connect to peers on our standard LibP2P port to avoid cross-connecting
// with IPFS/IPFS Cluster instances that use different ports
const libp2pPort = 4001
for _, pid := range peers {
if connected >= maxConnections {

@@ -280,6 +269,24 @@ func (d *Manager) discoverViaPeerstore(ctx context.Context, maxConnections int)
continue
}
// Filter peers to only include those with addresses on our port (4001)
// This prevents attempting to connect to IPFS (port 4101) or IPFS Cluster (port 9096)
peerInfo := d.host.Peerstore().PeerInfo(pid)
hasValidPort := false
for _, addr := range peerInfo.Addrs {
if port, err := addr.ValueForProtocol(multiaddr.P_TCP); err == nil {
if portNum, err := strconv.Atoi(port); err == nil && portNum == libp2pPort {
hasValidPort = true
break
}
}
}
// Skip peers without valid port 4001 addresses
if !hasValidPort {
continue
}
// Try to connect
if err := d.connectToPeer(ctx, pid); err == nil {
connected++

@@ -302,8 +309,8 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in
return 0
}
d.logger.Debug("Starting peer exchange with connected peers",
zap.Int("num_peers", len(connectedPeers)))
exchangedPeers := 0
metadataCollected := 0
for _, peerID := range connectedPeers {
if connected >= maxConnections {

@@ -316,9 +323,13 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in
continue
}
d.logger.Debug("Received peer list from peer",
zap.String("from_peer", peerID.String()[:8]+"..."),
zap.Int("peer_count", len(peers)))
exchangedPeers++
// Check if we got RQLite metadata
if val, err := d.host.Peerstore().Get(peerID, "rqlite_metadata"); err == nil {
if _, ok := val.([]byte); ok {
metadataCollected++
}
}
// Try to connect to discovered peers
for _, peerInfo := range peers {

@@ -365,9 +376,7 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in
}
if len(addrs) == 0 {
d.logger.Debug("No valid libp2p addresses (port 4001) for peer",
zap.String("peer_id", parsedID.String()[:8]+"..."),
zap.Int("total_addresses", len(peerInfo.Addrs)))
// Skip peers without valid addresses - no need to log every occurrence
continue
}

@@ -380,20 +389,29 @@ func (d *Manager) discoverViaPeerExchange(ctx context.Context, maxConnections in
if err := d.host.Connect(connectCtx, peerAddrInfo); err != nil {
cancel()
d.logger.Debug("Failed to connect to discovered peer",
zap.String("peer_id", parsedID.String()[:8]+"..."),
// Only log connection failures for debugging - errors are still useful
d.logger.Debug("Connect failed",
zap.String("peer", parsedID.String()[:8]+"..."),
zap.Error(err))
continue
}
cancel()
d.logger.Info("Successfully connected to discovered peer",
zap.String("peer_id", parsedID.String()[:8]+"..."),
zap.String("discovered_from", peerID.String()[:8]+"..."))
d.logger.Info("Connected",
zap.String("peer", parsedID.String()[:8]+"..."),
zap.String("from", peerID.String()[:8]+"..."))
connected++
}
}
// Summary log for peer exchange
if exchangedPeers > 0 {
d.logger.Debug("Exchange summary",
zap.Int("exchanged_with", exchangedPeers),
zap.Int("metadata_collected", metadataCollected),
zap.Int("new_connections", connected))
}
return connected
}

@@ -446,9 +464,10 @@ func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi
metadataJSON, err := json.Marshal(resp.RQLiteMetadata)
if err == nil {
_ = d.host.Peerstore().Put(peerID, "rqlite_metadata", metadataJSON)
d.logger.Debug("Stored RQLite metadata from peer",
zap.String("peer_id", peerID.String()[:8]+"..."),
zap.String("node_id", resp.RQLiteMetadata.NodeID))
// Only log when new metadata is stored (useful for debugging)
d.logger.Debug("Metadata stored",
zap.String("peer", peerID.String()[:8]+"..."),
zap.String("node", resp.RQLiteMetadata.NodeID))
}
}

@@ -464,9 +483,6 @@ func (d *Manager) TriggerPeerExchange(ctx context.Context) int {
return 0
}
d.logger.Info("Manually triggering peer exchange",
zap.Int("connected_peers", len(connectedPeers)))
metadataCollected := 0
for _, peerID := range connectedPeers {
// Request peer list from this peer (which includes their RQLite metadata)

@@ -480,9 +496,9 @@ func (d *Manager) TriggerPeerExchange(ctx context.Context) int {
}
}
d.logger.Info("Peer exchange completed",
zap.Int("peers_with_metadata", metadataCollected),
zap.Int("total_peers", len(connectedPeers)))
d.logger.Info("Exchange completed",
zap.Int("peers", len(connectedPeers)),
zap.Int("with_metadata", metadataCollected))
return metadataCollected
}

@@ -502,8 +518,7 @@ func (d *Manager) connectToPeer(ctx context.Context, peerID peer.ID) error {
return err
}
d.logger.Debug("Successfully connected to peer",
zap.String("peer_id", peerID.String()[:8]+"..."))
// Connection success logged at higher level - no need for duplicate DEBUG log
return nil
}
@@ -532,7 +532,8 @@ func (bi *BinaryInstaller) configureIPFSAddresses(ipfsRepoPath string, apiPort,
// InitializeIPFSClusterConfig initializes IPFS Cluster configuration
// This runs `ipfs-cluster-service init` to create the service.json configuration file.
// For existing installations, it ensures the cluster secret is up to date.
func (bi *BinaryInstaller) InitializeIPFSClusterConfig(nodeType, clusterPath, clusterSecret string, ipfsAPIPort int) error {
// bootstrapClusterPeers should be in format: ["/ip4/<ip>/tcp/9098/p2p/<cluster-peer-id>"]
func (bi *BinaryInstaller) InitializeIPFSClusterConfig(nodeType, clusterPath, clusterSecret string, ipfsAPIPort int, bootstrapClusterPeers []string) error {
serviceJSONPath := filepath.Join(clusterPath, "service.json")
configExists := false
if _, err := os.Stat(serviceJSONPath); err == nil {

@@ -567,11 +568,11 @@ func (bi *BinaryInstaller) InitializeIPFSClusterConfig(nodeType, clusterPath, cl
}
}
// Always update the cluster secret and IPFS port (for both new and existing configs)
// Always update the cluster secret, IPFS port, and peer addresses (for both new and existing configs)
// This ensures existing installations get the secret and port synchronized
if clusterSecret != "" {
fmt.Fprintf(bi.logWriter.(interface{ Write([]byte) (int, error) }), "   Updating cluster secret and IPFS port...\n")
if err := bi.updateClusterConfig(clusterPath, clusterSecret, ipfsAPIPort); err != nil {
fmt.Fprintf(bi.logWriter.(interface{ Write([]byte) (int, error) }), "   Updating cluster secret, IPFS port, and peer addresses...\n")
if err := bi.updateClusterConfig(clusterPath, clusterSecret, ipfsAPIPort, bootstrapClusterPeers); err != nil {
return fmt.Errorf("failed to update cluster config: %w", err)
}
}

@@ -582,8 +583,8 @@ func (bi *BinaryInstaller) InitializeIPFSClusterConfig(nodeType, clusterPath, cl
return nil
}
// updateClusterConfig updates the secret and IPFS port in IPFS Cluster service.json
func (bi *BinaryInstaller) updateClusterConfig(clusterPath, secret string, ipfsAPIPort int) error {
// updateClusterConfig updates the secret, IPFS port, and peer addresses in IPFS Cluster service.json
func (bi *BinaryInstaller) updateClusterConfig(clusterPath, secret string, ipfsAPIPort int, bootstrapClusterPeers []string) error {
serviceJSONPath := filepath.Join(clusterPath, "service.json")
// Read existing config

@@ -598,13 +599,22 @@ func (bi *BinaryInstaller) updateClusterConfig(clusterPath, secret string, ipfsA
return fmt.Errorf("failed to parse service.json: %w", err)
}
// Update cluster secret
// Update cluster secret and peer addresses
if cluster, ok := config["cluster"].(map[string]interface{}); ok {
cluster["secret"] = secret
// Configure peer addresses for cluster discovery
// This allows nodes to find and connect to each other
if len(bootstrapClusterPeers) > 0 {
cluster["peer_addresses"] = bootstrapClusterPeers
}
} else {
config["cluster"] = map[string]interface{}{
clusterConfig := map[string]interface{}{
"secret": secret,
}
if len(bootstrapClusterPeers) > 0 {
clusterConfig["peer_addresses"] = bootstrapClusterPeers
}
config["cluster"] = clusterConfig
}
// Update IPFS port in IPFS Proxy configuration

@@ -635,6 +645,35 @@ func (bi *BinaryInstaller) updateClusterConfig(clusterPath, secret string, ipfsA
return nil
}
// GetClusterPeerMultiaddr reads the IPFS Cluster peer ID and returns its multiaddress
// Returns format: /ip4/<ip>/tcp/9098/p2p/<cluster-peer-id>
func (bi *BinaryInstaller) GetClusterPeerMultiaddr(clusterPath string, nodeIP string) (string, error) {
identityPath := filepath.Join(clusterPath, "identity.json")
// Read identity file
data, err := os.ReadFile(identityPath)
if err != nil {
return "", fmt.Errorf("failed to read identity.json: %w", err)
}
// Parse JSON
var identity map[string]interface{}
if err := json.Unmarshal(data, &identity); err != nil {
return "", fmt.Errorf("failed to parse identity.json: %w", err)
}
// Get peer ID
peerID, ok := identity["id"].(string)
if !ok || peerID == "" {
return "", fmt.Errorf("peer ID not found in identity.json")
}
// Construct multiaddress: /ip4/<ip>/tcp/9098/p2p/<peer-id>
// Port 9098 is the default cluster listen port
multiaddr := fmt.Sprintf("/ip4/%s/tcp/9098/p2p/%s", nodeIP, peerID)
return multiaddr, nil
}
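Editor's note: to make the wiring concrete, a hedged, runnable sketch of where the multiaddress built by GetClusterPeerMultiaddr ends up, namely the `cluster.peer_addresses` field of the service.json that updateClusterConfig writes above. The peer ID, secret, and IP below are placeholders, and a real service.json contains many more settings.

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Placeholder multiaddress in the format returned by GetClusterPeerMultiaddr.
	clusterMultiaddr := "/ip4/203.0.113.10/tcp/9098/p2p/<cluster-peer-id>"
	// Approximate shape of the "cluster" section written by updateClusterConfig;
	// the secret value is a placeholder and real files contain many more fields.
	config := map[string]interface{}{
		"cluster": map[string]interface{}{
			"secret":         "<hex-encoded-cluster-secret>",
			"peer_addresses": []string{clusterMultiaddr},
		},
	}
	out, _ := json.MarshalIndent(config, "", "  ")
	fmt.Println(string(out))
}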
// InitializeRQLiteDataDir initializes RQLite data directory
func (bi *BinaryInstaller) InitializeRQLiteDataDir(nodeType, dataDir string) error {
fmt.Fprintf(bi.logWriter.(interface{ Write([]byte) (int, error) }), "   Initializing RQLite data dir for %s...\n", nodeType)

@@ -265,7 +265,7 @@ func (ps *ProductionSetup) Phase2bInstallBinaries() error {
}
// Phase2cInitializeServices initializes service repositories and configurations
func (ps *ProductionSetup) Phase2cInitializeServices(nodeType string) error {
func (ps *ProductionSetup) Phase2cInitializeServices(nodeType string, bootstrapPeers []string, vpsIP string) error {
ps.logf("Phase 2c: Initializing services...")
// Ensure node-specific directories exist

@@ -289,7 +289,34 @@ func (ps *ProductionSetup) Phase2cInitializeServices(nodeType string) error {
if err != nil {
return fmt.Errorf("failed to get cluster secret: %w", err)
}
if err := ps.binaryInstaller.InitializeIPFSClusterConfig(nodeType, clusterPath, clusterSecret, 4501); err != nil {
// Get bootstrap cluster peer addresses for non-bootstrap nodes
var bootstrapClusterPeers []string
if nodeType != "bootstrap" && len(bootstrapPeers) > 0 {
// Try to read bootstrap cluster peer ID and construct multiaddress
bootstrapClusterPath := filepath.Join(ps.debrosDir, "data", "bootstrap", "ipfs-cluster")
// Infer bootstrap IP from bootstrap peers
bootstrapIP := inferBootstrapIP(bootstrapPeers, vpsIP)
if bootstrapIP != "" {
// Check if bootstrap cluster identity exists
if _, err := os.Stat(filepath.Join(bootstrapClusterPath, "identity.json")); err == nil {
// Bootstrap cluster is initialized, get its multiaddress
if clusterMultiaddr, err := ps.binaryInstaller.GetClusterPeerMultiaddr(bootstrapClusterPath, bootstrapIP); err == nil {
bootstrapClusterPeers = []string{clusterMultiaddr}
ps.logf("   ℹ️  Configured IPFS Cluster to connect to bootstrap: %s", clusterMultiaddr)
} else {
ps.logf("   ⚠️  Could not read bootstrap cluster peer ID: %v", err)
ps.logf("   ⚠️  IPFS Cluster will rely on mDNS discovery (may not work across internet)")
}
} else {
ps.logf("   ℹ️  Bootstrap cluster not yet initialized, peer_addresses will be empty")
ps.logf("   ℹ️  IPFS Cluster will rely on mDNS discovery (may not work across internet)")
}
}
}
if err := ps.binaryInstaller.InitializeIPFSClusterConfig(nodeType, clusterPath, clusterSecret, 4501, bootstrapClusterPeers); err != nil {
return fmt.Errorf("failed to initialize IPFS Cluster: %w", err)
}
@@ -161,9 +161,15 @@ func (cm *ClusterConfigManager) EnsureConfig() error {
}
// Calculate ports based on pattern
proxyPort := clusterPort - 1
pinSvcPort := clusterPort + 1
clusterListenPort := clusterPort + 2
// REST API: 9094
// Proxy: 9094 - 1 = 9093 (NOT USED - keeping for reference)
// PinSvc: 9094 + 1 = 9095
// Proxy API: 9094 + 1 = 9095 (actual proxy port)
// PinSvc API: 9094 + 3 = 9097
// Cluster LibP2P: 9094 + 4 = 9098
proxyPort := clusterPort + 1 // 9095 (IPFSProxy API)
pinSvcPort := clusterPort + 3 // 9097 (PinSvc API)
clusterListenPort := clusterPort + 4 // 9098 (Cluster LibP2P)
// If config doesn't exist, initialize it with ipfs-cluster-service init
// This ensures we have all required sections (datastore, informer, etc.)

@@ -246,8 +252,9 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) (bo
return false, fmt.Errorf("failed to parse bootstrap cluster API URL: %w", err)
}
// Bootstrap listens on clusterPort + 2 (same pattern)
bootstrapClusterPort := clusterPort + 2
// Bootstrap cluster LibP2P listens on clusterPort + 4
// (REST API is 9094, LibP2P is 9098 = 9094 + 4)
bootstrapClusterPort := clusterPort + 4
// Determine IP protocol (ip4 or ip6) based on the host
var ipProtocol string

@@ -266,17 +273,31 @@ func (cm *ClusterConfigManager) UpdateBootstrapPeers(bootstrapAPIURL string) (bo
return false, fmt.Errorf("failed to load config: %w", err)
}
// Check if we already have the correct address configured
if len(cfg.Cluster.PeerAddresses) > 0 && cfg.Cluster.PeerAddresses[0] == bootstrapPeerAddr {
cm.logger.Debug("Bootstrap peer address already correct", zap.String("addr", bootstrapPeerAddr))
return true, nil
// CRITICAL: Always update peerstore file to ensure no stale addresses remain
// Stale addresses (e.g., from old port configurations) cause LibP2P dial backoff,
// preventing cluster peers from connecting even if the correct address is present.
// We must clean and rewrite the peerstore on every update to avoid this.
peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
// Check if peerstore needs updating (avoid unnecessary writes but always clean stale entries)
needsUpdate := true
if peerstoreData, err := os.ReadFile(peerstorePath); err == nil {
// Only skip update if peerstore contains EXACTLY the correct address and nothing else
existingAddrs := strings.Split(strings.TrimSpace(string(peerstoreData)), "\n")
if len(existingAddrs) == 1 && strings.TrimSpace(existingAddrs[0]) == bootstrapPeerAddr {
cm.logger.Debug("Bootstrap peer address already correct in peerstore", zap.String("addr", bootstrapPeerAddr))
needsUpdate = false
}
}
// Update peerstore file FIRST - this is what IPFS Cluster reads for bootstrapping
// Peerstore is the source of truth, service.json is just for our tracking
peerstorePath := filepath.Join(cm.clusterPath, "peerstore")
if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil {
return false, fmt.Errorf("failed to write peerstore: %w", err)
if needsUpdate {
// Write ONLY the correct bootstrap peer address, removing any stale entries
if err := os.WriteFile(peerstorePath, []byte(bootstrapPeerAddr+"\n"), 0644); err != nil {
return false, fmt.Errorf("failed to write peerstore: %w", err)
}
cm.logger.Info("Updated peerstore with bootstrap peer (cleaned stale entries)",
zap.String("addr", bootstrapPeerAddr),
zap.String("peerstore_path", peerstorePath))
}
// Then sync service.json from peerstore to keep them in sync

@@ -852,8 +873,9 @@ func parseClusterPorts(clusterAPIURL string) (clusterPort, restAPIPort int, err
return 0, 0, fmt.Errorf("invalid port: %s", portStr)
}
// Cluster listen port is typically REST API port + 2
clusterPort = restAPIPort + 2
// clusterPort is used as the base port for calculations
// The actual cluster LibP2P listen port is calculated as clusterPort + 4
clusterPort = restAPIPort
return clusterPort, restAPIPort, nil
}
@@ -101,8 +101,10 @@ func getLevelColor(level zapcore.Level) string {
// coloredConsoleEncoder creates a custom encoder with colors
func coloredConsoleEncoder(enableColors bool) zapcore.Encoder {
config := zap.NewDevelopmentEncoderConfig()
// Ultra-short timestamp: HH:MM:SS (no milliseconds, no date, no timezone)
config.EncodeTime = func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
timeStr := t.Format("2006-01-02T15:04:05.000Z0700")
timeStr := t.Format("15:04:05")
if enableColors {
enc.AppendString(fmt.Sprintf("%s%s%s", Dim, timeStr, Reset))
} else {

@@ -110,21 +112,41 @@ func coloredConsoleEncoder(enableColors bool) zapcore.Encoder {
}
}
// Single letter level: D, I, W, E
config.EncodeLevel = func(level zapcore.Level, enc zapcore.PrimitiveArrayEncoder) {
levelStr := strings.ToUpper(level.String())
levelMap := map[zapcore.Level]string{
zapcore.DebugLevel: "D",
zapcore.InfoLevel: "I",
zapcore.WarnLevel: "W",
zapcore.ErrorLevel: "E",
}
levelStr := levelMap[level]
if levelStr == "" {
levelStr = "?"
}
if enableColors {
color := getLevelColor(level)
enc.AppendString(fmt.Sprintf("%s%s%-5s%s", color, Bold, levelStr, Reset))
enc.AppendString(fmt.Sprintf("%s%s%s%s", color, Bold, levelStr, Reset))
} else {
enc.AppendString(fmt.Sprintf("%-5s", levelStr))
enc.AppendString(levelStr)
}
}
// Just filename, no line number for cleaner output
config.EncodeCaller = func(caller zapcore.EntryCaller, enc zapcore.PrimitiveArrayEncoder) {
file := caller.File
// Extract just the filename from the path
if idx := strings.LastIndex(file, "/"); idx >= 0 {
file = file[idx+1:]
}
// Remove .go extension for even more compact format
if strings.HasSuffix(file, ".go") {
file = file[:len(file)-3]
}
if enableColors {
enc.AppendString(fmt.Sprintf("%s%s%s", Dim, caller.TrimmedPath(), Reset))
enc.AppendString(fmt.Sprintf("%s%s%s", Dim, file, Reset))
} else {
enc.AppendString(caller.TrimmedPath())
enc.AppendString(file)
}
}
@@ -166,8 +166,7 @@ func (c *ClusterDiscoveryService) collectPeerMetadata() []*discovery.RQLiteNodeM
connectedPeers := c.host.Network().Peers()
var metadata []*discovery.RQLiteNodeMetadata
c.logger.Debug("Collecting peer metadata from LibP2P",
zap.Int("connected_libp2p_peers", len(connectedPeers)))
// Metadata collection is routine - no need to log every occurrence
c.mu.RLock()
currentRaftAddr := c.raftAddress

@@ -240,9 +239,6 @@ type membershipUpdateResult struct {
func (c *ClusterDiscoveryService) updateClusterMembership() {
metadata := c.collectPeerMetadata()
c.logger.Debug("Collected peer metadata",
zap.Int("metadata_count", len(metadata)))
// Compute membership changes while holding lock
c.mu.Lock()
result := c.computeMembershipChangesLocked(metadata)

@@ -252,35 +248,30 @@ func (c *ClusterDiscoveryService) updateClusterMembership() {
if result.changed {
// Log state changes (peer added/removed) at Info level
if len(result.added) > 0 || len(result.updated) > 0 {
c.logger.Info("Cluster membership changed",
c.logger.Info("Membership changed",
zap.Int("added", len(result.added)),
zap.Int("updated", len(result.updated)),
zap.Strings("added_ids", result.added),
zap.Strings("updated_ids", result.updated))
zap.Strings("added", result.added),
zap.Strings("updated", result.updated))
}
// Write peers.json without holding lock
if err := c.writePeersJSONWithData(result.peersJSON); err != nil {
c.logger.Error("CRITICAL: Failed to write peers.json",
c.logger.Error("Failed to write peers.json",
zap.Error(err),
zap.String("data_dir", c.dataDir),
zap.Int("peer_count", len(result.peersJSON)))
zap.Int("peers", len(result.peersJSON)))
} else {
c.logger.Debug("peers.json updated",
zap.Int("peer_count", len(result.peersJSON)))
zap.Int("peers", len(result.peersJSON)))
}
// Update lastUpdate timestamp
c.mu.Lock()
c.lastUpdate = time.Now()
c.mu.Unlock()
} else {
c.mu.RLock()
totalPeers := len(c.knownPeers)
c.mu.RUnlock()
c.logger.Debug("No changes to cluster membership",
zap.Int("total_peers", totalPeers))
}
// No changes - don't log (reduces noise)
}
// computeMembershipChangesLocked computes membership changes and returns snapshot data

@@ -305,10 +296,10 @@ func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*dis
} else {
// New peer discovered
added = append(added, meta.NodeID)
c.logger.Info("Node added to cluster",
zap.String("node_id", meta.NodeID),
zap.String("raft_address", meta.RaftAddress),
zap.String("node_type", meta.NodeType),
c.logger.Info("Node added",
zap.String("node", meta.NodeID),
zap.String("raft", meta.RaftAddress),
zap.String("type", meta.NodeType),
zap.Uint64("log_index", meta.RaftLogIndex))
}

@@ -354,11 +345,10 @@ func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*dis
requiredRemotePeers := c.minClusterSize - 1
if remotePeerCount < requiredRemotePeers {
c.logger.Info("Skipping initial peers.json write - not enough remote peers discovered",
zap.Int("remote_peers", remotePeerCount),
zap.Int("required_remote_peers", requiredRemotePeers),
zap.Int("min_cluster_size", c.minClusterSize),
zap.String("action", "will write when enough peers are discovered"))
c.logger.Info("Waiting for peers",
zap.Int("have", remotePeerCount),
zap.Int("need", requiredRemotePeers),
zap.Int("min_size", c.minClusterSize))
return membershipUpdateResult{
changed: false,
}

@@ -367,8 +357,7 @@ func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*dis
// Additional safety check: don't write empty peers.json (would cause single-node cluster)
if len(peers) == 0 && c.lastUpdate.IsZero() {
c.logger.Info("Skipping peers.json write - no remote peers to include",
zap.String("action", "will write when peers are discovered"))
c.logger.Info("No remote peers - waiting")
return membershipUpdateResult{
changed: false,
}

@@ -376,10 +365,10 @@ func (c *ClusterDiscoveryService) computeMembershipChangesLocked(metadata []*dis
// Log initial sync if this is the first time
if c.lastUpdate.IsZero() {
c.logger.Info("Initial cluster membership sync",
zap.Int("total_peers", len(c.knownPeers)),
zap.Int("remote_peers", remotePeerCount),
zap.Int("peers_in_json", len(peers)))
c.logger.Info("Initial sync",
zap.Int("total", len(c.knownPeers)),
zap.Int("remote", remotePeerCount),
zap.Int("in_json", len(peers)))
}
return membershipUpdateResult{

@@ -408,8 +397,8 @@ func (c *ClusterDiscoveryService) removeInactivePeers() {
if inactiveDuration > c.inactivityLimit {
// Mark as inactive and remove
c.logger.Warn("Node removed from cluster",
zap.String("node_id", nodeID),
c.logger.Warn("Node removed",
zap.String("node", nodeID),
zap.String("reason", "inactive"),
zap.Duration("inactive_duration", inactiveDuration))

@@ -421,9 +410,9 @@ func (c *ClusterDiscoveryService) removeInactivePeers() {
// Regenerate peers.json if any peers were removed
if len(removed) > 0 {
c.logger.Info("Removed inactive nodes, regenerating peers.json",
zap.Int("removed", len(removed)),
zap.Strings("node_ids", removed))
c.logger.Info("Removed inactive",
zap.Int("count", len(removed)),
zap.Strings("nodes", removed))
if err := c.writePeersJSON(); err != nil {
c.logger.Error("Failed to write peers.json after cleanup", zap.Error(err))

@@ -443,10 +432,11 @@ func (c *ClusterDiscoveryService) getPeersJSONUnlocked() []map[string]interface{
peers := make([]map[string]interface{}, 0, len(c.knownPeers))
for _, peer := range c.knownPeers {
// Skip self - RQLite knows about itself, shouldn't be in peers.json
if peer.NodeID == c.raftAddress {
continue
}
// CRITICAL FIX: Include ALL peers (including self) in peers.json
// When using bootstrap-expect with recovery, RQLite needs the complete
// expected cluster configuration to properly form consensus.
// The peers.json file is used by RQLite's recovery mechanism to know
// what the full cluster membership should be, including the local node.
peerEntry := map[string]interface{}{
"id":      peer.RaftAddress, // RQLite uses raft address as node ID
"address": peer.RaftAddress,
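Editor's note: a small runnable sketch of the approximate peers.json payload produced by this change for a hypothetical three-node cluster, now including the local node. The IPs and raft port are placeholders; only the `id` and `address` fields visible in this hunk are shown.

package main

import (
	"encoding/json"
	"fmt"
)

// Illustrative only: approximate shape of the data written to
// <data-dir>/rqlite/raft/peers.json after this commit, including the local node.
func main() {
	peers := []map[string]interface{}{
		{"id": "203.0.113.10:7001", "address": "203.0.113.10:7001"}, // local/bootstrap node (placeholder)
		{"id": "203.0.113.20:7001", "address": "203.0.113.20:7001"}, // placeholder
		{"id": "203.0.113.30:7001", "address": "203.0.113.30:7001"}, // placeholder
	}
	out, _ := json.MarshalIndent(peers, "", "  ")
	fmt.Println(string(out))
}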
@@ -482,11 +472,7 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte
// Get the RQLite raft directory
rqliteDir := filepath.Join(dataDir, "rqlite", "raft")
c.logger.Debug("Writing peers.json",
zap.String("data_dir", c.dataDir),
zap.String("expanded_path", dataDir),
zap.String("raft_dir", rqliteDir),
zap.Int("peer_count", len(peers)))
// Writing peers.json - routine operation, no need to log details
if err := os.MkdirAll(rqliteDir, 0755); err != nil {
return fmt.Errorf("failed to create raft directory %s: %w", rqliteDir, err)

@@ -497,7 +483,7 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte
// Backup existing peers.json if it exists
if _, err := os.Stat(peersFile); err == nil {
c.logger.Debug("Backing up existing peers.json", zap.String("backup_file", backupFile))
// Backup existing peers.json if it exists - routine operation
data, err := os.ReadFile(peersFile)
if err == nil {
_ = os.WriteFile(backupFile, data, 0644)

@@ -510,7 +496,7 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte
return fmt.Errorf("failed to marshal peers.json: %w", err)
}
c.logger.Debug("Marshaled peers.json", zap.Int("data_size", len(data)))
// Marshaled peers.json - routine operation
// Write atomically using temp file + rename
tempFile := peersFile + ".tmp"

@@ -530,9 +516,8 @@ func (c *ClusterDiscoveryService) writePeersJSONWithData(peers []map[string]inte
}
c.logger.Info("peers.json written",
zap.String("file", peersFile),
zap.Int("node_count", len(peers)),
zap.Strings("node_ids", nodeIDs))
zap.Int("peers", len(peers)),
zap.Strings("nodes", nodeIDs))
return nil
}

@@ -648,17 +633,58 @@ func (c *ClusterDiscoveryService) WaitForDiscoverySettling(ctx context.Context)
// TriggerSync manually triggers a cluster membership sync
func (c *ClusterDiscoveryService) TriggerSync() {
c.logger.Info("Manually triggering cluster membership sync")
// For bootstrap nodes, wait a bit for peer discovery to stabilize
if c.nodeType == "bootstrap" {
c.logger.Info("Bootstrap node: waiting for peer discovery to complete")
time.Sleep(5 * time.Second)
}
c.updateClusterMembership()
}
// ForceWritePeersJSON forces writing peers.json regardless of membership changes
// This is useful after clearing raft state when we need to recreate peers.json
func (c *ClusterDiscoveryService) ForceWritePeersJSON() error {
c.logger.Info("Force writing peers.json")
// First, collect latest peer metadata to ensure we have current information
metadata := c.collectPeerMetadata()
// Update known peers with latest metadata (without writing file yet)
c.mu.Lock()
for _, meta := range metadata {
c.knownPeers[meta.NodeID] = meta
// Update health tracking for remote peers
if meta.NodeID != c.raftAddress {
if _, ok := c.peerHealth[meta.NodeID]; !ok {
c.peerHealth[meta.NodeID] = &PeerHealth{
LastSeen: time.Now(),
LastSuccessful: time.Now(),
Status: "active",
}
} else {
c.peerHealth[meta.NodeID].LastSeen = time.Now()
c.peerHealth[meta.NodeID].Status = "active"
}
}
}
peers := c.getPeersJSONUnlocked()
c.mu.Unlock()
// Now force write the file
if err := c.writePeersJSONWithData(peers); err != nil {
c.logger.Error("Failed to force write peers.json",
zap.Error(err),
zap.String("data_dir", c.dataDir),
zap.Int("peers", len(peers)))
return err
}
c.logger.Info("peers.json written",
zap.Int("peers", len(peers)))
return nil
}
// TriggerPeerExchange actively exchanges peer information with connected peers
// This populates the peerstore with RQLite metadata from other nodes
func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error {

@@ -666,9 +692,8 @@ func (c *ClusterDiscoveryService) TriggerPeerExchange(ctx context.Context) error
return fmt.Errorf("discovery manager not available")
}
c.logger.Info("Triggering peer exchange via discovery manager")
collected := c.discoveryMgr.TriggerPeerExchange(ctx)
c.logger.Info("Peer exchange completed", zap.Int("peers_with_metadata", collected))
c.logger.Debug("Exchange completed", zap.Int("with_metadata", collected))
return nil
}

@@ -709,8 +734,8 @@ func (c *ClusterDiscoveryService) UpdateOwnMetadata() {
return
}
c.logger.Debug("Updated own RQLite metadata",
zap.String("node_id", metadata.NodeID),
c.logger.Debug("Metadata updated",
zap.String("node", metadata.NodeID),
zap.Uint64("log_index", metadata.RaftLogIndex))
}

@@ -740,9 +765,9 @@ func (c *ClusterDiscoveryService) StoreRemotePeerMetadata(peerID peer.ID, metada
return fmt.Errorf("failed to store metadata: %w", err)
}
c.logger.Debug("Stored remote peer metadata",
zap.String("peer_id", shortPeerID(peerID)),
zap.String("node_id", metadata.NodeID))
c.logger.Debug("Metadata stored",
zap.String("peer", shortPeerID(peerID)),
zap.String("node", metadata.NodeID))
return nil
}

@@ -758,9 +783,9 @@ func (c *ClusterDiscoveryService) adjustPeerAdvertisedAddresses(peerID peer.ID,
changed, stale := rewriteAdvertisedAddresses(meta, ip, true)
if changed {
c.logger.Debug("Normalized peer advertised RQLite addresses",
zap.String("peer_id", shortPeerID(peerID)),
zap.String("raft_address", meta.RaftAddress),
c.logger.Debug("Addresses normalized",
zap.String("peer", shortPeerID(peerID)),
zap.String("raft", meta.RaftAddress),
zap.String("http_address", meta.HTTPAddress))
}
return changed, stale

@@ -34,11 +34,6 @@ type RQLiteManager struct {
// waitForSQLAvailable waits until a simple query succeeds, indicating a leader is known and queries can be served.
func (r *RQLiteManager) waitForSQLAvailable(ctx context.Context) error {
if r.connection == nil {
r.logger.Error("No rqlite connection")
return errors.New("no rqlite connection")
}
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

@@ -48,6 +43,16 @@ func (r *RQLiteManager) waitForSQLAvailable(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
// Check for nil connection inside the loop to handle cases where
// connection becomes nil during restart/recovery operations
if r.connection == nil {
attempts++
if attempts%5 == 0 { // log every ~5s to reduce noise
r.logger.Debug("Waiting for RQLite connection to be established")
}
continue
}
attempts++
_, err := r.connection.QueryOne("SELECT 1")
if err == nil {

@@ -417,6 +422,15 @@ func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDat
if err := r.clearRaftState(rqliteDataDir); err != nil {
r.logger.Error("Failed to clear Raft state", zap.Error(err))
} else {
// Force write peers.json after clearing state
if r.discoveryService != nil {
r.logger.Info("Force writing peers.json after clearing state for configuration mismatch recovery")
if err := r.discoveryService.ForceWritePeersJSON(); err != nil {
r.logger.Error("Failed to force write peers.json", zap.Error(err))
}
// Update peersPath after force write
peersPath = filepath.Join(rqliteDataDir, "raft", "peers.json")
}
// Restart RQLite with clean state
r.logger.Info("Raft state cleared, restarting RQLite for clean rejoin")
if recoveryErr := r.recoverCluster(ctx, peersPath); recoveryErr == nil {

@@ -1279,15 +1293,30 @@ func (r *RQLiteManager) recoverFromSplitBrain(ctx context.Context) error {
return fmt.Errorf("failed to clear Raft state: %w", err)
}
// Step 5: Ensure peers.json exists with all discovered peers
r.discoveryService.TriggerSync()
time.Sleep(2 * time.Second)
// Step 5: Refresh peer metadata and force write peers.json
// We trigger peer exchange again to ensure we have the absolute latest metadata
// after clearing state, then force write peers.json regardless of changes
r.logger.Info("Refreshing peer metadata after clearing raft state")
r.discoveryService.TriggerPeerExchange(ctx)
time.Sleep(1 * time.Second) // Brief wait for peer exchange to complete
r.logger.Info("Force writing peers.json with all discovered peers")
// We use ForceWritePeersJSON instead of TriggerSync because TriggerSync
// only writes if membership changed, but after clearing state we need
// to write regardless of changes
if err := r.discoveryService.ForceWritePeersJSON(); err != nil {
return fmt.Errorf("failed to force write peers.json: %w", err)
}
// Verify peers.json was created
peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json")
if _, err := os.Stat(peersPath); err != nil {
return fmt.Errorf("peers.json not created: %w", err)
return fmt.Errorf("peers.json not created after force write: %w", err)
}
r.logger.Info("peers.json verified after force write",
zap.String("peers_path", peersPath))
// Step 6: Restart RQLite to pick up new peers.json
r.logger.Info("Restarting RQLite to apply new cluster configuration")
if err := r.recoverCluster(ctx, peersPath); err != nil {

@@ -1435,6 +1464,14 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql
if err := r.clearRaftState(rqliteDataDir); err != nil {
r.logger.Error("Failed to clear Raft state", zap.Error(err))
// Continue anyway - rqlite might still be able to recover
} else {
// Force write peers.json after clearing stale state
if r.discoveryService != nil {
r.logger.Info("Force writing peers.json after clearing stale Raft state")
if err := r.discoveryService.ForceWritePeersJSON(); err != nil {
r.logger.Error("Failed to force write peers.json after clearing stale state", zap.Error(err))
}
}
}
}
}
scripts/test-cluster-health.sh (379 lines, new executable file)
@@ -0,0 +1,379 @@
#!/bin/bash
# Production Cluster Health Check Script
# Tests RQLite, IPFS, and IPFS Cluster connectivity and replication
# Note: We don't use 'set -e' here because we want to continue testing even if individual checks fail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Node IPs - Update these if needed
BOOTSTRAP="${BOOTSTRAP:-51.83.128.181}"
NODE1="${NODE1:-57.128.223.92}"
NODE2="${NODE2:-185.185.83.89}"
ALL_NODES=($BOOTSTRAP $NODE1 $NODE2)
# Counters
PASSED=0
FAILED=0
WARNINGS=0
# Helper functions
print_header() {
echo ""
echo -e "${BLUE}========================================${NC}"
echo -e "${BLUE}$1${NC}"
echo -e "${BLUE}========================================${NC}"
}
print_test() {
echo -e "${YELLOW}▶ $1${NC}"
}
print_pass() {
echo -e "${GREEN}✓ $1${NC}"
PASSED=$((PASSED + 1))
}
print_fail() {
echo -e "${RED}✗ $1${NC}"
FAILED=$((FAILED + 1))
}
print_warn() {
echo -e "${YELLOW}⚠ $1${NC}"
WARNINGS=$((WARNINGS + 1))
}
print_info() {
echo -e "   $1"
}
# Test functions
test_rqlite_status() {
print_header "1. RQLITE CLUSTER STATUS"
local leader_found=false
local follower_count=0
local commit_indices=()
for i in "${!ALL_NODES[@]}"; do
local node="${ALL_NODES[$i]}"
print_test "Testing RQLite on $node"
if ! response=$(curl -s --max-time 5 http://$node:5001/status 2>/dev/null); then
print_fail "Cannot connect to RQLite on $node:5001"
continue
fi
local state=$(echo "$response" | jq -r '.store.raft.state // "unknown"')
local num_peers=$(echo "$response" | jq -r '.store.raft.num_peers // 0')
local commit_index=$(echo "$response" | jq -r '.store.raft.commit_index // 0')
local last_contact=$(echo "$response" | jq -r '.store.raft.last_contact // "N/A"')
local config=$(echo "$response" | jq -r '.store.raft.latest_configuration // "[]"')
local node_count=$(echo "$config" | grep -o "Address" | wc -l | tr -d ' ')
commit_indices+=($commit_index)
print_info "State: $state | Peers: $num_peers | Commit Index: $commit_index | Cluster Nodes: $node_count"
# Check state
if [ "$state" = "Leader" ]; then
leader_found=true
print_pass "Node $node is the Leader"
elif [ "$state" = "Follower" ]; then
follower_count=$((follower_count + 1))
# Check last contact
if [ "$last_contact" != "N/A" ] && [ "$last_contact" != "0" ]; then
print_pass "Node $node is a Follower (last contact: $last_contact)"
else
print_warn "Node $node is Follower but last_contact is $last_contact"
fi
else
print_fail "Node $node has unexpected state: $state"
fi
# Check peer count
if [ "$num_peers" = "2" ]; then
print_pass "Node $node has correct peer count: 2"
else
print_fail "Node $node has incorrect peer count: $num_peers (expected 2)"
fi
# Check cluster configuration
if [ "$node_count" = "3" ]; then
print_pass "Node $node sees all 3 cluster members"
else
print_fail "Node $node only sees $node_count cluster members (expected 3)"
fi
echo ""
done
# Check for exactly 1 leader
if [ "$leader_found" = true ] && [ "$follower_count" = "2" ]; then
print_pass "Cluster has 1 Leader and 2 Followers ✓"
else
print_fail "Invalid cluster state (Leader found: $leader_found, Followers: $follower_count)"
fi
# Check commit index sync
if [ ${#commit_indices[@]} -eq 3 ]; then
local first="${commit_indices[0]}"
local all_same=true
for idx in "${commit_indices[@]}"; do
if [ "$idx" != "$first" ]; then
all_same=false
break
fi
done
if [ "$all_same" = true ]; then
print_pass "All nodes have synced commit index: $first"
else
print_warn "Commit indices differ: ${commit_indices[*]} (might be normal if writes are happening)"
fi
fi
}
test_rqlite_replication() {
print_header "2. RQLITE REPLICATION TEST"
print_test "Creating test table and inserting data on leader ($BOOTSTRAP)"
# Create table
if ! response=$(curl -s --max-time 5 -XPOST "http://$BOOTSTRAP:5001/db/execute" \
-H "Content-Type: application/json" \
-d '[["CREATE TABLE IF NOT EXISTS test_cluster_health (id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, node TEXT, value TEXT)"]]' 2>/dev/null); then
print_fail "Failed to create table"
return
fi
if echo "$response" | jq -e '.results[0].error' >/dev/null 2>&1; then
local error=$(echo "$response" | jq -r '.results[0].error')
if [[ "$error" != "table test_cluster_health already exists" ]]; then
print_fail "Table creation error: $error"
return
fi
fi
print_pass "Table exists"
# Insert test data
local test_value="test_$(date +%s)"
if ! response=$(curl -s --max-time 5 -XPOST "http://$BOOTSTRAP:5001/db/execute" \
-H "Content-Type: application/json" \
-d "[
[\"INSERT INTO test_cluster_health (timestamp, node, value) VALUES (datetime('now'), 'bootstrap', '$test_value')\"]
]" 2>/dev/null); then
print_fail "Failed to insert data"
return
fi
if echo "$response" | jq -e '.results[0].error' >/dev/null 2>&1; then
local error=$(echo "$response" | jq -r '.results[0].error')
print_fail "Insert error: $error"
return
fi
print_pass "Data inserted: $test_value"
# Wait for replication
print_info "Waiting 2 seconds for replication..."
sleep 2
# Query from all nodes
for node in "${ALL_NODES[@]}"; do
print_test "Reading from $node"
if ! response=$(curl -s --max-time 5 -XPOST "http://$node:5001/db/query?level=weak" \
-H "Content-Type: application/json" \
-d "[\"SELECT * FROM test_cluster_health WHERE value = '$test_value' LIMIT 1\"]" 2>/dev/null); then
print_fail "Failed to query from $node"
continue
fi
if echo "$response" | jq -e '.results[0].error' >/dev/null 2>&1; then
local error=$(echo "$response" | jq -r '.results[0].error')
print_fail "Query error on $node: $error"
continue
fi
local row_count=$(echo "$response" | jq -r '.results[0].values | length // 0')
if [ "$row_count" = "1" ]; then
local retrieved_value=$(echo "$response" | jq -r '.results[0].values[0][3] // ""')
if [ "$retrieved_value" = "$test_value" ]; then
print_pass "Data replicated correctly to $node"
else
print_fail "Data mismatch on $node (got: $retrieved_value, expected: $test_value)"
fi
else
print_fail "Expected 1 row from $node, got $row_count"
fi
done
}
test_ipfs_status() {
print_header "3. IPFS DAEMON STATUS"
for node in "${ALL_NODES[@]}"; do
print_test "Testing IPFS on $node"
if ! response=$(curl -s --max-time 5 -X POST http://$node:4501/api/v0/id 2>/dev/null); then
print_fail "Cannot connect to IPFS on $node:4501"
continue
fi
local peer_id=$(echo "$response" | jq -r '.ID // "unknown"')
local addr_count=$(echo "$response" | jq -r '.Addresses | length // 0')
local agent=$(echo "$response" | jq -r '.AgentVersion // "unknown"')
if [ "$peer_id" != "unknown" ]; then
print_pass "IPFS running on $node (ID: ${peer_id:0:12}...)"
print_info "Agent: $agent | Addresses: $addr_count"
else
print_fail "IPFS not responding correctly on $node"
fi
done
}
test_ipfs_swarm() {
print_header "4. IPFS SWARM CONNECTIVITY"
for node in "${ALL_NODES[@]}"; do
print_test "Checking IPFS swarm peers on $node"
if ! response=$(curl -s --max-time 5 -X POST http://$node:4501/api/v0/swarm/peers 2>/dev/null); then
print_fail "Failed to get swarm peers from $node"
continue
fi
local peer_count=$(echo "$response" | jq -r '.Peers | length // 0')
if [ "$peer_count" = "2" ]; then
print_pass "Node $node connected to 2 IPFS peers"
elif [ "$peer_count" -gt "0" ]; then
print_warn "Node $node connected to $peer_count IPFS peers (expected 2)"
else
print_fail "Node $node has no IPFS swarm peers"
fi
done
}
test_ipfs_cluster_status() {
print_header "5. IPFS CLUSTER STATUS"
for node in "${ALL_NODES[@]}"; do
print_test "Testing IPFS Cluster on $node"
if ! response=$(curl -s --max-time 5 http://$node:9094/id 2>/dev/null); then
print_fail "Cannot connect to IPFS Cluster on $node:9094"
continue
fi
local cluster_id=$(echo "$response" | jq -r '.id // "unknown"')
local cluster_peers=$(echo "$response" | jq -r '.cluster_peers | length // 0')
local version=$(echo "$response" | jq -r '.version // "unknown"')
if [ "$cluster_id" != "unknown" ]; then
print_pass "IPFS Cluster running on $node (ID: ${cluster_id:0:12}...)"
print_info "Version: $version | Cluster Peers: $cluster_peers"
if [ "$cluster_peers" = "3" ]; then
print_pass "Node $node sees all 3 cluster peers"
else
print_warn "Node $node sees $cluster_peers cluster peers (expected 3)"
fi
else
print_fail "IPFS Cluster not responding correctly on $node"
fi
done
}
test_ipfs_cluster_pins() {
print_header "6. IPFS CLUSTER PIN CONSISTENCY"
local pin_counts=()
for node in "${ALL_NODES[@]}"; do
print_test "Checking pins on $node"
if ! response=$(curl -s --max-time 5 http://$node:9094/pins 2>/dev/null); then
print_fail "Failed to get pins from $node"
pin_counts+=(0)
continue
fi
local pin_count=$(echo "$response" | jq -r 'length // 0')
pin_counts+=($pin_count)
print_pass "Node $node has $pin_count pins"
done
# Check if all nodes have same pin count
if [ ${#pin_counts[@]} -eq 3 ]; then
local first="${pin_counts[0]}"
local all_same=true
for count in "${pin_counts[@]}"; do
if [ "$count" != "$first" ]; then
all_same=false
break
fi
done
if [ "$all_same" = true ]; then
print_pass "All nodes have consistent pin count: $first"
else
print_warn "Pin counts differ: ${pin_counts[*]} (might be syncing)"
fi
fi
}
print_summary() {
print_header "TEST SUMMARY"
echo ""
echo -e "${GREEN}Passed:   $PASSED${NC}"
echo -e "${YELLOW}Warnings: $WARNINGS${NC}"
echo -e "${RED}Failed:   $FAILED${NC}"
echo ""
if [ $FAILED -eq 0 ]; then
echo -e "${GREEN}🎉 All critical tests passed! Cluster is healthy.${NC}"
exit 0
elif [ $FAILED -le 2 ]; then
echo -e "${YELLOW}⚠️  Some tests failed. Review the output above.${NC}"
exit 1
else
echo -e "${RED}❌ Multiple failures detected. Cluster needs attention.${NC}"
exit 2
fi
}
# Main execution
main() {
echo ""
echo -e "${BLUE}╔════════════════════════════════════════════╗${NC}"
echo -e "${BLUE}║   DEBROS Production Cluster Health Check  ║${NC}"
echo -e "${BLUE}╚════════════════════════════════════════════╝${NC}"
echo ""
echo "Testing cluster:"
echo "  Bootstrap: $BOOTSTRAP"
echo "  Node 1:    $NODE1"
echo "  Node 2:    $NODE2"
test_rqlite_status
test_rqlite_replication
test_ipfs_status
test_ipfs_swarm
test_ipfs_cluster_status
test_ipfs_cluster_pins
print_summary
}
# Run main
main