From 4c1f8429399c75024e01a0a8b0a8d93179d0c660 Mon Sep 17 00:00:00 2001
From: anonpenguin23
Date: Fri, 21 Nov 2025 13:52:55 +0200
Subject: [PATCH] feat: enhance service shutdown and logging in development environment

- Improved the `stop` target in the Makefile to ensure graceful shutdown of development services, making process termination more reliable.
- Updated the `StopAll` method in the ProcessManager to provide clearer logging during service shutdown, including progress updates and error handling.
- Added a new `PushNotificationService` to handle sending push notifications via Expo, including bulk notification support and error handling.
- Refactored RQLite management to streamline node identification and logging, ensuring consistent behavior across node types during startup and recovery.
---
 CHANGELOG.md                           |  18 ++
 Makefile                               |   8 +-
 README.md                              | 358 ++++++++++++++++++++
 pkg/cli/dev_commands.go                |   3 +
 pkg/discovery/discovery.go             |  14 +-
 pkg/environments/development/runner.go |  17 +-
 pkg/gateway/push_notifications.go      | 184 ++++++++++
 pkg/node/node.go                       |  19 +-
 pkg/rqlite/cluster_discovery.go        |   6 +-
 pkg/rqlite/rqlite.go                   | 465 ++++++------------------
 10 files changed, 708 insertions(+), 384 deletions(-)
 create mode 100644 pkg/gateway/push_notifications.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 23a85ae..c0b7a10 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,24 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semantic Versioning][semver].
 ### Deprecated
 
 ### Fixed
+## [0.69.17] - 2025-11-21
+
+### Added
+- Initial implementation of a Push Notification Service for the Gateway, using the Expo API.
+- Detailed documentation for RQLite operations, monitoring, and troubleshooting in the README.
+
+### Changed
+- Improved `make stop` and `dbn dev down` commands to ensure all development services are forcefully killed after a graceful shutdown attempt.
+- Refactored RQLite startup logic to simplify cluster establishment and remove complex, error-prone leadership/recovery checks, relying on RQLite's built-in join mechanism.
+- RQLite logs are now written to individual log files (e.g., `~/.debros/logs/rqlite-bootstrap.log`) instead of stdout/stderr, improving development environment clarity.
+- Improved peer exchange discovery logging to suppress expected "protocols not supported" warnings from lightweight clients like the Gateway.
+
+### Deprecated
+
+### Removed
+
+### Fixed
+
 ## [0.69.16] - 2025-11-16
 
 ### Added
diff --git a/Makefile b/Makefile
index 7660e69..c6d6145 100644
--- a/Makefile
+++ b/Makefile
@@ -19,7 +19,7 @@ test-e2e:
 
 .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
 
-VERSION := 0.69.16
+VERSION := 0.69.17
 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
 DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
 LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
@@ -88,11 +88,9 @@ dev: build
 # Graceful shutdown of all dev services
 stop:
 	@if [ -f ./bin/dbn ]; then \
-		./bin/dbn dev down; \
-	else \
-		echo "⚠️ dbn binary not found, using force kill instead..."; \
-		bash scripts/dev-kill-all.sh; \
+		./bin/dbn dev down || true; \
 	fi
+	@bash scripts/dev-kill-all.sh
 
 # Force kill all processes (immediate termination)
 kill:
diff --git a/README.md b/README.md
index ddc4c79..e9a096a 100644
--- a/README.md
+++ b/README.md
@@ -590,15 +590,373 @@ Common endpoints (see `openapi/gateway.yaml` for the full spec):
 
 - `POST /v1/pubsub/publish`, `GET /v1/pubsub/topics`, `GET /v1/pubsub/ws?topic=`
 - `POST /v1/storage/upload`, `POST /v1/storage/pin`, `GET /v1/storage/status/:cid`, `GET /v1/storage/get/:cid`, `DELETE /v1/storage/unpin/:cid`
 
+## RQLite Operations & Monitoring
+
+RQLite is the distributed SQL database backing the DeBros Network. Proper monitoring and maintenance are critical for cluster health.
+
+### Connecting to RQLite
+
+```bash
+# Local development (bootstrap) - port 5001
+rqlite -H localhost -p 5001
+
+# Local development (bootstrap2) - port 5011
+rqlite -H localhost -p 5011
+
+# Production nodes
+rqlite -H 192.168.1.151 -p 5001
+```
+
+### Health Checks (CRITICAL for Cluster Health)
+
+```bash
+# Check node status and diagnostics
+rqlite -H localhost -p 5001 ".status"
+
+# List all nodes in the cluster (verify all nodes are connected)
+rqlite -H localhost -p 5001 ".nodes"
+
+# Check if the node is ready for operations
+rqlite -H localhost -p 5001 ".ready"
+
+# Get Go runtime info (goroutines, memory, performance)
+rqlite -H localhost -p 5001 ".expvar"
+
+# Show all tables
+rqlite -H localhost -p 5001 ".tables"
+
+# Show schema (CREATE statements)
+rqlite -H localhost -p 5001 ".schema"
+
+# Show all indexes
+rqlite -H localhost -p 5001 ".indexes"
+```
+
+### Backup & Restore
+
+```bash
+# Backup database
+rqlite -H localhost -p 5001 ".backup ~/rqlite-backup.db"
+
+# Restore from backup
+rqlite -H localhost -p 5001 ".restore ~/rqlite-backup.db"
+
+# Dump database in SQL text format
+rqlite -H localhost -p 5001 ".dump ~/rqlite-dump.sql"
+```
+
+### Consistency Levels (Important for Data Integrity)
+
+RQLite supports three consistency levels for read operations:
+
+```bash
+# View current consistency level
+rqlite -H localhost -p 5001 ".consistency"
+
+# Set to weak (default, good balance for most applications)
+rqlite -H localhost -p 5001 ".consistency weak"
+
+# Set to strong (guaranteed consistency across the entire cluster)
+rqlite -H localhost -p 5001 ".consistency strong"
+
+# Set to none (fastest reads, no consistency guarantees)
+rqlite -H localhost -p 5001 ".consistency none"
+```
+
+**Recommendation**: Use `weak` for general operations, `strong` when data integrity is critical, and `none` only for cache-like data.
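+
+Consistency can also be set per request over the HTTP API via the `level` query parameter, which overrides the session default for that query (a minimal sketch; the table name is hypothetical):
+
+```bash
+# Strong read for critical data
+curl -G 'http://localhost:5001/db/query?level=strong' \
+  --data-urlencode 'q=SELECT COUNT(*) FROM users'
+```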
+
+### Cluster Management
+
+```bash
+# Show detailed cluster diagnostics
+rqlite -H localhost -p 5001 ".sysdump /tmp/rqlite-diagnostic.txt"
+
+# Remove a node from the cluster (use the raft ID from .nodes output)
+rqlite -H localhost -p 5001 ".remove <raft-node-id>"
+```
+
+### RQLite Log Files (Development)
+
+All RQLite logs are written to individual files for easier debugging:
+
+```
+~/.debros/logs/rqlite-bootstrap.log
+~/.debros/logs/rqlite-bootstrap2.log
+~/.debros/logs/rqlite-node2.log
+~/.debros/logs/rqlite-node3.log
+~/.debros/logs/rqlite-node4.log
+```
+
+View logs:
+
+```bash
+tail -f ~/.debros/logs/rqlite-bootstrap.log
+tail -f ~/.debros/logs/rqlite-node2.log
+dbn dev logs rqlite-bootstrap --follow
+```
+
+## Development Environment Operations
+
+### Starting & Managing Development Environment
+
+```bash
+# Start the complete development stack (2 bootstraps + 3 nodes + gateway)
+make dev
+
+# Check status of running services
+dbn dev status
+
+# Stop all services
+dbn dev down
+```
+
+### Development Logs
+
+```bash
+# View logs for a specific component
+dbn dev logs bootstrap
+dbn dev logs bootstrap2
+dbn dev logs node2
+dbn dev logs node3
+dbn dev logs node4
+dbn dev logs gateway
+dbn dev logs olric
+dbn dev logs anon
+
+# Follow logs in real-time (like tail -f)
+dbn dev logs bootstrap --follow
+dbn dev logs rqlite-bootstrap --follow
+```
+
+### Key Development Endpoints
+
+```
+Gateway: http://localhost:6001
+Bootstrap IPFS: http://localhost:4501
+Bootstrap2 IPFS: http://localhost:4511
+Node2 IPFS: http://localhost:4502
+Node3 IPFS: http://localhost:4503
+Node4 IPFS: http://localhost:4504
+Anon SOCKS: 127.0.0.1:9050
+Olric Cache: http://localhost:3320
+RQLite Bootstrap: http://localhost:5001
+RQLite Bootstrap2: http://localhost:5011
+RQLite Node2: http://localhost:5002
+RQLite Node3: http://localhost:5003
+RQLite Node4: http://localhost:5004
+```
+
+## IPFS Configuration
+
+### Ensure Consistent Cluster Setup
+
+All nodes in a cluster must have identical `cluster.secret` and `swarm.key` files:
+
+```bash
+# Copy the swarm key to each host (adjust the path for bootstrap vs node):
+
+# Bootstrap node
+sudo cp /home/debros/.debros/secrets/swarm.key /home/debros/.debros/data/bootstrap/ipfs/repo/swarm.key
+
+# Regular nodes
+sudo cp /home/debros/.debros/secrets/swarm.key /home/debros/.debros/data/node/ipfs/repo/swarm.key
+
+# Fix permissions
+sudo chown debros:debros /home/debros/.debros/data/*/ipfs/repo/swarm.key
+sudo chmod 600 /home/debros/.debros/data/*/ipfs/repo/swarm.key
+```
+
+### Important IPFS Configuration Notes
+
+- **Production**: Update the Olric config - change `0.0.0.0` to the node's actual IP address for both entries
+- **All Nodes**: Must have identical `cluster.secret` and `swarm.key` for the cluster to form
+
 ## Troubleshooting
 
+### General Issues
+
 - **Config directory errors**: Ensure `~/.debros/` exists, is writable, and has free disk space (`touch ~/.debros/test && rm ~/.debros/test`).
 - **Port conflicts**: Inspect with `lsof -i :4001` (or other ports) and stop conflicting processes or regenerate configs with new ports.
 - **Missing configs**: Run `./bin/dbn config init` before starting nodes.
 - **Cluster join issues**: Confirm the bootstrap node is running, `peer.info` multiaddr matches `bootstrap_peers`, and firewall rules allow the P2P ports.
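+
+A minimal sketch that bundles the checks above into one script (assumes standard tools; swap in your own hosts and ports):
+
+```bash
+#!/usr/bin/env bash
+# Config directory exists and is writable
+touch ~/.debros/test && rm ~/.debros/test && echo "config dir OK"
+
+# Port conflicts on the P2P port
+lsof -i :4001 || echo "port 4001 free"
+
+# Bootstrap node reachable on its P2P port (replace localhost with the bootstrap host)
+nc -z -w 2 localhost 4001 && echo "bootstrap reachable" || echo "bootstrap unreachable"
+```
+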
+### RQLite Troubleshooting
+
+#### Cluster Not Forming
+
+```bash
+# Verify all nodes see each other
+rqlite -H localhost -p 5001 ".nodes"
+
+# Check node readiness
+rqlite -H localhost -p 5001 ".ready"
+
+# Check status and Raft logs
+rqlite -H localhost -p 5001 ".status"
+```
+
+#### Broken RQLite Raft (Production)
+
+```bash
+# Fix RQLite Raft consensus
+sudo env HOME=/home/debros network-cli rqlite fix
+```
+
+#### Reset RQLite State (DESTRUCTIVE - Last Resort Only)
+
+```bash
+# ⚠️ WARNING: This destroys all RQLite data!
+rm -f ~/.debros/data/rqlite/raft.db
+rm -f ~/.debros/data/rqlite/raft/peers.json
+```
+
+#### Kill IPFS Cluster Service
+
+```bash
+pkill -f ipfs-cluster-service
+```
+
+### Services Not Starting
+
+```bash
+# Check service status
+systemctl status debros-node-bootstrap
+
+# View detailed logs
+journalctl -u debros-node-bootstrap -n 100
+
+# Check log files
+tail -f /home/debros/.debros/logs/node-bootstrap.log
+```
+
+### Port Conflicts
+
+```bash
+# Check what's using specific ports
+sudo lsof -i :4001 # P2P port
+sudo lsof -i :5001 # RQLite HTTP
+sudo lsof -i :6001 # Gateway
+sudo lsof -i :9094 # IPFS Cluster API
+
+# Kill all DeBros-related processes (except Anyone on 9050)
+lsof -ti:7001,7002,7003,5001,5002,5003,6001,4001,3320,3322,9094 | xargs kill -9 2>/dev/null && echo "Killed processes" || echo "No processes found"
+```
+
+### Systemd Service Management
+
+```bash
+# Stop all services (keeps the Anyone proxy running on 9050)
+sudo systemctl stop debros-*
+
+# Disable services from auto-start
+sudo systemctl disable debros-*
+
+# Restart all services
+sudo systemctl restart debros-*
+
+# Enable services for auto-start on boot
+sudo systemctl enable debros-*
+
+# View all DeBros services
+systemctl list-units 'debros-*'
+
+# Clean up failed services
+sudo systemctl reset-failed
+```
+
+### Reset Installation (⚠️ Destroys All Data)
+
+```bash
+# Start fresh (production)
+sudo dbn prod uninstall
+sudo rm -rf /home/debros/.debros
+sudo dbn prod install --bootstrap --branch nightly
+```
+
+## Operations Cheat Sheet
+
+### User Management (Linux)
+
+```bash
+# Switch to the DeBros user
+sudo -u debros bash
+
+# Kill all DeBros user processes
+sudo killall -9 -u debros
+
+# Remove the DeBros user completely
+sudo userdel -r -f debros
+```
+
+### Installation & Deployment
+
+```bash
+# Local development
+make dev
+
+# Install the nightly branch
+wget https://raw.githubusercontent.com/DeBrosOfficial/network/refs/heads/nightly/scripts/install-debros-network.sh
+chmod +x ./install-debros-network.sh
+./install-debros-network.sh --prerelease --nightly
+
+# Production bootstrap node
+sudo dbn prod install --bootstrap --branch nightly
+
+# Production secondary node
+sudo dbn prod install \
+  --vps-ip <VPS_IP> \
+  --peers /ip4/<BOOTSTRAP_IP>/tcp/4001/p2p/<PEER_ID> \
+  --branch nightly
+```
+
+### Configuration & Sudoers (Deploy User)
+
+```bash
+# Add to sudoers (via visudo) for deploy automation
+ubuntu ALL=(ALL) NOPASSWD: /bin/bash
+ubuntu ALL=(ALL) NOPASSWD: /usr/bin/make
+
+# Git configuration
+git config --global --add safe.directory /home/debros/src
+```
+
+### Authentication
+
+```bash
+# Log in to the gateway
+env DEBROS_GATEWAY_URL=https://node-kv4la8.debros.network dbn auth login
+```
+
 ## Resources
 
+- [RQLite CLI Documentation](https://rqlite.io/docs/cli/)
+- [RQLite Features](https://rqlite.io/docs/features/)
+- [RQLite Clustering Guide](https://rqlite.io/docs/clustering/)
+- [RQLite Security](https://rqlite.io/docs/security/)
+- [RQLite Backup & Restore](https://rqlite.io/docs/backup-and-restore/)
 - Go modules: `go mod tidy`,
`go test ./...` - Automation: `make build`, `make dev`, `make run-gateway`, `make lint` - API reference: `openapi/gateway.yaml` diff --git a/pkg/cli/dev_commands.go b/pkg/cli/dev_commands.go index de25a3e..2173caf 100644 --- a/pkg/cli/dev_commands.go +++ b/pkg/cli/dev_commands.go @@ -136,7 +136,10 @@ func handleDevDown(args []string) { if err := pm.StopAll(ctx); err != nil { fmt.Fprintf(os.Stderr, "⚠️ Error stopping services: %v\n", err) + os.Exit(1) } + + fmt.Printf("✅ All services have been stopped\n\n") } func handleDevStatus(args []string) { diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index 5f9a41c..56d1456 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -6,6 +6,7 @@ import ( "errors" "io" "strconv" + "strings" "time" "github.com/libp2p/go-libp2p/core/host" @@ -420,11 +421,20 @@ func (d *Manager) requestPeersFromPeer(ctx context.Context, peerID peer.ID, limi // Open a stream to the peer stream, err := d.host.NewStream(ctx, peerID, PeerExchangeProtocol) if err != nil { - // Suppress repeated warnings for the same peer (log once per minute max) + // Check if this is a "protocols not supported" error (expected for lightweight clients like gateway) + if strings.Contains(err.Error(), "protocols not supported") { + // This is a lightweight client (gateway, etc.) that doesn't support peer exchange - expected behavior + // Track it to avoid repeated attempts, but don't log as it's not an error + d.failedPeerExchanges[peerID] = time.Now() + return nil + } + + // For actual connection errors, log but suppress repeated warnings for the same peer lastFailure, seen := d.failedPeerExchanges[peerID] if !seen || time.Since(lastFailure) > time.Minute { - d.logger.Debug("Failed to open peer exchange stream", + d.logger.Debug("Failed to open peer exchange stream with node", zap.String("peer_id", peerID.String()[:8]+"..."), + zap.String("reason", "peer does not support peer exchange protocol or connection failed"), zap.Error(err)) d.failedPeerExchanges[peerID] = time.Now() } diff --git a/pkg/environments/development/runner.go b/pkg/environments/development/runner.go index d4e665a..5d67a20 100644 --- a/pkg/environments/development/runner.go +++ b/pkg/environments/development/runner.go @@ -95,7 +95,7 @@ func (pm *ProcessManager) StartAll(ctx context.Context) error { // StopAll stops all running processes func (pm *ProcessManager) StopAll(ctx context.Context) error { - fmt.Fprintf(pm.logWriter, "\n🛑 Stopping development environment...\n") + fmt.Fprintf(pm.logWriter, "\n🛑 Stopping development environment...\n\n") topology := DefaultTopology() var services []string @@ -116,11 +116,22 @@ func (pm *ProcessManager) StopAll(ctx context.Context) error { } services = append(services, "olric", "anon") + fmt.Fprintf(pm.logWriter, "Stopping %d services...\n\n", len(services)) + + // Stop all processes sequentially (in dependency order) and wait for each + stoppedCount := 0 for _, svc := range services { - pm.stopProcess(svc) + if err := pm.stopProcess(svc); err != nil { + fmt.Fprintf(pm.logWriter, "⚠️ Error stopping %s: %v\n", svc, err) + } else { + stoppedCount++ + } + + // Show progress + fmt.Fprintf(pm.logWriter, " [%d/%d] stopped\n", stoppedCount, len(services)) } - fmt.Fprintf(pm.logWriter, "✓ All services stopped\n\n") + fmt.Fprintf(pm.logWriter, "\n✅ All %d services have been stopped\n\n", stoppedCount) return nil } diff --git a/pkg/gateway/push_notifications.go b/pkg/gateway/push_notifications.go new file mode 100644 index 0000000..dd7a5bb --- /dev/null +++ 
b/pkg/gateway/push_notifications.go @@ -0,0 +1,184 @@ +package gateway + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "go.uber.org/zap" +) + +// PushNotificationService handles sending push notifications via Expo +type PushNotificationService struct { + logger *zap.Logger + client *http.Client +} + +// ExpoTicket represents the response from Expo API +type ExpoTicket struct { + ID string `json:"id"` + Error string `json:"error,omitempty"` +} + +// ExpoPushMessage represents a message to send via Expo +type ExpoPushMessage struct { + To string `json:"to"` + Title string `json:"title"` + Body string `json:"body"` + Data map[string]interface{} `json:"data,omitempty"` + Sound string `json:"sound,omitempty"` + Badge int `json:"badge,omitempty"` + Priority string `json:"priority,omitempty"` + // iOS specific + MutableContent bool `json:"mutableContent,omitempty"` + IosIcon string `json:"iosIcon,omitempty"` + // Android specific + AndroidBigLargeIcon string `json:"androidBigLargeIcon,omitempty"` + ChannelID string `json:"channelId,omitempty"` +} + +// NewPushNotificationService creates a new push notification service +func NewPushNotificationService(logger *zap.Logger) *PushNotificationService { + return &PushNotificationService{ + logger: logger, + client: &http.Client{ + Timeout: 10 * time.Second, + }, + } +} + +// SendNotification sends a push notification via Expo +func (pns *PushNotificationService) SendNotification( + ctx context.Context, + expoPushToken string, + title string, + body string, + data map[string]interface{}, + avatarURL string, +) error { + if expoPushToken == "" { + return fmt.Errorf("empty expo push token") + } + + message := ExpoPushMessage{ + To: expoPushToken, + Title: title, + Body: body, + Data: data, + Sound: "default", + Priority: "high", + // Enable mutable content for iOS to allow Notification Service Extension + MutableContent: true, + ChannelID: "messages", + AndroidBigLargeIcon: avatarURL, + } + + // For iOS, include avatar in data so Notification Service Extension can fetch it + if avatarURL != "" { + if message.Data == nil { + message.Data = make(map[string]interface{}) + } + message.Data["avatar_url"] = avatarURL + } + + return pns.sendExpoRequest(ctx, message) +} + +// SendBulkNotifications sends notifications to multiple users +func (pns *PushNotificationService) SendBulkNotifications( + ctx context.Context, + expoPushTokens []string, + title string, + body string, + data map[string]interface{}, + avatarURL string, +) []error { + errors := make([]error, 0) + + for _, token := range expoPushTokens { + if err := pns.SendNotification(ctx, token, title, body, data, avatarURL); err != nil { + errors = append(errors, fmt.Errorf("failed to send to token %s: %w", token, err)) + } + } + + return errors +} + +// sendExpoRequest sends a request to the Expo push notification API +func (pns *PushNotificationService) sendExpoRequest(ctx context.Context, message ExpoPushMessage) error { + const expoAPIURL = "https://exp.host/--/api/v2/push/send" + + body, err := json.Marshal(message) + if err != nil { + pns.logger.Error("failed to marshal push notification", + zap.Error(err), + zap.String("to", message.To)) + return fmt.Errorf("marshal error: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, expoAPIURL, bytes.NewBuffer(body)) + if err != nil { + pns.logger.Error("failed to create push notification request", + zap.Error(err), + zap.String("to", message.To)) + return fmt.Errorf("request creation 
error: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + + resp, err := pns.client.Do(req) + if err != nil { + pns.logger.Error("failed to send push notification", + zap.Error(err), + zap.String("to", message.To)) + return fmt.Errorf("send error: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + pns.logger.Error("failed to read push notification response", + zap.Error(err), + zap.String("to", message.To)) + return fmt.Errorf("response read error: %w", err) + } + + // Check for API errors + if resp.StatusCode != http.StatusOK { + pns.logger.Warn("push notification API error", + zap.Int("status_code", resp.StatusCode), + zap.String("response", string(respBody)), + zap.String("to", message.To)) + return fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(respBody)) + } + + // Parse response + var tickets []ExpoTicket + if err := json.Unmarshal(respBody, &tickets); err != nil { + pns.logger.Error("failed to parse push notification response", + zap.Error(err), + zap.String("response", string(respBody))) + return fmt.Errorf("parse error: %w", err) + } + + // Check for errors in tickets + for _, ticket := range tickets { + if ticket.Error != "" { + pns.logger.Warn("push notification error in ticket", + zap.String("error", ticket.Error), + zap.String("to", message.To)) + return fmt.Errorf("ticket error: %s", ticket.Error) + } + } + + pns.logger.Info("push notification sent successfully", + zap.String("to", message.To), + zap.String("title", message.Title)) + + return nil +} + diff --git a/pkg/node/node.go b/pkg/node/node.go index aa7edd8..e7abe0f 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -70,15 +70,26 @@ func NewNode(cfg *config.Config) (*Node, error) { func (n *Node) startRQLite(ctx context.Context) error { n.logger.Info("Starting RQLite database") + // Determine node identifier for log filename - use node ID for unique filenames + nodeID := n.config.Node.ID + if nodeID == "" { + // Fallback to type if ID is not set + nodeID = n.config.Node.Type + if nodeID == "" { + nodeID = "node" + } + } + // Create RQLite manager n.rqliteManager = database.NewRQLiteManager(&n.config.Database, &n.config.Discovery, n.config.Node.DataDir, n.logger.Logger) + n.rqliteManager.SetNodeType(nodeID) // Initialize cluster discovery service if LibP2P host is available if n.host != nil && n.discoveryManager != nil { - // Determine node type - nodeType := "node" + // Determine node type for cluster discovery (bootstrap or node) + discoveryNodeType := "node" if n.config.Node.Type == "bootstrap" { - nodeType = "bootstrap" + discoveryNodeType = "bootstrap" } // Create cluster discovery service @@ -87,7 +98,7 @@ func (n *Node) startRQLite(ctx context.Context) error { n.discoveryManager, n.rqliteManager, n.config.Node.ID, - nodeType, + discoveryNodeType, n.config.Discovery.RaftAdvAddress, n.config.Discovery.HttpAdvAddress, n.config.Node.DataDir, diff --git a/pkg/rqlite/cluster_discovery.go b/pkg/rqlite/cluster_discovery.go index 067e7b3..50204b1 100644 --- a/pkg/rqlite/cluster_discovery.go +++ b/pkg/rqlite/cluster_discovery.go @@ -633,11 +633,7 @@ func (c *ClusterDiscoveryService) WaitForDiscoverySettling(ctx context.Context) // TriggerSync manually triggers a cluster membership sync func (c *ClusterDiscoveryService) TriggerSync() { - // For bootstrap nodes, wait a bit for peer discovery to stabilize - if c.nodeType == "bootstrap" { - time.Sleep(5 * time.Second) - } - + // All nodes use the same discovery timing for consistency 
c.updateClusterMembership() } diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index 5a5439b..9cfa881 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -10,7 +10,6 @@ import ( "os" "os/exec" "path/filepath" - "strconv" "strings" "syscall" "time" @@ -26,6 +25,7 @@ type RQLiteManager struct { config *config.DatabaseConfig discoverConfig *config.DiscoveryConfig dataDir string + nodeType string // "bootstrap" or "node" logger *zap.Logger cmd *exec.Cmd connection *gorqlite.Connection @@ -81,6 +81,13 @@ func (r *RQLiteManager) SetDiscoveryService(service *ClusterDiscoveryService) { r.discoveryService = service } +// SetNodeType sets the node type for this RQLite manager ("bootstrap" or "node") +func (r *RQLiteManager) SetNodeType(nodeType string) { + if nodeType != "" { + r.nodeType = nodeType + } +} + // UpdateAdvertisedAddresses overrides the discovery advertised addresses when cluster discovery // infers a better host than what was provided via configuration (e.g. replacing localhost). func (r *RQLiteManager) UpdateAdvertisedAddresses(raftAddr, httpAddr string) { @@ -233,7 +240,7 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) "-raft-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLiteRaftPort), } - // Add join address if specified (for non-bootstrap or secondary bootstrap nodes) + // All nodes follow the same join logic - either join specified address or start as single-node cluster if r.config.RQLiteJoinAddress != "" { r.logger.Info("Joining RQLite cluster", zap.String("join_address", r.config.RQLiteJoinAddress)) @@ -259,28 +266,9 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) // Add retry parameters to handle slow cluster startup (e.g., during recovery) args = append(args, "-join", joinArg, "-join-attempts", "30", "-join-interval", "10s") } else { - r.logger.Info("No join address specified - starting as new cluster") - - // For bootstrap nodes, use bootstrap-expect if we know about other peers - if r.discoveryService != nil { - allPeers := r.discoveryService.GetAllPeers() - remotePeerCount := 0 - for _, peer := range allPeers { - if peer.NodeID != r.discoverConfig.RaftAdvAddress { - remotePeerCount++ - } - } - - // Use bootstrap-expect if we have discovered enough peers - // This tells RQLite to wait for the expected number of nodes before forming cluster - if remotePeerCount >= (r.config.MinClusterSize - 1) { - expectedPeers := r.config.MinClusterSize - args = append(args, "-bootstrap-expect", strconv.Itoa(expectedPeers)) - r.logger.Info("Using bootstrap-expect to wait for cluster formation", - zap.Int("expected_peers", expectedPeers), - zap.Int("remote_peers_discovered", remotePeerCount)) - } - } + r.logger.Info("No join address specified - starting as single-node cluster") + // When no join address is provided, rqlited will start as a single-node cluster + // This is expected for the first node in a fresh cluster } // Add data directory as positional argument @@ -295,14 +283,41 @@ func (r *RQLiteManager) launchProcess(ctx context.Context, rqliteDataDir string) // Start RQLite process (not bound to ctx for graceful Stop handling) r.cmd = exec.Command("rqlited", args...) 
- // Enable debug logging of RQLite process to help diagnose issues - r.cmd.Stdout = os.Stdout - r.cmd.Stderr = os.Stderr + // Setup log file for RQLite output + // Determine node type for log filename + nodeType := r.nodeType + if nodeType == "" { + nodeType = "node" + } + + // Create logs directory + logsDir := filepath.Join(filepath.Dir(r.dataDir), "logs") + if err := os.MkdirAll(logsDir, 0755); err != nil { + return fmt.Errorf("failed to create logs directory at %s: %w", logsDir, err) + } + + // Open log file for RQLite output + logPath := filepath.Join(logsDir, fmt.Sprintf("rqlite-%s.log", nodeType)) + logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return fmt.Errorf("failed to open RQLite log file at %s: %w", logPath, err) + } + + r.logger.Info("RQLite logs will be written to file", + zap.String("path", logPath)) + + r.cmd.Stdout = logFile + r.cmd.Stderr = logFile if err := r.cmd.Start(); err != nil { + logFile.Close() return fmt.Errorf("failed to start RQLite: %w", err) } + // Close the log file handle after process starts (the subprocess maintains its own reference) + // This allows the file to be rotated or inspected while the process is running + logFile.Close() + return nil } @@ -337,19 +352,18 @@ func (r *RQLiteManager) waitForReadyAndConnect(ctx context.Context) error { // Check if error is "store is not open" (recovery scenario) if strings.Contains(err.Error(), "store is not open") { if attempt < maxConnectAttempts-1 { - // Only retry for joining nodes; bootstrap nodes should fail fast - if r.config.RQLiteJoinAddress != "" { - if attempt%3 == 0 { - r.logger.Debug("RQLite store not yet accessible for connection, retrying...", - zap.Int("attempt", attempt+1), zap.Error(err)) - } - time.Sleep(connectBackoff) - connectBackoff = time.Duration(float64(connectBackoff) * 1.5) - if connectBackoff > 5*time.Second { - connectBackoff = 5 * time.Second - } - continue + // Retry with exponential backoff for all nodes during recovery + // The store may not open immediately, especially during cluster recovery + if attempt%3 == 0 { + r.logger.Debug("RQLite store not yet accessible for connection, retrying...", + zap.Int("attempt", attempt+1), zap.Error(err)) } + time.Sleep(connectBackoff) + connectBackoff = time.Duration(float64(connectBackoff) * 1.5) + if connectBackoff > 5*time.Second { + connectBackoff = 5 * time.Second + } + continue } } @@ -376,130 +390,52 @@ func (r *RQLiteManager) waitForReadyAndConnect(ctx context.Context) error { return nil } -// establishLeadershipOrJoin establishes leadership (bootstrap) or waits for SQL availability (joining) +// establishLeadershipOrJoin handles post-startup cluster establishment +// All nodes follow the same pattern: wait for SQL availability +// For nodes without a join address, RQLite automatically forms a single-node cluster and becomes leader func (r *RQLiteManager) establishLeadershipOrJoin(ctx context.Context, rqliteDataDir string) error { if r.config.RQLiteJoinAddress == "" { - // Bootstrap node logic with data safety checks - r.logger.Info("Bootstrap node: checking if safe to lead") + // First node - no join address specified + // RQLite will automatically form a single-node cluster and become leader + r.logger.Info("Starting as first node in cluster") - // SAFETY: Check if we can safely become leader - canLead, err := r.canSafelyBecomeLeader() - if !canLead && err != nil { - r.logger.Warn("Not safe to become leader, attempting to join existing cluster", - zap.Error(err)) - - // Find 
node with highest log index and join it - if r.discoveryService != nil { - targetNode := r.discoveryService.GetNodeWithHighestLogIndex() - if targetNode != nil { - r.logger.Info("Joining node with higher data", - zap.String("target_node", targetNode.NodeID), - zap.String("raft_address", targetNode.RaftAddress), - zap.Uint64("their_index", targetNode.RaftLogIndex)) - return r.joinExistingCluster(ctx, targetNode.RaftAddress) - } - } - } - - // Safe to lead - attempt leadership - leadershipErr := r.waitForLeadership(ctx) - if leadershipErr == nil { - r.logger.Info("Bootstrap node successfully established leadership") - return nil - } - - r.logger.Warn("Initial leadership attempt failed, may need cluster recovery", - zap.Error(leadershipErr)) - - // Try recovery if we have peers.json from discovery - if r.discoveryService != nil { - peersPath := filepath.Join(rqliteDataDir, "raft", "peers.json") - if _, err := os.Stat(peersPath); err == nil { - r.logger.Info("Attempting cluster recovery using peers.json", - zap.String("peers_file", peersPath)) - - if recoveryErr := r.recoverCluster(ctx, peersPath); recoveryErr == nil { - r.logger.Info("Cluster recovery successful, retrying leadership") - leadershipErr = r.waitForLeadership(ctx) - if leadershipErr == nil { - r.logger.Info("Bootstrap node established leadership after recovery") - return nil - } - } else { - r.logger.Warn("Cluster recovery failed", zap.Error(recoveryErr)) - } - } - - // Check if we're stuck in configuration mismatch after recovery failed - if leadershipErr != nil && r.isStuckInConfigurationMismatch() { - r.logger.Warn("Detected persistent configuration mismatch, attempting automatic recovery") - - // Verify it's safe to clear state (peers have higher log indexes) - if r.isSafeToClearState(rqliteDataDir) { - r.logger.Info("Clearing stale Raft state to resolve configuration mismatch") - if err := r.clearRaftState(rqliteDataDir); err != nil { - r.logger.Error("Failed to clear Raft state", zap.Error(err)) - } else { - // Force write peers.json after clearing state - if r.discoveryService != nil { - r.logger.Info("Force writing peers.json after clearing state for configuration mismatch recovery") - if err := r.discoveryService.ForceWritePeersJSON(); err != nil { - r.logger.Error("Failed to force write peers.json", zap.Error(err)) - } - // Update peersPath after force write - peersPath = filepath.Join(rqliteDataDir, "raft", "peers.json") - } - // Restart RQLite with clean state - r.logger.Info("Raft state cleared, restarting RQLite for clean rejoin") - if recoveryErr := r.recoverCluster(ctx, peersPath); recoveryErr == nil { - // Retry leadership after state clear - leadershipErr = r.waitForLeadership(ctx) - if leadershipErr == nil { - r.logger.Info("Bootstrap node established leadership after state clear") - return nil - } - } - } - } else { - r.logger.Warn("Configuration mismatch detected but clearing state is unsafe", - zap.String("reason", "peers may not have more recent data"), - zap.String("action", "manual intervention may be required")) - } - } - } - - // Final fallback: SQL availability - r.logger.Warn("Leadership failed, trying SQL availability") + // Wait for SQL to be available (indicates RQLite cluster is ready) sqlCtx := ctx if _, hasDeadline := ctx.Deadline(); !hasDeadline { var cancel context.CancelFunc sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) defer cancel() } + if err := r.waitForSQLAvailable(sqlCtx); err != nil { if r.cmd != nil && r.cmd.Process != nil { _ = r.cmd.Process.Kill() } - 
return fmt.Errorf("RQLite SQL not available: %w", err) - } - return nil - } else { - // Joining node logic - r.logger.Info("Waiting for RQLite SQL availability (leader discovery)") - sqlCtx := ctx - if _, hasDeadline := ctx.Deadline(); !hasDeadline { - var cancel context.CancelFunc - sqlCtx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - } - if err := r.waitForSQLAvailable(sqlCtx); err != nil { - if r.cmd != nil && r.cmd.Process != nil { - _ = r.cmd.Process.Kill() - } - return fmt.Errorf("RQLite SQL not available: %w", err) + return fmt.Errorf("SQL not available for first node: %w", err) } + + r.logger.Info("First node established successfully") return nil } + + // Joining node - wait for SQL availability (indicates it joined the leader) + r.logger.Info("Waiting for RQLite SQL availability (joining cluster)") + sqlCtx := ctx + if _, hasDeadline := ctx.Deadline(); !hasDeadline { + var cancel context.CancelFunc + sqlCtx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + } + + if err := r.waitForSQLAvailable(sqlCtx); err != nil { + if r.cmd != nil && r.cmd.Process != nil { + _ = r.cmd.Process.Kill() + } + return fmt.Errorf("RQLite SQL not available: %w", err) + } + + r.logger.Info("Node successfully joined cluster") + return nil } // hasExistingState returns true if the rqlite data directory already contains files or subdirectories. @@ -526,16 +462,9 @@ func (r *RQLiteManager) waitForReady(ctx context.Context) error { url := fmt.Sprintf("http://localhost:%d/status", r.config.RQLitePort) client := &http.Client{Timeout: 2 * time.Second} - // Determine timeout based on whether this is a joining node - // Joining nodes in recovery may take longer to open the store - var maxAttempts int - if r.config.RQLiteJoinAddress != "" { - // Joining node: allow up to 180 seconds (3 minutes) for recovery - maxAttempts = 180 - } else { - // Bootstrap node: allow 30 seconds - maxAttempts = 30 - } + // All nodes may need time to open the store during recovery + // Use consistent timeout for cluster consistency + maxAttempts := 180 // 180 seconds (3 minutes) for all nodes for i := 0; i < maxAttempts; i++ { select { @@ -589,46 +518,6 @@ func (r *RQLiteManager) waitForReady(ctx context.Context) error { } // waitForLeadership waits for RQLite to establish leadership (for bootstrap nodes) -func (r *RQLiteManager) waitForLeadership(ctx context.Context) error { - r.logger.Info("Waiting for RQLite to establish leadership...") - - maxAttempts := 30 - attempt := 0 - backoffDelay := 500 * time.Millisecond - maxBackoff := 5 * time.Second - - for attempt < maxAttempts { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - // Try a simple query to check if leadership is established - if r.connection != nil { - _, err := r.connection.QueryOne("SELECT 1") - if err == nil { - r.logger.Info("RQLite leadership established") - return nil - } - // Log every 5th attempt or on first attempt to reduce noise - if attempt%5 == 0 || attempt == 0 { - r.logger.Debug("Waiting for leadership", zap.Int("attempt", attempt+1), zap.Error(err)) - } - } - - // Exponential backoff with jitter - time.Sleep(backoffDelay) - backoffDelay = time.Duration(float64(backoffDelay) * 1.5) - if backoffDelay > maxBackoff { - backoffDelay = maxBackoff - } - attempt++ - } - - return fmt.Errorf("RQLite failed to establish leadership within timeout") -} - // GetConnection returns the RQLite connection func (r *RQLiteManager) GetConnection() *gorqlite.Connection { return 
r.connection @@ -819,69 +708,6 @@ func (r *RQLiteManager) testJoinAddress(joinAddress string) error { return nil } -// canSafelyBecomeLeader checks if this node can safely become leader without causing data loss -func (r *RQLiteManager) canSafelyBecomeLeader() (bool, error) { - // Get our current Raft log index - ourLogIndex := r.getRaftLogIndex() - - // If no discovery service, assume it's safe (backward compatibility) - if r.discoveryService == nil { - r.logger.Debug("No discovery service, assuming safe to lead") - return true, nil - } - - // Query discovery service for other nodes - otherNodes := r.discoveryService.GetActivePeers() - - if len(otherNodes) == 0 { - // No other nodes - safe to bootstrap - r.logger.Debug("No other nodes discovered, safe to lead", - zap.Uint64("our_log_index", ourLogIndex)) - return true, nil - } - - // Check if any other node has higher log index - for _, peer := range otherNodes { - if peer.RaftLogIndex > ourLogIndex { - // Other node has more data - we should join them - return false, fmt.Errorf( - "node %s has higher log index (%d > %d), should join as follower", - peer.NodeID, peer.RaftLogIndex, ourLogIndex) - } - } - - // We have most recent data or equal - safe to lead - r.logger.Info("Safe to lead - we have most recent data", - zap.Uint64("our_log_index", ourLogIndex), - zap.Int("other_nodes_checked", len(otherNodes))) - return true, nil -} - -// joinExistingCluster attempts to join an existing cluster as a follower -func (r *RQLiteManager) joinExistingCluster(ctx context.Context, raftAddress string) error { - r.logger.Info("Attempting to join existing cluster", - zap.String("target_raft_address", raftAddress)) - - // Wait for the target to be reachable - if err := r.waitForJoinTarget(ctx, raftAddress, 2*time.Minute); err != nil { - return fmt.Errorf("join target not reachable: %w", err) - } - - // Wait for SQL availability (the target should have a leader) - sqlCtx := ctx - if _, hasDeadline := ctx.Deadline(); !hasDeadline { - var cancel context.CancelFunc - sqlCtx, cancel = context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - } - - if err := r.waitForSQLAvailable(sqlCtx); err != nil { - return fmt.Errorf("failed to join cluster - SQL not available: %w", err) - } - - r.logger.Info("Successfully joined existing cluster") - return nil -} // exponentialBackoff calculates exponential backoff duration with jitter func (r *RQLiteManager) exponentialBackoff(attempt int, baseDelay time.Duration, maxDelay time.Duration) time.Duration { @@ -1038,111 +864,6 @@ func (r *RQLiteManager) clearRaftState(rqliteDataDir string) error { return nil } -// isStuckInConfigurationMismatch checks if we're stuck due to configuration mismatch -// This detects both configuration mismatch AND split-brain scenarios -func (r *RQLiteManager) isStuckInConfigurationMismatch() bool { - // First check for split-brain (all followers, term 0, no peers) - if r.isInSplitBrainState() { - return true - } - - // Then check for traditional configuration mismatch - status, err := r.getRQLiteStatus() - if err != nil { - r.logger.Debug("Cannot check Raft status for configuration mismatch", zap.Error(err)) - return false // Can't determine, don't clear - } - - raftState := strings.ToLower(status.Store.Raft.State) - hasLeader := status.Store.Raft.LeaderAddr != "" - - // Stuck if: no leader AND state is not "leader" or "follower" - if !hasLeader && raftState != "leader" && raftState != "follower" { - if r.allPeersAreStuck() { - return true - } - } - - return false -} - -// 
allPeersAreStuck checks if all discovered peers also report no leader -// This helps confirm we're in a cluster-wide configuration mismatch, not just a local issue -func (r *RQLiteManager) allPeersAreStuck() bool { - if r.discoveryService == nil { - r.logger.Debug("No discovery service available to check peer status") - return false - } - - peers := r.discoveryService.GetActivePeers() - if len(peers) == 0 { - r.logger.Debug("No peers discovered, might be network issue") - return false // No peers discovered, might be network issue - } - - // Check if we can query peers and they all report no leader - stuckCount := 0 - reachableCount := 0 - for _, peer := range peers { - if r.peerHasLeader(peer.HTTPAddress) { - // Peer has a leader, so we're not in cluster-wide mismatch - return false - } - // Check if peer is at least reachable - if r.isPeerReachable(peer.HTTPAddress) { - reachableCount++ - stuckCount++ - } - } - - // If we have reachable peers and they're all stuck, we're likely in cluster-wide config mismatch - if reachableCount > 0 && stuckCount == reachableCount { - r.logger.Debug("All reachable peers are also stuck", - zap.Int("reachable_peers", reachableCount), - zap.Int("total_peers", len(peers))) - return true - } - - return false -} - -// peerHasLeader checks if a peer has a leader by querying its status endpoint -func (r *RQLiteManager) peerHasLeader(httpAddr string) bool { - url := fmt.Sprintf("http://%s/status", httpAddr) - client := &http.Client{Timeout: 3 * time.Second} - - resp, err := client.Get(url) - if err != nil { - return false // Can't reach peer - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return false - } - - var status RQLiteStatus - if err := json.NewDecoder(resp.Body).Decode(&status); err != nil { - return false - } - - // Peer has leader if leader address is set - return status.Store.Raft.LeaderAddr != "" -} - -// isPeerReachable checks if a peer is at least responding to HTTP requests -func (r *RQLiteManager) isPeerReachable(httpAddr string) bool { - url := fmt.Sprintf("http://%s/status", httpAddr) - client := &http.Client{Timeout: 3 * time.Second} - - resp, err := client.Get(url) - if err != nil { - return false - } - defer resp.Body.Close() - - return resp.StatusCode == http.StatusOK -} // isInSplitBrainState detects if we're in a split-brain scenario where all nodes // are followers with no peers (each node thinks it's alone) @@ -1215,6 +936,20 @@ func (r *RQLiteManager) isInSplitBrainState() bool { return false } +// isPeerReachable checks if a peer is at least responding to HTTP requests +func (r *RQLiteManager) isPeerReachable(httpAddr string) bool { + url := fmt.Sprintf("http://%s/status", httpAddr) + client := &http.Client{Timeout: 3 * time.Second} + + resp, err := client.Get(url) + if err != nil { + return false + } + defer resp.Body.Close() + + return resp.StatusCode == http.StatusOK +} + // getPeerRQLiteStatus queries a peer's status endpoint func (r *RQLiteManager) getPeerRQLiteStatus(httpAddr string) (*RQLiteStatus, error) { url := fmt.Sprintf("http://%s/status", httpAddr)