mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-12-13 02:48:49 +00:00
feat: enhance Raft log index retrieval and data directory management
- Improved the `getRaftLogIndex` method to accurately report the Raft log index by incorporating fallback logic to read persisted snapshot metadata when the RQLite status is unavailable or returns zero. - Added a new method `getPersistedRaftLogIndex` to read the highest Raft log index from snapshot metadata files, ensuring accurate reporting even before RQLite starts. - Centralized the data directory path resolution logic in `rqliteDataDirPath`, simplifying the codebase and enhancing maintainability.
This commit is contained in:
parent
5463df73d5
commit
239fb2084b
13
CHANGELOG.md
13
CHANGELOG.md
@ -13,6 +13,19 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
|
|||||||
### Deprecated
|
### Deprecated
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
## [0.64.1] - 2025-11-10
|
||||||
|
|
||||||
|
### Added
|
||||||
|
\n
|
||||||
|
### Changed
|
||||||
|
- Improved the accuracy of the Raft log index reporting by falling back to reading persisted snapshot metadata from disk if the running RQLite instance is not yet reachable or reports a zero index.
|
||||||
|
|
||||||
|
### Deprecated
|
||||||
|
|
||||||
|
### Removed
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
\n
|
||||||
## [0.64.0] - 2025-11-10
|
## [0.64.0] - 2025-11-10
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|||||||
2
Makefile
2
Makefile
@ -19,7 +19,7 @@ test-e2e:
|
|||||||
|
|
||||||
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
|
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
|
||||||
|
|
||||||
VERSION := 0.64.0
|
VERSION := 0.64.1
|
||||||
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
|
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
|
||||||
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
|
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
|
||||||
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
|
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
|
||||||
|
|||||||
@ -5,26 +5,98 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// getRaftLogIndex returns the current Raft log index for this node
|
// getRaftLogIndex returns the current Raft log index for this node
|
||||||
|
// It first tries to get the index from the running RQLite instance via /status endpoint.
|
||||||
|
// If that fails or returns 0, it falls back to reading persisted snapshot metadata from disk.
|
||||||
|
// This ensures accurate log index reporting even before RQLite is fully started.
|
||||||
func (r *RQLiteManager) getRaftLogIndex() uint64 {
|
func (r *RQLiteManager) getRaftLogIndex() uint64 {
|
||||||
status, err := r.getRQLiteStatus()
|
status, err := r.getRQLiteStatus()
|
||||||
if err != nil {
|
if err == nil {
|
||||||
r.logger.Debug("Failed to get Raft log index", zap.Error(err))
|
// Return the highest index we have from runtime status
|
||||||
|
maxIndex := status.Store.Raft.LastLogIndex
|
||||||
|
if status.Store.Raft.AppliedIndex > maxIndex {
|
||||||
|
maxIndex = status.Store.Raft.AppliedIndex
|
||||||
|
}
|
||||||
|
if status.Store.Raft.CommitIndex > maxIndex {
|
||||||
|
maxIndex = status.Store.Raft.CommitIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
// If runtime status reports a valid index, use it
|
||||||
|
if maxIndex > 0 {
|
||||||
|
return maxIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Runtime status returned 0, fall back to persisted snapshot metadata
|
||||||
|
// This handles the case where RQLite is running but hasn't applied any logs yet
|
||||||
|
if persisted := r.getPersistedRaftLogIndex(); persisted > 0 {
|
||||||
|
r.logger.Debug("Using persisted Raft log index because runtime status reported zero",
|
||||||
|
zap.Uint64("persisted_index", persisted))
|
||||||
|
return persisted
|
||||||
|
}
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return the highest index we have
|
// RQLite status endpoint is not available (not started yet or unreachable)
|
||||||
maxIndex := status.Store.Raft.LastLogIndex
|
// Fall back to reading persisted snapshot metadata from disk
|
||||||
if status.Store.Raft.AppliedIndex > maxIndex {
|
persisted := r.getPersistedRaftLogIndex()
|
||||||
maxIndex = status.Store.Raft.AppliedIndex
|
if persisted > 0 {
|
||||||
|
r.logger.Debug("Using persisted Raft log index before RQLite is reachable",
|
||||||
|
zap.Uint64("persisted_index", persisted),
|
||||||
|
zap.Error(err))
|
||||||
|
return persisted
|
||||||
}
|
}
|
||||||
if status.Store.Raft.CommitIndex > maxIndex {
|
|
||||||
maxIndex = status.Store.Raft.CommitIndex
|
r.logger.Debug("Failed to get Raft log index", zap.Error(err))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// getPersistedRaftLogIndex reads the highest Raft log index from snapshot metadata files
|
||||||
|
// This allows us to report accurate log indexes even before RQLite is started
|
||||||
|
func (r *RQLiteManager) getPersistedRaftLogIndex() uint64 {
|
||||||
|
rqliteDataDir, err := r.rqliteDataDirPath()
|
||||||
|
if err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshotsDir := filepath.Join(rqliteDataDir, "rsnapshots")
|
||||||
|
entries, err := os.ReadDir(snapshotsDir)
|
||||||
|
if err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
var maxIndex uint64
|
||||||
|
for _, entry := range entries {
|
||||||
|
// Only process directories (snapshot directories)
|
||||||
|
if !entry.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read meta.json from the snapshot directory
|
||||||
|
metaPath := filepath.Join(snapshotsDir, entry.Name(), "meta.json")
|
||||||
|
raw, err := os.ReadFile(metaPath)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse the metadata JSON to extract the Index field
|
||||||
|
var meta struct {
|
||||||
|
Index uint64 `json:"Index"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Track the highest index found
|
||||||
|
if meta.Index > maxIndex {
|
||||||
|
maxIndex = meta.Index
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return maxIndex
|
return maxIndex
|
||||||
|
|||||||
@ -123,8 +123,9 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepareDataDir expands and creates the RQLite data directory
|
// rqliteDataDirPath returns the resolved path to the RQLite data directory
|
||||||
func (r *RQLiteManager) prepareDataDir() (string, error) {
|
// This centralizes the path resolution logic used throughout the codebase
|
||||||
|
func (r *RQLiteManager) rqliteDataDirPath() (string, error) {
|
||||||
// Expand ~ in data directory path
|
// Expand ~ in data directory path
|
||||||
dataDir := os.ExpandEnv(r.dataDir)
|
dataDir := os.ExpandEnv(r.dataDir)
|
||||||
if strings.HasPrefix(dataDir, "~") {
|
if strings.HasPrefix(dataDir, "~") {
|
||||||
@ -135,8 +136,17 @@ func (r *RQLiteManager) prepareDataDir() (string, error) {
|
|||||||
dataDir = filepath.Join(home, dataDir[1:])
|
dataDir = filepath.Join(home, dataDir[1:])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return filepath.Join(dataDir, "rqlite"), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// prepareDataDir expands and creates the RQLite data directory
|
||||||
|
func (r *RQLiteManager) prepareDataDir() (string, error) {
|
||||||
|
rqliteDataDir, err := r.rqliteDataDirPath()
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
// Create data directory
|
// Create data directory
|
||||||
rqliteDataDir := filepath.Join(dataDir, "rqlite")
|
|
||||||
if err := os.MkdirAll(rqliteDataDir, 0755); err != nil {
|
if err := os.MkdirAll(rqliteDataDir, 0755); err != nil {
|
||||||
return "", fmt.Errorf("failed to create RQLite data directory: %w", err)
|
return "", fmt.Errorf("failed to create RQLite data directory: %w", err)
|
||||||
}
|
}
|
||||||
@ -689,16 +699,11 @@ func (r *RQLiteManager) recoverCluster(peersJSONPath string) error {
|
|||||||
// Restart RQLite - it will automatically detect peers.json and perform recovery
|
// Restart RQLite - it will automatically detect peers.json and perform recovery
|
||||||
r.logger.Info("Restarting RQLite (will auto-recover using peers.json)")
|
r.logger.Info("Restarting RQLite (will auto-recover using peers.json)")
|
||||||
|
|
||||||
// Build the same args as original Start() - expand ~ in data directory
|
// Rebuild the launch arguments using the centralized path helper
|
||||||
dataDir := os.ExpandEnv(r.dataDir)
|
rqliteDataDir, err := r.rqliteDataDirPath()
|
||||||
if strings.HasPrefix(dataDir, "~") {
|
if err != nil {
|
||||||
home, err := os.UserHomeDir()
|
return fmt.Errorf("failed to resolve RQLite data directory: %w", err)
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to determine home directory: %w", err)
|
|
||||||
}
|
|
||||||
dataDir = filepath.Join(home, dataDir[1:])
|
|
||||||
}
|
}
|
||||||
rqliteDataDir := filepath.Join(dataDir, "rqlite")
|
|
||||||
args := []string{
|
args := []string{
|
||||||
"-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort),
|
"-http-addr", fmt.Sprintf("0.0.0.0:%d", r.config.RQLitePort),
|
||||||
"-http-adv-addr", r.discoverConfig.HttpAdvAddress,
|
"-http-adv-addr", r.discoverConfig.HttpAdvAddress,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user