Merge pull request #51 from DeBrosOfficial/config-validation

Config validation
This commit is contained in:
anonpenguin 2025-10-24 11:14:22 +03:00 committed by GitHub
commit b0c8c8c5f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 2493 additions and 821 deletions

View File

@ -16,6 +16,26 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
### Fixed ### Fixed
## [0.51.5] - 2025-10-24
### Added
- Added validation for yaml files
- Added authenticaiton command on cli
### Changed
- Updated readme
- Where we read .yaml files from and where data is saved to ~/.debros
### Deprecated
### Removed
### Fixed
- Regular nodes rqlite not starting
## [0.51.2] - 2025-09-26 ## [0.51.2] - 2025-09-26
### Added ### Added

View File

@ -21,7 +21,7 @@ test-e2e:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
VERSION := 0.51.2-beta VERSION := 0.51.5-beta
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
@ -46,35 +46,35 @@ clean:
# Run bootstrap node (auto-selects identity and data dir) # Run bootstrap node (auto-selects identity and data dir)
run-node: run-node:
@echo "Starting bootstrap node with config..." @echo "Starting bootstrap node..."
go run ./cmd/node --config configs/bootstrap.yaml @echo "Config: ~/.debros/bootstrap.yaml"
@echo "Generate it with: network-cli config init --type bootstrap"
go run ./cmd/node --config node.yaml
# Run second node (regular) - requires join address of bootstrap node # Run second node (regular) - requires join address of bootstrap node
# Usage: make run-node2 JOINADDR=/ip4/127.0.0.1/tcp/5001 HTTP=5002 RAFT=7002 P2P=4002 # Usage: make run-node2 JOINADDR=/ip4/127.0.0.1/tcp/5001 HTTP=5002 RAFT=7002 P2P=4002
run-node2: run-node2:
@echo "Starting regular node2 with config..." @echo "Starting regular node (node.yaml)..."
go run ./cmd/node --config configs/node.yaml @echo "Config: ~/.debros/node.yaml"
@echo "Generate it with: network-cli config init --type node --join localhost:5001 --bootstrap-peers '<peer_multiaddr>'"
go run ./cmd/node --config node2.yaml
# Run third node (regular) - requires join address of bootstrap node # Run third node (regular) - requires join address of bootstrap node
# Usage: make run-node3 JOINADDR=/ip4/127.0.0.1/tcp/5001 HTTP=5003 RAFT=7003 P2P=4003 # Usage: make run-node3 JOINADDR=/ip4/127.0.0.1/tcp/5001 HTTP=5003 RAFT=7003 P2P=4003
run-node3: run-node3:
@echo "Starting regular node3 with config..." @echo "Starting regular node (node2.yaml)..."
go run ./cmd/node --config configs/node3.yaml @echo "Config: ~/.debros/node2.yaml"
@echo "Generate it with: network-cli config init --type node --name node2.yaml --join localhost:5001 --bootstrap-peers '<peer_multiaddr>'"
go run ./cmd/node --config node3.yaml
# Run gateway HTTP server # Run gateway HTTP server
# Usage examples: # Usage examples:
# make run-gateway # uses defaults (:8080, namespace=default) # make run-gateway # uses ~/.debros/gateway.yaml
# GATEWAY_ADDR=":8081" make run-gateway # override listen addr via env # Config generated with: network-cli config init --type gateway
# GATEWAY_NAMESPACE=myapp make run-gateway # set namespace
# GATEWAY_BOOTSTRAP_PEERS="/ip4/127.0.0.1/tcp/4001/p2p/<ID>" make run-gateway
# GATEWAY_REQUIRE_AUTH=1 GATEWAY_API_KEYS="key1:ns1,key2:ns2" make run-gateway
run-gateway: run-gateway:
@echo "Starting gateway HTTP server..." @echo "Starting gateway HTTP server..."
GATEWAY_ADDR=$(or $(ADDR),$(GATEWAY_ADDR)) \ @echo "Note: Config must be in ~/.debros/gateway.yaml"
GATEWAY_NAMESPACE=$(or $(NAMESPACE),$(GATEWAY_NAMESPACE)) \ @echo "Generate it with: network-cli config init --type gateway"
GATEWAY_BOOTSTRAP_PEERS=$(GATEWAY_BOOTSTRAP_PEERS) \
GATEWAY_REQUIRE_AUTH=$(GATEWAY_REQUIRE_AUTH) \
GATEWAY_API_KEYS=$(GATEWAY_API_KEYS) \
go run ./cmd/gateway go run ./cmd/gateway
# Run basic usage example # Run basic usage example
@ -155,15 +155,28 @@ dev-setup: deps
# Start development cluster (requires multiple terminals) # Start development cluster (requires multiple terminals)
dev-cluster: dev-cluster:
@echo "To start a development cluster, run these commands in separate terminals:" @echo "To start a development cluster with 3 nodes:"
@echo "1. make run-node # Start bootstrap node (uses configs/bootstrap.yaml)" @echo ""
@echo "2. make run-node2 # Start second node (uses configs/node.yaml)" @echo "1. Generate config files in ~/.debros:"
@echo "3. make run-node3 # Start third node (uses configs/node.yaml)" @echo " make build"
@echo "4. make run-example # Test basic functionality" @echo " ./bin/network-cli config init --type bootstrap"
@echo "5. make cli-health # Check network health" @echo " ./bin/network-cli config init --type node --name node.yaml --bootstrap-peers '<bootstrap_peer_multiaddr>'"
@echo "6. make cli-peers # List peers" @echo " ./bin/network-cli config init --type node --name node2.yaml --bootstrap-peers '<bootstrap_peer_multiaddr>'"
@echo "7. make cli-storage-test # Test storage" @echo ""
@echo "8. make cli-pubsub-test # Test messaging" @echo "2. Run in separate terminals:"
@echo " Terminal 1: make run-node # Start bootstrap node (bootstrap.yaml)"
@echo " Terminal 2: make run-node2 # Start node 1 (node.yaml)"
@echo " Terminal 3: make run-node3 # Start node 2 (node2.yaml)"
@echo " Terminal 4: make run-gateway # Start gateway"
@echo ""
@echo "3. Or run custom node with any config file:"
@echo " go run ./cmd/node --config custom-node.yaml"
@echo ""
@echo "4. Test:"
@echo " make cli-health # Check network health"
@echo " make cli-peers # List peers"
@echo " make cli-storage-test # Test storage"
@echo " make cli-pubsub-test # Test messaging"
# Full development workflow # Full development workflow
dev: clean build test dev: clean build test
@ -175,22 +188,43 @@ help:
@echo " build - Build all executables" @echo " build - Build all executables"
@echo " clean - Clean build artifacts" @echo " clean - Clean build artifacts"
@echo " test - Run tests" @echo " test - Run tests"
@echo ""
@echo "Configuration (NEW):"
@echo " First, generate config files in ~/.debros with:"
@echo " make build # Build CLI first"
@echo " ./bin/network-cli config init --type bootstrap # Generate bootstrap config"
@echo " ./bin/network-cli config init --type node --bootstrap-peers '<peer_multiaddr>'"
@echo " ./bin/network-cli config init --type gateway"
@echo ""
@echo "Network Targets (requires config files in ~/.debros):"
@echo " run-node - Start bootstrap node" @echo " run-node - Start bootstrap node"
@echo " run-node2 - Start second node (requires JOINADDR, optional HTTP/RAFT/P2P)" @echo " run-node2 - Start second node"
@echo " run-node3 - Start third node (requires JOINADDR, optional HTTP/RAFT/P2P)" @echo " run-node3 - Start third node"
@echo " run-gateway - Start HTTP gateway (flags via env: GATEWAY_ADDR, GATEWAY_NAMESPACE, GATEWAY_BOOTSTRAP_PEERS, GATEWAY_REQUIRE_AUTH, GATEWAY_API_KEYS)" @echo " run-gateway - Start HTTP gateway"
@echo " run-example - Run usage example" @echo " run-example - Run usage example"
@echo ""
@echo "Running Multiple Nodes:"
@echo " Nodes use --config flag to select which YAML file in ~/.debros to load:"
@echo " go run ./cmd/node --config bootstrap.yaml"
@echo " go run ./cmd/node --config node.yaml"
@echo " go run ./cmd/node --config node2.yaml"
@echo " Generate configs with: ./bin/network-cli config init --name <filename.yaml>"
@echo ""
@echo "CLI Commands:"
@echo " run-cli - Run network CLI help" @echo " run-cli - Run network CLI help"
@echo " show-bootstrap - Show example bootstrap usage with flags"
@echo " cli-health - Check network health" @echo " cli-health - Check network health"
@echo " cli-peers - List network peers" @echo " cli-peers - List network peers"
@echo " cli-status - Get network status" @echo " cli-status - Get network status"
@echo " cli-storage-test - Test storage operations" @echo " cli-storage-test - Test storage operations"
@echo " cli-pubsub-test - Test pub/sub operations" @echo " cli-pubsub-test - Test pub/sub operations"
@echo ""
@echo "Development:"
@echo " test-multinode - Full multi-node test with 1 bootstrap + 2 nodes" @echo " test-multinode - Full multi-node test with 1 bootstrap + 2 nodes"
@echo " test-peer-discovery - Test peer discovery (requires running nodes)" @echo " test-peer-discovery - Test peer discovery (requires running nodes)"
@echo " test-replication - Test data replication (requires running nodes)" @echo " test-replication - Test data replication (requires running nodes)"
@echo " test-consensus - Test database consensus (requires running nodes)" @echo " test-consensus - Test database consensus (requires running nodes)"
@echo ""
@echo "Maintenance:"
@echo " deps - Download dependencies" @echo " deps - Download dependencies"
@echo " tidy - Tidy dependencies" @echo " tidy - Tidy dependencies"
@echo " fmt - Format code" @echo " fmt - Format code"

1069
README.md

File diff suppressed because it is too large Load Diff

View File

@ -7,12 +7,14 @@ import (
"log" "log"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/DeBrosOfficial/network/pkg/auth" "github.com/DeBrosOfficial/network/pkg/auth"
"github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/config"
"github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
) )
@ -76,6 +78,10 @@ func main() {
handleConnect(args[0]) handleConnect(args[0])
case "peer-id": case "peer-id":
handlePeerID() handlePeerID()
case "auth":
handleAuth(args)
case "config":
handleConfig(args)
case "help", "--help", "-h": case "help", "--help", "-h":
showHelp() showHelp()
@ -289,6 +295,145 @@ func handlePubSub(args []string) {
} }
} }
func handleAuth(args []string) {
if len(args) == 0 {
showAuthHelp()
return
}
subcommand := args[0]
switch subcommand {
case "login":
handleAuthLogin()
case "logout":
handleAuthLogout()
case "whoami":
handleAuthWhoami()
case "status":
handleAuthStatus()
default:
fmt.Fprintf(os.Stderr, "Unknown auth command: %s\n", subcommand)
showAuthHelp()
os.Exit(1)
}
}
func handleAuthLogin() {
gatewayURL := auth.GetDefaultGatewayURL()
fmt.Printf("🔐 Authenticating with gateway at: %s\n", gatewayURL)
// Use the wallet authentication flow
creds, err := auth.PerformWalletAuthentication(gatewayURL)
if err != nil {
fmt.Fprintf(os.Stderr, "❌ Authentication failed: %v\n", err)
os.Exit(1)
}
// Save credentials to file
if err := auth.SaveCredentialsForDefaultGateway(creds); err != nil {
fmt.Fprintf(os.Stderr, "❌ Failed to save credentials: %v\n", err)
os.Exit(1)
}
credsPath, _ := auth.GetCredentialsPath()
fmt.Printf("✅ Authentication successful!\n")
fmt.Printf("📁 Credentials saved to: %s\n", credsPath)
fmt.Printf("🎯 Wallet: %s\n", creds.Wallet)
fmt.Printf("🏢 Namespace: %s\n", creds.Namespace)
}
func handleAuthLogout() {
if err := auth.ClearAllCredentials(); err != nil {
fmt.Fprintf(os.Stderr, "❌ Failed to clear credentials: %v\n", err)
os.Exit(1)
}
fmt.Println("✅ Logged out successfully - all credentials have been cleared")
}
func handleAuthWhoami() {
store, err := auth.LoadCredentials()
if err != nil {
fmt.Fprintf(os.Stderr, "❌ Failed to load credentials: %v\n", err)
os.Exit(1)
}
gatewayURL := auth.GetDefaultGatewayURL()
creds, exists := store.GetCredentialsForGateway(gatewayURL)
if !exists || !creds.IsValid() {
fmt.Println("❌ Not authenticated - run 'network-cli auth login' to authenticate")
os.Exit(1)
}
fmt.Println("✅ Authenticated")
fmt.Printf(" Wallet: %s\n", creds.Wallet)
fmt.Printf(" Namespace: %s\n", creds.Namespace)
fmt.Printf(" Issued At: %s\n", creds.IssuedAt.Format("2006-01-02 15:04:05"))
if !creds.ExpiresAt.IsZero() {
fmt.Printf(" Expires At: %s\n", creds.ExpiresAt.Format("2006-01-02 15:04:05"))
}
if !creds.LastUsedAt.IsZero() {
fmt.Printf(" Last Used: %s\n", creds.LastUsedAt.Format("2006-01-02 15:04:05"))
}
if creds.Plan != "" {
fmt.Printf(" Plan: %s\n", creds.Plan)
}
}
func handleAuthStatus() {
store, err := auth.LoadCredentials()
if err != nil {
fmt.Fprintf(os.Stderr, "❌ Failed to load credentials: %v\n", err)
os.Exit(1)
}
gatewayURL := auth.GetDefaultGatewayURL()
creds, exists := store.GetCredentialsForGateway(gatewayURL)
fmt.Println("🔐 Authentication Status")
fmt.Printf(" Gateway URL: %s\n", gatewayURL)
if !exists || creds == nil {
fmt.Println(" Status: ❌ Not authenticated")
return
}
if !creds.IsValid() {
fmt.Println(" Status: ⚠️ Credentials expired")
if !creds.ExpiresAt.IsZero() {
fmt.Printf(" Expired At: %s\n", creds.ExpiresAt.Format("2006-01-02 15:04:05"))
}
return
}
fmt.Println(" Status: ✅ Authenticated")
fmt.Printf(" Wallet: %s\n", creds.Wallet)
fmt.Printf(" Namespace: %s\n", creds.Namespace)
if !creds.ExpiresAt.IsZero() {
fmt.Printf(" Expires: %s\n", creds.ExpiresAt.Format("2006-01-02 15:04:05"))
}
if !creds.LastUsedAt.IsZero() {
fmt.Printf(" Last Used: %s\n", creds.LastUsedAt.Format("2006-01-02 15:04:05"))
}
}
func showAuthHelp() {
fmt.Printf("🔐 Authentication Commands\n\n")
fmt.Printf("Usage: network-cli auth <subcommand>\n\n")
fmt.Printf("Subcommands:\n")
fmt.Printf(" login - Authenticate with wallet\n")
fmt.Printf(" logout - Clear stored credentials\n")
fmt.Printf(" whoami - Show current authentication status\n")
fmt.Printf(" status - Show detailed authentication info\n\n")
fmt.Printf("Examples:\n")
fmt.Printf(" network-cli auth login\n")
fmt.Printf(" network-cli auth whoami\n")
fmt.Printf(" network-cli auth status\n")
fmt.Printf(" network-cli auth logout\n\n")
fmt.Printf("Environment Variables:\n")
fmt.Printf(" DEBROS_GATEWAY_URL - Gateway URL (default: http://localhost:6001)\n")
}
func ensureAuthenticated() *auth.Credentials { func ensureAuthenticated() *auth.Credentials {
gatewayURL := auth.GetDefaultGatewayURL() gatewayURL := auth.GetDefaultGatewayURL()
@ -435,11 +580,370 @@ func isPrintableText(s string) bool {
return len(s) > 0 && float64(printableCount)/float64(len(s)) > 0.8 return len(s) > 0 && float64(printableCount)/float64(len(s)) > 0.8
} }
func handleConfig(args []string) {
if len(args) == 0 {
showConfigHelp()
return
}
subcommand := args[0]
subargs := args[1:]
switch subcommand {
case "init":
handleConfigInit(subargs)
case "validate":
handleConfigValidate(subargs)
case "help":
showConfigHelp()
default:
fmt.Fprintf(os.Stderr, "Unknown config subcommand: %s\n", subcommand)
showConfigHelp()
os.Exit(1)
}
}
func showConfigHelp() {
fmt.Printf("Config Management Commands\n\n")
fmt.Printf("Usage: network-cli config <subcommand> [options]\n\n")
fmt.Printf("Subcommands:\n")
fmt.Printf(" init - Generate configuration files in ~/.debros\n")
fmt.Printf(" validate --name <file> - Validate a config file\n\n")
fmt.Printf("Init Options:\n")
fmt.Printf(" --type <type> - Config type: node, bootstrap, gateway (default: node)\n")
fmt.Printf(" --name <file> - Output filename (default: node.yaml)\n")
fmt.Printf(" --id <id> - Node ID for bootstrap peers\n")
fmt.Printf(" --listen-port <port> - LibP2P listen port (default: 4001)\n")
fmt.Printf(" --rqlite-http-port <port> - RQLite HTTP port (default: 5001)\n")
fmt.Printf(" --rqlite-raft-port <port> - RQLite Raft port (default: 7001)\n")
fmt.Printf(" --join <host:port> - RQLite address to join (required for non-bootstrap)\n")
fmt.Printf(" --bootstrap-peers <peers> - Comma-separated bootstrap peer multiaddrs\n")
fmt.Printf(" --force - Overwrite existing config\n\n")
fmt.Printf("Examples:\n")
fmt.Printf(" network-cli config init\n")
fmt.Printf(" network-cli config init --type node --bootstrap-peers /ip4/127.0.0.1/tcp/4001/p2p/QmXxx,/ip4/127.0.0.1/tcp/4002/p2p/QmYyy\n")
fmt.Printf(" network-cli config init --type bootstrap\n")
fmt.Printf(" network-cli config init --type gateway\n")
fmt.Printf(" network-cli config validate --name node.yaml\n")
}
func handleConfigInit(args []string) {
// Parse flags
var (
cfgType = "node"
name = "" // Will be set based on type if not provided
id string
listenPort = 4001
rqliteHTTPPort = 5001
rqliteRaftPort = 7001
joinAddr string
bootstrapPeers string
force bool
)
for i := 0; i < len(args); i++ {
switch args[i] {
case "--type":
if i+1 < len(args) {
cfgType = args[i+1]
i++
}
case "--name":
if i+1 < len(args) {
name = args[i+1]
i++
}
case "--id":
if i+1 < len(args) {
id = args[i+1]
i++
}
case "--listen-port":
if i+1 < len(args) {
if p, err := strconv.Atoi(args[i+1]); err == nil {
listenPort = p
}
i++
}
case "--rqlite-http-port":
if i+1 < len(args) {
if p, err := strconv.Atoi(args[i+1]); err == nil {
rqliteHTTPPort = p
}
i++
}
case "--rqlite-raft-port":
if i+1 < len(args) {
if p, err := strconv.Atoi(args[i+1]); err == nil {
rqliteRaftPort = p
}
i++
}
case "--join":
if i+1 < len(args) {
joinAddr = args[i+1]
i++
}
case "--bootstrap-peers":
if i+1 < len(args) {
bootstrapPeers = args[i+1]
i++
}
case "--force":
force = true
}
}
// Validate type
if cfgType != "node" && cfgType != "bootstrap" && cfgType != "gateway" {
fmt.Fprintf(os.Stderr, "Invalid --type: %s (expected: node, bootstrap, or gateway)\n", cfgType)
os.Exit(1)
}
// Set default name based on type if not provided
if name == "" {
switch cfgType {
case "bootstrap":
name = "bootstrap.yaml"
case "gateway":
name = "gateway.yaml"
default:
name = "node.yaml"
}
}
// Ensure config directory exists
configDir, err := config.EnsureConfigDir()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to ensure config directory: %v\n", err)
os.Exit(1)
}
configPath := filepath.Join(configDir, name)
// Check if file exists
if !force {
if _, err := os.Stat(configPath); err == nil {
fmt.Fprintf(os.Stderr, "Config file already exists at %s (use --force to overwrite)\n", configPath)
os.Exit(1)
}
}
// Generate config based on type
var configContent string
switch cfgType {
case "node":
configContent = generateNodeConfig(name, id, listenPort, rqliteHTTPPort, rqliteRaftPort, joinAddr, bootstrapPeers)
case "bootstrap":
configContent = generateBootstrapConfig(name, id, listenPort, rqliteHTTPPort, rqliteRaftPort)
case "gateway":
configContent = generateGatewayConfig(bootstrapPeers)
}
// Write config file
if err := os.WriteFile(configPath, []byte(configContent), 0644); err != nil {
fmt.Fprintf(os.Stderr, "Failed to write config file: %v\n", err)
os.Exit(1)
}
fmt.Printf("✅ Configuration file created: %s\n", configPath)
fmt.Printf(" Type: %s\n", cfgType)
fmt.Printf("\nYou can now start the %s using the generated config.\n", cfgType)
}
func handleConfigValidate(args []string) {
var name string
for i := 0; i < len(args); i++ {
if args[i] == "--name" && i+1 < len(args) {
name = args[i+1]
i++
}
}
if name == "" {
fmt.Fprintf(os.Stderr, "Missing --name flag\n")
showConfigHelp()
os.Exit(1)
}
configDir, err := config.ConfigDir()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to get config directory: %v\n", err)
os.Exit(1)
}
configPath := filepath.Join(configDir, name)
file, err := os.Open(configPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to open config file: %v\n", err)
os.Exit(1)
}
defer file.Close()
var cfg config.Config
if err := config.DecodeStrict(file, &cfg); err != nil {
fmt.Fprintf(os.Stderr, "Failed to parse config: %v\n", err)
os.Exit(1)
}
// Run validation
errs := cfg.Validate()
if len(errs) > 0 {
fmt.Fprintf(os.Stderr, "\n❌ Configuration errors (%d):\n", len(errs))
for _, err := range errs {
fmt.Fprintf(os.Stderr, " - %s\n", err)
}
os.Exit(1)
}
fmt.Printf("✅ Config is valid: %s\n", configPath)
}
func generateNodeConfig(name, id string, listenPort, rqliteHTTPPort, rqliteRaftPort int, joinAddr, bootstrapPeers string) string {
nodeID := id
if nodeID == "" {
nodeID = fmt.Sprintf("node-%d", time.Now().Unix())
}
// Parse bootstrap peers
var peers []string
if bootstrapPeers != "" {
for _, p := range strings.Split(bootstrapPeers, ",") {
if p = strings.TrimSpace(p); p != "" {
peers = append(peers, p)
}
}
}
// Construct data_dir from name stem (remove .yaml)
dataDir := strings.TrimSuffix(name, ".yaml")
dataDir = filepath.Join(os.ExpandEnv("~"), ".debros", dataDir)
var peersYAML strings.Builder
if len(peers) == 0 {
peersYAML.WriteString(" bootstrap_peers: []")
} else {
peersYAML.WriteString(" bootstrap_peers:\n")
for _, p := range peers {
fmt.Fprintf(&peersYAML, " - \"%s\"\n", p)
}
}
if joinAddr == "" {
joinAddr = "localhost:5001"
}
return fmt.Sprintf(`node:
id: "%s"
type: "node"
listen_addresses:
- "/ip4/0.0.0.0/tcp/%d"
data_dir: "%s"
max_connections: 50
database:
data_dir: "%s/rqlite"
replication_factor: 3
shard_count: 16
max_database_size: 1073741824
backup_interval: "24h"
rqlite_port: %d
rqlite_raft_port: %d
rqlite_join_address: "%s"
discovery:
%s
discovery_interval: "15s"
bootstrap_port: %d
http_adv_address: "127.0.0.1:%d"
raft_adv_address: "127.0.0.1:%d"
node_namespace: "default"
security:
enable_tls: false
logging:
level: "info"
format: "console"
`, nodeID, listenPort, dataDir, dataDir, rqliteHTTPPort, rqliteRaftPort, joinAddr, peersYAML.String(), 4001, rqliteHTTPPort, rqliteRaftPort)
}
func generateBootstrapConfig(name, id string, listenPort, rqliteHTTPPort, rqliteRaftPort int) string {
nodeID := id
if nodeID == "" {
nodeID = "bootstrap"
}
dataDir := filepath.Join(os.ExpandEnv("~"), ".debros", "bootstrap")
return fmt.Sprintf(`node:
id: "%s"
type: "bootstrap"
listen_addresses:
- "/ip4/0.0.0.0/tcp/%d"
data_dir: "%s"
max_connections: 50
database:
data_dir: "%s/rqlite"
replication_factor: 3
shard_count: 16
max_database_size: 1073741824
backup_interval: "24h"
rqlite_port: %d
rqlite_raft_port: %d
rqlite_join_address: ""
discovery:
bootstrap_peers: []
discovery_interval: "15s"
bootstrap_port: %d
http_adv_address: "127.0.0.1:%d"
raft_adv_address: "127.0.0.1:%d"
node_namespace: "default"
security:
enable_tls: false
logging:
level: "info"
format: "console"
`, nodeID, listenPort, dataDir, dataDir, rqliteHTTPPort, rqliteRaftPort, 4001, rqliteHTTPPort, rqliteRaftPort)
}
func generateGatewayConfig(bootstrapPeers string) string {
var peers []string
if bootstrapPeers != "" {
for _, p := range strings.Split(bootstrapPeers, ",") {
if p = strings.TrimSpace(p); p != "" {
peers = append(peers, p)
}
}
}
var peersYAML strings.Builder
if len(peers) == 0 {
peersYAML.WriteString(" bootstrap_peers: []")
} else {
peersYAML.WriteString(" bootstrap_peers:\n")
for _, p := range peers {
fmt.Fprintf(&peersYAML, " - \"%s\"\n", p)
}
}
return fmt.Sprintf(`listen_addr: ":6001"
client_namespace: "default"
rqlite_dsn: ""
%s
`, peersYAML.String())
}
func showHelp() { func showHelp() {
fmt.Printf("Network CLI - Distributed P2P Network Management Tool\n\n") fmt.Printf("Network CLI - Distributed P2P Network Management Tool\n\n")
fmt.Printf("Usage: network-cli <command> [args...]\n\n") fmt.Printf("Usage: network-cli <command> [args...]\n\n")
fmt.Printf("🔐 Authentication: Commands requiring authentication will automatically prompt for wallet connection.\n\n") fmt.Printf("🔐 Authentication: Commands requiring authentication will automatically prompt for wallet connection.\n\n")
fmt.Printf("Commands:\n") fmt.Printf("Commands:\n")
fmt.Printf(" auth <subcommand> 🔐 Authentication management (login, logout, whoami, status)\n")
fmt.Printf(" health - Check network health\n") fmt.Printf(" health - Check network health\n")
fmt.Printf(" peers - List connected peers\n") fmt.Printf(" peers - List connected peers\n")
fmt.Printf(" status - Show network status\n") fmt.Printf(" status - Show network status\n")
@ -449,6 +953,7 @@ func showHelp() {
fmt.Printf(" pubsub subscribe <topic> [duration] 🔐 Subscribe to topic\n") fmt.Printf(" pubsub subscribe <topic> [duration] 🔐 Subscribe to topic\n")
fmt.Printf(" pubsub topics 🔐 List topics\n") fmt.Printf(" pubsub topics 🔐 List topics\n")
fmt.Printf(" connect <peer_address> - Connect to peer\n") fmt.Printf(" connect <peer_address> - Connect to peer\n")
fmt.Printf(" config - Show current configuration\n")
fmt.Printf(" help - Show this help\n\n") fmt.Printf(" help - Show this help\n\n")
fmt.Printf("Global Flags:\n") fmt.Printf("Global Flags:\n")
@ -457,10 +962,13 @@ func showHelp() {
fmt.Printf(" -t, --timeout <duration> - Operation timeout (default: 30s)\n") fmt.Printf(" -t, --timeout <duration> - Operation timeout (default: 30s)\n")
fmt.Printf(" --production - Connect to production bootstrap peers\n\n") fmt.Printf(" --production - Connect to production bootstrap peers\n\n")
fmt.Printf("Authentication:\n") fmt.Printf("Authentication:\n")
fmt.Printf(" Use 'network-cli auth login' to authenticate with your wallet\n")
fmt.Printf(" Commands marked with 🔐 will automatically prompt for wallet authentication\n") fmt.Printf(" Commands marked with 🔐 will automatically prompt for wallet authentication\n")
fmt.Printf(" if no valid credentials are found. You can manage multiple wallets and\n") fmt.Printf(" if no valid credentials are found. You can manage multiple wallets and\n")
fmt.Printf(" choose between them during the authentication flow.\n\n") fmt.Printf(" choose between them during the authentication flow.\n\n")
fmt.Printf("Examples:\n") fmt.Printf("Examples:\n")
fmt.Printf(" network-cli auth login\n")
fmt.Printf(" network-cli auth whoami\n")
fmt.Printf(" network-cli health\n") fmt.Printf(" network-cli health\n")
fmt.Printf(" network-cli peer-id\n") fmt.Printf(" network-cli peer-id\n")
fmt.Printf(" network-cli peer-id --format json\n") fmt.Printf(" network-cli peer-id --format json\n")

View File

@ -1,14 +1,14 @@
package main package main
import ( import (
"flag" "fmt"
"os" "os"
"strings" "strings"
"github.com/DeBrosOfficial/network/pkg/config"
"github.com/DeBrosOfficial/network/pkg/gateway" "github.com/DeBrosOfficial/network/pkg/gateway"
"github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/logging"
"go.uber.org/zap" "go.uber.org/zap"
"gopkg.in/yaml.v3"
) )
// For transition, alias main.GatewayConfig to pkg/gateway.Config // For transition, alias main.GatewayConfig to pkg/gateway.Config
@ -37,10 +37,43 @@ func getEnvBoolDefault(key string, def bool) bool {
} }
} }
// parseGatewayConfig loads optional configs/gateway.yaml then applies env and flags. // parseGatewayConfig loads gateway.yaml from ~/.debros exclusively.
// Priority: flags > env > yaml > defaults.
func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
// Base defaults // Determine config path
configPath, err := config.DefaultPath("gateway.yaml")
if err != nil {
logger.ComponentError(logging.ComponentGeneral, "Failed to determine config path", zap.Error(err))
fmt.Fprintf(os.Stderr, "Configuration error: %v\n", err)
os.Exit(1)
}
// Load YAML
type yamlCfg struct {
ListenAddr string `yaml:"listen_addr"`
ClientNamespace string `yaml:"client_namespace"`
RQLiteDSN string `yaml:"rqlite_dsn"`
BootstrapPeers []string `yaml:"bootstrap_peers"`
}
data, err := os.ReadFile(configPath)
if err != nil {
logger.ComponentError(logging.ComponentGeneral, "Config file not found",
zap.String("path", configPath),
zap.Error(err))
fmt.Fprintf(os.Stderr, "\nConfig file not found at %s\n", configPath)
fmt.Fprintf(os.Stderr, "Generate it using: network-cli config init --type gateway\n")
os.Exit(1)
}
var y yamlCfg
// Use strict YAML decoding to reject unknown fields
if err := config.DecodeStrict(strings.NewReader(string(data)), &y); err != nil {
logger.ComponentError(logging.ComponentGeneral, "Failed to parse gateway config", zap.Error(err))
fmt.Fprintf(os.Stderr, "Configuration parse error: %v\n", err)
os.Exit(1)
}
// Build config from YAML
cfg := &gateway.Config{ cfg := &gateway.Config{
ListenAddr: ":6001", ListenAddr: ":6001",
ClientNamespace: "default", ClientNamespace: "default",
@ -48,20 +81,6 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
RQLiteDSN: "", RQLiteDSN: "",
} }
// 1) YAML (optional)
{
type yamlCfg struct {
ListenAddr string `yaml:"listen_addr"`
ClientNamespace string `yaml:"client_namespace"`
RQLiteDSN string `yaml:"rqlite_dsn"`
BootstrapPeers []string `yaml:"bootstrap_peers"`
}
const path = "configs/gateway.yaml"
if data, err := os.ReadFile(path); err == nil {
var y yamlCfg
if err := yaml.Unmarshal(data, &y); err != nil {
logger.ComponentWarn(logging.ComponentGeneral, "failed to parse configs/gateway.yaml; ignoring", zap.Error(err))
} else {
if v := strings.TrimSpace(y.ListenAddr); v != "" { if v := strings.TrimSpace(y.ListenAddr); v != "" {
cfg.ListenAddr = v cfg.ListenAddr = v
} }
@ -83,59 +102,19 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
cfg.BootstrapPeers = bp cfg.BootstrapPeers = bp
} }
} }
// Validate configuration
if errs := cfg.ValidateConfig(); len(errs) > 0 {
fmt.Fprintf(os.Stderr, "\nGateway configuration errors (%d):\n", len(errs))
for _, err := range errs {
fmt.Fprintf(os.Stderr, " - %s\n", err)
} }
} fmt.Fprintf(os.Stderr, "\nPlease fix the configuration and try again.\n")
os.Exit(1)
} }
// 2) Env overrides logger.ComponentInfo(logging.ComponentGeneral, "Loaded gateway configuration from YAML",
if v := strings.TrimSpace(os.Getenv("GATEWAY_ADDR")); v != "" { zap.String("path", configPath),
cfg.ListenAddr = v
}
if v := strings.TrimSpace(os.Getenv("GATEWAY_NAMESPACE")); v != "" {
cfg.ClientNamespace = v
}
if v := strings.TrimSpace(os.Getenv("GATEWAY_RQLITE_DSN")); v != "" {
cfg.RQLiteDSN = v
}
if v := strings.TrimSpace(os.Getenv("GATEWAY_BOOTSTRAP_PEERS")); v != "" {
parts := strings.Split(v, ",")
var bp []string
for _, part := range parts {
s := strings.TrimSpace(part)
if s != "" {
bp = append(bp, s)
}
}
cfg.BootstrapPeers = bp
}
// 3) Flags (override env)
addr := flag.String("addr", "", "HTTP listen address (e.g., :6001)")
ns := flag.String("namespace", "", "Client namespace for scoping resources")
peers := flag.String("bootstrap-peers", "", "Comma-separated bootstrap peers for network client")
// Do not call flag.Parse() elsewhere to avoid double-parsing
flag.Parse()
if a := strings.TrimSpace(*addr); a != "" {
cfg.ListenAddr = a
}
if n := strings.TrimSpace(*ns); n != "" {
cfg.ClientNamespace = n
}
if p := strings.TrimSpace(*peers); p != "" {
parts := strings.Split(p, ",")
var bp []string
for _, part := range parts {
s := strings.TrimSpace(part)
if s != "" {
bp = append(bp, s)
}
}
cfg.BootstrapPeers = bp
}
logger.ComponentInfo(logging.ComponentGeneral, "Loaded gateway configuration",
zap.String("addr", cfg.ListenAddr), zap.String("addr", cfg.ListenAddr),
zap.String("namespace", cfg.ClientNamespace), zap.String("namespace", cfg.ClientNamespace),
zap.Int("bootstrap_peer_count", len(cfg.BootstrapPeers)), zap.Int("bootstrap_peer_count", len(cfg.BootstrapPeers)),

View File

@ -4,18 +4,17 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"log"
"os" "os"
"os/signal" "os/signal"
"path/filepath" "path/filepath"
"strconv"
"strings"
"syscall" "syscall"
"github.com/DeBrosOfficial/network/pkg/config" "github.com/DeBrosOfficial/network/pkg/config"
"github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/logging"
"github.com/DeBrosOfficial/network/pkg/node" "github.com/DeBrosOfficial/network/pkg/node"
"go.uber.org/zap" "go.uber.org/zap"
"gopkg.in/yaml.v3"
) )
// setup_logger initializes a logger for the given component. // setup_logger initializes a logger for the given component.
@ -24,75 +23,33 @@ func setup_logger(component logging.Component) (logger *logging.ColoredLogger) {
logger, err = logging.NewColoredLogger(component, true) logger, err = logging.NewColoredLogger(component, true)
if err != nil { if err != nil {
log.Fatalf("Failed to create logger: %v", err) fmt.Fprintf(os.Stderr, "Failed to create logger: %v\n", err)
os.Exit(1)
} }
return logger return logger
} }
// parse_and_return_network_flags it initializes all the network flags coming from the .yaml files // parse_flags parses command-line flags and returns them.
func parse_and_return_network_flags() (configPath *string, dataDir, nodeID *string, p2pPort, rqlHTTP, rqlRaft *int, rqlJoinAddr *string, advAddr *string, help *bool) { func parse_flags() (configName *string, help *bool) {
logger := setup_logger(logging.ComponentNode) configName = flag.String("config", "node.yaml", "Config filename in ~/.debros (default: node.yaml)")
configPath = flag.String("config", "", "Path to config YAML file (overrides defaults)")
dataDir = flag.String("data", "", "Data directory (auto-detected if not provided)")
nodeID = flag.String("id", "", "Node identifier (for running multiple local nodes)")
p2pPort = flag.Int("p2p-port", 4001, "LibP2P listen port")
rqlHTTP = flag.Int("rqlite-http-port", 5001, "RQLite HTTP API port")
rqlRaft = flag.Int("rqlite-raft-port", 7001, "RQLite Raft port")
rqlJoinAddr = flag.String("rqlite-join-address", "", "RQLite address to join (e.g., /ip4/)")
advAddr = flag.String("adv-addr", "127.0.0.1", "Default Advertise address for rqlite and rafts")
help = flag.Bool("help", false, "Show help") help = flag.Bool("help", false, "Show help")
flag.Parse() flag.Parse()
logger.Info("Successfully parsed all flags and arguments.")
if *configPath != "" {
cfg, err := LoadConfigFromYAML(*configPath)
if err != nil {
logger.Error("Failed to load config from YAML", zap.Error(err))
os.Exit(1)
}
logger.ComponentInfo(logging.ComponentNode, "Configuration loaded from YAML file", zap.String("path", *configPath))
// Instead of returning flag values, return config values
// For ListenAddresses, extract port from multiaddr string if possible, else use default
var p2pPortVal int
if len(cfg.Node.ListenAddresses) > 0 {
// Try to parse port from multiaddr string
var port int
_, err := fmt.Sscanf(cfg.Node.ListenAddresses[0], "/ip4/0.0.0.0/tcp/%d", &port)
if err == nil {
p2pPortVal = port
} else {
p2pPortVal = 4001
}
} else {
p2pPortVal = 4001
}
return configPath,
&cfg.Node.DataDir,
&cfg.Node.ID,
&p2pPortVal,
&cfg.Database.RQLitePort,
&cfg.Database.RQLiteRaftPort,
&cfg.Database.RQLiteJoinAddress,
&cfg.Discovery.HttpAdvAddress,
help
}
return return
} }
// LoadConfigFromYAML loads a config from a YAML file // LoadConfigFromYAML loads a config from a YAML file using strict decoding.
func LoadConfigFromYAML(path string) (*config.Config, error) { func LoadConfigFromYAML(path string) (*config.Config, error) {
data, err := ioutil.ReadFile(path) file, err := os.Open(path)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to read config file: %w", err) return nil, fmt.Errorf("failed to open config file: %w", err)
} }
defer file.Close()
var cfg config.Config var cfg config.Config
if err := yaml.Unmarshal(data, &cfg); err != nil { if err := config.DecodeStrict(file, &cfg); err != nil {
return nil, fmt.Errorf("failed to unmarshal YAML: %w", err) return nil, err
} }
return &cfg, nil return &cfg, nil
} }
@ -101,21 +58,44 @@ func LoadConfigFromYAML(path string) (*config.Config, error) {
func check_if_should_open_help(help *bool) { func check_if_should_open_help(help *bool) {
if *help { if *help {
flag.Usage() flag.Usage()
return os.Exit(0)
} }
} }
// select_data_dir selects the data directory for the node // select_data_dir validates that we can load the config from ~/.debros
// If none of (hasConfigFile, nodeID, dataDir) are present, throw an error and do not start func select_data_dir_check(configName *string) {
func select_data_dir(dataDir *string, nodeID *string, hasConfigFile bool) {
logger := setup_logger(logging.ComponentNode) logger := setup_logger(logging.ComponentNode)
if !hasConfigFile && (*nodeID == "" || nodeID == nil) && (*dataDir == "" || dataDir == nil) { // Ensure config directory exists and is writable
logger.Error("No config file, node ID, or data directory specified. Please provide at least one. Refusing to start.") _, err := config.EnsureConfigDir()
if err != nil {
logger.Error("Failed to ensure config directory", zap.Error(err))
fmt.Fprintf(os.Stderr, "\n❌ Configuration Error:\n")
fmt.Fprintf(os.Stderr, "Failed to create/access config directory: %v\n", err)
fmt.Fprintf(os.Stderr, "\nPlease ensure:\n")
fmt.Fprintf(os.Stderr, " 1. Home directory is accessible: %s\n", os.ExpandEnv("~"))
fmt.Fprintf(os.Stderr, " 2. You have write permissions to home directory\n")
fmt.Fprintf(os.Stderr, " 3. Disk space is available\n")
os.Exit(1) os.Exit(1)
} }
logger.Info("Successfully selected Data Directory of: %s", zap.String("dataDir", *dataDir)) configPath, err := config.DefaultPath(*configName)
if err != nil {
logger.Error("Failed to determine config path", zap.Error(err))
os.Exit(1)
}
if _, err := os.Stat(configPath); err != nil {
logger.Error("Config file not found",
zap.String("path", configPath),
zap.Error(err))
fmt.Fprintf(os.Stderr, "\n❌ Configuration Error:\n")
fmt.Fprintf(os.Stderr, "Config file not found at %s\n", configPath)
fmt.Fprintf(os.Stderr, "\nGenerate it with one of:\n")
fmt.Fprintf(os.Stderr, " network-cli config init --type bootstrap\n")
fmt.Fprintf(os.Stderr, " network-cli config init --type node --bootstrap-peers '<peer_multiaddr>'\n")
os.Exit(1)
}
} }
// startNode starts the node with the given configuration and port // startNode starts the node with the given configuration and port
@ -125,15 +105,29 @@ func startNode(ctx context.Context, cfg *config.Config, port int) error {
n, err := node.NewNode(cfg) n, err := node.NewNode(cfg)
if err != nil { if err != nil {
logger.Error("failed to create node: %v", zap.Error(err)) logger.Error("failed to create node: %v", zap.Error(err))
return err
} }
if err := n.Start(ctx); err != nil { if err := n.Start(ctx); err != nil {
logger.Error("failed to start node: %v", zap.Error(err)) logger.Error("failed to start node: %v", zap.Error(err))
return err
}
// Expand data directory path for peer.info file
dataDir := os.ExpandEnv(cfg.Node.DataDir)
if strings.HasPrefix(dataDir, "~") {
home, err := os.UserHomeDir()
if err != nil {
logger.Error("failed to determine home directory: %v", zap.Error(err))
dataDir = cfg.Node.DataDir
} else {
dataDir = filepath.Join(home, dataDir[1:])
}
} }
// Save the peer ID to a file for CLI access (especially useful for bootstrap) // Save the peer ID to a file for CLI access (especially useful for bootstrap)
peerID := n.GetPeerID() peerID := n.GetPeerID()
peerInfoFile := filepath.Join(cfg.Node.DataDir, "peer.info") peerInfoFile := filepath.Join(dataDir, "peer.info")
peerMultiaddr := fmt.Sprintf("/ip4/0.0.0.0/tcp/%d/p2p/%s", port, peerID) peerMultiaddr := fmt.Sprintf("/ip4/0.0.0.0/tcp/%d/p2p/%s", port, peerID)
if err := os.WriteFile(peerInfoFile, []byte(peerMultiaddr), 0644); err != nil { if err := os.WriteFile(peerInfoFile, []byte(peerMultiaddr), 0644); err != nil {
@ -152,8 +146,8 @@ func startNode(ctx context.Context, cfg *config.Config, port int) error {
return n.Stop() return n.Stop()
} }
// load_args_into_config applies command line argument overrides to the config // apply_flag_overrides applies command line argument overrides to the config
func load_args_into_config(cfg *config.Config, p2pPort, rqlHTTP, rqlRaft *int, rqlJoinAddr *string, advAddr *string, dataDir *string) { func apply_flag_overrides(cfg *config.Config, p2pPort, rqlHTTP, rqlRaft *int, rqlJoinAddr *string, advAddr *string, dataDir *string) {
logger := setup_logger(logging.ComponentNode) logger := setup_logger(logging.ComponentNode)
// Apply RQLite HTTP port override // Apply RQLite HTTP port override
@ -183,8 +177,8 @@ func load_args_into_config(cfg *config.Config, p2pPort, rqlHTTP, rqlRaft *int, r
} }
if *advAddr != "" { if *advAddr != "" {
cfg.Discovery.HttpAdvAddress = fmt.Sprintf("%s:%d", *advAddr, *rqlHTTP) cfg.Discovery.HttpAdvAddress = fmt.Sprintf("%s:%d", *advAddr, cfg.Database.RQLitePort)
cfg.Discovery.RaftAdvAddress = fmt.Sprintf("%s:%d", *advAddr, *rqlRaft) cfg.Discovery.RaftAdvAddress = fmt.Sprintf("%s:%d", *advAddr, cfg.Database.RQLiteRaftPort)
} }
if *dataDir != "" { if *dataDir != "" {
@ -192,34 +186,116 @@ func load_args_into_config(cfg *config.Config, p2pPort, rqlHTTP, rqlRaft *int, r
} }
} }
// printValidationErrors prints aggregated validation errors and exits.
func printValidationErrors(errs []error) {
fmt.Fprintf(os.Stderr, "\nConfiguration errors (%d):\n", len(errs))
for _, err := range errs {
fmt.Fprintf(os.Stderr, " - %s\n", err)
}
fmt.Fprintf(os.Stderr, "\nPlease fix the configuration and try again.\n")
os.Exit(1)
}
// ensureDataDirectories ensures that all necessary data directories exist and have correct permissions.
func ensureDataDirectories(cfg *config.Config, logger *logging.ColoredLogger) error {
// Expand ~ in data_dir path
dataDir := os.ExpandEnv(cfg.Node.DataDir)
if strings.HasPrefix(dataDir, "~") {
home, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("failed to determine home directory: %w", err)
}
dataDir = filepath.Join(home, dataDir[1:])
}
// Ensure Node.DataDir exists and is writable
if err := os.MkdirAll(dataDir, 0755); err != nil {
return fmt.Errorf("failed to create data directory %s: %w", dataDir, err)
}
logger.ComponentInfo(logging.ComponentNode, "Data directory created/verified", zap.String("path", dataDir))
// Ensure RQLite data directory exists
rqliteDir := filepath.Join(dataDir, "rqlite")
if err := os.MkdirAll(rqliteDir, 0755); err != nil {
return fmt.Errorf("failed to create rqlite data directory: %w", err)
}
logger.ComponentInfo(logging.ComponentNode, "RQLite data directory created/verified", zap.String("path", rqliteDir))
return nil
}
func main() { func main() {
logger := setup_logger(logging.ComponentNode) logger := setup_logger(logging.ComponentNode)
configPath, dataDir, nodeID, p2pPort, rqlHTTP, rqlRaft, rqlJoinAddr, advAddr, help := parse_and_return_network_flags() // Parse command-line flags
configName, help := parse_flags()
check_if_should_open_help(help) check_if_should_open_help(help)
select_data_dir(dataDir, nodeID, *configPath != "")
// Load Node Configuration // Check if config file exists
select_data_dir_check(configName)
// Load configuration from ~/.debros/node.yaml
configPath, err := config.DefaultPath(*configName)
if err != nil {
logger.Error("Failed to determine config path", zap.Error(err))
fmt.Fprintf(os.Stderr, "Configuration error: %v\n", err)
os.Exit(1)
}
var cfg *config.Config var cfg *config.Config
cfg = config.DefaultConfig() var cfgErr error
logger.ComponentInfo(logging.ComponentNode, "Default configuration loaded successfully") cfg, cfgErr = LoadConfigFromYAML(configPath)
if cfgErr != nil {
logger.Error("Failed to load config from YAML", zap.Error(cfgErr))
fmt.Fprintf(os.Stderr, "Configuration load error: %v\n", cfgErr)
os.Exit(1)
}
logger.ComponentInfo(logging.ComponentNode, "Configuration loaded from YAML file", zap.String("path", configPath))
// Apply command line argument overrides // Set default advertised addresses if empty
load_args_into_config(cfg, p2pPort, rqlHTTP, rqlRaft, rqlJoinAddr, advAddr, dataDir) if cfg.Discovery.HttpAdvAddress == "" {
logger.ComponentInfo(logging.ComponentNode, "Command line arguments applied to configuration") cfg.Discovery.HttpAdvAddress = fmt.Sprintf("127.0.0.1:%d", cfg.Database.RQLitePort)
}
if cfg.Discovery.RaftAdvAddress == "" {
cfg.Discovery.RaftAdvAddress = fmt.Sprintf("127.0.0.1:%d", cfg.Database.RQLiteRaftPort)
}
// LibP2P uses configurable port (default 4001); RQLite uses 5001 (HTTP) and 7001 (Raft) // Validate configuration
port := *p2pPort if errs := cfg.Validate(); len(errs) > 0 {
printValidationErrors(errs)
}
// Expand and create data directories
if err := ensureDataDirectories(cfg, logger); err != nil {
logger.Error("Failed to create data directories", zap.Error(err))
fmt.Fprintf(os.Stderr, "\n❌ Data Directory Error:\n")
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
logger.ComponentInfo(logging.ComponentNode, "Node configuration summary", logger.ComponentInfo(logging.ComponentNode, "Node configuration summary",
zap.Strings("listen_addresses", cfg.Node.ListenAddresses), zap.Strings("listen_addresses", cfg.Node.ListenAddresses),
zap.Int("rqlite_http_port", cfg.Database.RQLitePort), zap.Int("rqlite_http_port", cfg.Database.RQLitePort),
zap.Int("rqlite_raft_port", cfg.Database.RQLiteRaftPort), zap.Int("rqlite_raft_port", cfg.Database.RQLiteRaftPort),
zap.Int("p2p_port", port),
zap.Strings("bootstrap_peers", cfg.Discovery.BootstrapPeers), zap.Strings("bootstrap_peers", cfg.Discovery.BootstrapPeers),
zap.String("rqlite_join_address", cfg.Database.RQLiteJoinAddress), zap.String("rqlite_join_address", cfg.Database.RQLiteJoinAddress),
zap.String("data_directory", *dataDir)) zap.String("data_directory", cfg.Node.DataDir))
// Extract P2P port from listen addresses
p2pPort := 4001 // default
if len(cfg.Node.ListenAddresses) > 0 {
// Parse port from multiaddr like "/ip4/0.0.0.0/tcp/4001"
parts := strings.Split(cfg.Node.ListenAddresses[0], "/")
for i, part := range parts {
if part == "tcp" && i+1 < len(parts) {
if port, err := strconv.Atoi(parts[i+1]); err == nil {
p2pPort = port
break
}
}
}
}
// Create context for graceful shutdown // Create context for graceful shutdown
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -229,7 +305,7 @@ func main() {
errChan := make(chan error, 1) errChan := make(chan error, 1)
doneChan := make(chan struct{}) doneChan := make(chan struct{})
go func() { go func() {
if err := startNode(ctx, cfg, port); err != nil { if err := startNode(ctx, cfg, p2pPort); err != nil {
errChan <- err errChan <- err
} }
close(doneChan) close(doneChan)

View File

@ -108,12 +108,7 @@ func DefaultConfig() *Config {
RQLiteJoinAddress: "", // Empty for bootstrap node RQLiteJoinAddress: "", // Empty for bootstrap node
}, },
Discovery: DiscoveryConfig{ Discovery: DiscoveryConfig{
BootstrapPeers: []string{ BootstrapPeers: []string{},
"/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1",
"/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR",
"/ip4/155.133.27.199/tcp/4001/p2p/12D3KooWC69SBzM5QUgrLrfLWUykE8au32X5LwT7zwv9bixrQPm1",
"/ip4/217.76.56.2/tcp/4001/p2p/12D3KooWEiqJHvznxqJ5p2y8mUs6Ky6dfU1xTYFQbyKRCABfcZz4",
},
BootstrapPort: 4001, // Default LibP2P port BootstrapPort: 4001, // Default LibP2P port
DiscoveryInterval: time.Second * 15, // Back to 15 seconds for testing DiscoveryInterval: time.Second * 15, // Back to 15 seconds for testing
HttpAdvAddress: "", HttpAdvAddress: "",

38
pkg/config/paths.go Normal file
View File

@ -0,0 +1,38 @@
package config
import (
"fmt"
"os"
"path/filepath"
)
// ConfigDir returns the path to the DeBros config directory (~/.debros).
func ConfigDir() (string, error) {
home, err := os.UserHomeDir()
if err != nil {
return "", fmt.Errorf("failed to determine home directory: %w", err)
}
return filepath.Join(home, ".debros"), nil
}
// EnsureConfigDir creates the config directory if it does not exist.
func EnsureConfigDir() (string, error) {
dir, err := ConfigDir()
if err != nil {
return "", err
}
if err := os.MkdirAll(dir, 0700); err != nil {
return "", fmt.Errorf("failed to create config directory %s: %w", dir, err)
}
return dir, nil
}
// DefaultPath returns the path to the config file for the given component name.
// component should be e.g., "node.yaml", "bootstrap.yaml", "gateway.yaml"
func DefaultPath(component string) (string, error) {
dir, err := ConfigDir()
if err != nil {
return "", err
}
return filepath.Join(dir, component), nil
}

582
pkg/config/validate.go Normal file
View File

@ -0,0 +1,582 @@
package config
import (
"fmt"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
// ValidationError represents a single validation error with context.
type ValidationError struct {
Path string // e.g., "discovery.bootstrap_peers[0]"
Message string // e.g., "invalid multiaddr"
Hint string // e.g., "expected /ip{4,6}/.../tcp/<port>/p2p/<peerID>"
}
func (e ValidationError) Error() string {
if e.Hint != "" {
return fmt.Sprintf("%s: %s; %s", e.Path, e.Message, e.Hint)
}
return fmt.Sprintf("%s: %s", e.Path, e.Message)
}
// Validate performs comprehensive validation of the entire config.
// It aggregates all errors and returns them, allowing the caller to print all issues at once.
func (c *Config) Validate() []error {
var errs []error
// Validate node config
errs = append(errs, c.validateNode()...)
// Validate database config
errs = append(errs, c.validateDatabase()...)
// Validate discovery config
errs = append(errs, c.validateDiscovery()...)
// Validate security config
errs = append(errs, c.validateSecurity()...)
// Validate logging config
errs = append(errs, c.validateLogging()...)
// Cross-field validations
errs = append(errs, c.validateCrossFields()...)
return errs
}
func (c *Config) validateNode() []error {
var errs []error
nc := c.Node
// Validate type
if nc.Type != "bootstrap" && nc.Type != "node" {
errs = append(errs, ValidationError{
Path: "node.type",
Message: fmt.Sprintf("must be one of [bootstrap node]; got %q", nc.Type),
})
}
// Validate listen_addresses
if len(nc.ListenAddresses) == 0 {
errs = append(errs, ValidationError{
Path: "node.listen_addresses",
Message: "must not be empty",
})
}
seen := make(map[string]bool)
for i, addr := range nc.ListenAddresses {
path := fmt.Sprintf("node.listen_addresses[%d]", i)
// Parse as multiaddr
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
errs = append(errs, ValidationError{
Path: path,
Message: fmt.Sprintf("invalid multiaddr: %v", err),
Hint: "expected /ip{4,6}/.../ tcp/<port>",
})
continue
}
// Check for TCP and valid port
tcpAddr, err := manet.ToNetAddr(ma)
if err != nil {
errs = append(errs, ValidationError{
Path: path,
Message: fmt.Sprintf("cannot convert multiaddr to network address: %v", err),
Hint: "ensure multiaddr contains /tcp/<port>",
})
continue
}
tcpPort := tcpAddr.(*net.TCPAddr).Port
if tcpPort < 1 || tcpPort > 65535 {
errs = append(errs, ValidationError{
Path: path,
Message: fmt.Sprintf("invalid TCP port %d", tcpPort),
Hint: "port must be between 1 and 65535",
})
}
if seen[addr] {
errs = append(errs, ValidationError{
Path: path,
Message: "duplicate listen address",
})
}
seen[addr] = true
}
// Validate data_dir
if nc.DataDir == "" {
errs = append(errs, ValidationError{
Path: "node.data_dir",
Message: "must not be empty",
})
} else {
if err := validateDataDir(nc.DataDir); err != nil {
errs = append(errs, ValidationError{
Path: "node.data_dir",
Message: err.Error(),
})
}
}
// Validate max_connections
if nc.MaxConnections <= 0 {
errs = append(errs, ValidationError{
Path: "node.max_connections",
Message: fmt.Sprintf("must be > 0; got %d", nc.MaxConnections),
})
}
return errs
}
func (c *Config) validateDatabase() []error {
var errs []error
dc := c.Database
// Validate data_dir
if dc.DataDir == "" {
errs = append(errs, ValidationError{
Path: "database.data_dir",
Message: "must not be empty",
})
} else {
if err := validateDataDir(dc.DataDir); err != nil {
errs = append(errs, ValidationError{
Path: "database.data_dir",
Message: err.Error(),
})
}
}
// Validate replication_factor
if dc.ReplicationFactor < 1 {
errs = append(errs, ValidationError{
Path: "database.replication_factor",
Message: fmt.Sprintf("must be >= 1; got %d", dc.ReplicationFactor),
})
} else if dc.ReplicationFactor%2 == 0 {
// Warn about even replication factor (Raft best practice: odd)
// For now we log a note but don't error
_ = fmt.Sprintf("note: database.replication_factor %d is even; Raft recommends odd numbers for quorum", dc.ReplicationFactor)
}
// Validate shard_count
if dc.ShardCount < 1 {
errs = append(errs, ValidationError{
Path: "database.shard_count",
Message: fmt.Sprintf("must be >= 1; got %d", dc.ShardCount),
})
}
// Validate max_database_size
if dc.MaxDatabaseSize < 0 {
errs = append(errs, ValidationError{
Path: "database.max_database_size",
Message: fmt.Sprintf("must be >= 0; got %d", dc.MaxDatabaseSize),
})
}
// Validate rqlite_port
if dc.RQLitePort < 1 || dc.RQLitePort > 65535 {
errs = append(errs, ValidationError{
Path: "database.rqlite_port",
Message: fmt.Sprintf("must be between 1 and 65535; got %d", dc.RQLitePort),
})
}
// Validate rqlite_raft_port
if dc.RQLiteRaftPort < 1 || dc.RQLiteRaftPort > 65535 {
errs = append(errs, ValidationError{
Path: "database.rqlite_raft_port",
Message: fmt.Sprintf("must be between 1 and 65535; got %d", dc.RQLiteRaftPort),
})
}
// Ports must differ
if dc.RQLitePort == dc.RQLiteRaftPort {
errs = append(errs, ValidationError{
Path: "database.rqlite_raft_port",
Message: fmt.Sprintf("must differ from database.rqlite_port (%d)", dc.RQLitePort),
})
}
// Validate rqlite_join_address context-dependently
if c.Node.Type == "node" {
if dc.RQLiteJoinAddress == "" {
errs = append(errs, ValidationError{
Path: "database.rqlite_join_address",
Message: "required for node type (non-bootstrap)",
})
} else {
if err := validateHostPort(dc.RQLiteJoinAddress); err != nil {
errs = append(errs, ValidationError{
Path: "database.rqlite_join_address",
Message: err.Error(),
Hint: "expected format: host:port",
})
}
}
} else if c.Node.Type == "bootstrap" {
if dc.RQLiteJoinAddress != "" {
errs = append(errs, ValidationError{
Path: "database.rqlite_join_address",
Message: "must be empty for bootstrap type",
})
}
}
return errs
}
func (c *Config) validateDiscovery() []error {
var errs []error
disc := c.Discovery
// Validate discovery_interval
if disc.DiscoveryInterval <= 0 {
errs = append(errs, ValidationError{
Path: "discovery.discovery_interval",
Message: fmt.Sprintf("must be > 0; got %v", disc.DiscoveryInterval),
})
}
// Validate bootstrap_port
if disc.BootstrapPort < 1 || disc.BootstrapPort > 65535 {
errs = append(errs, ValidationError{
Path: "discovery.bootstrap_port",
Message: fmt.Sprintf("must be between 1 and 65535; got %d", disc.BootstrapPort),
})
}
// Validate bootstrap_peers context-dependently
if c.Node.Type == "node" {
if len(disc.BootstrapPeers) == 0 {
errs = append(errs, ValidationError{
Path: "discovery.bootstrap_peers",
Message: "required for node type (must not be empty)",
})
}
}
// Validate each bootstrap peer multiaddr
seenPeers := make(map[string]bool)
for i, peer := range disc.BootstrapPeers {
path := fmt.Sprintf("discovery.bootstrap_peers[%d]", i)
_, err := multiaddr.NewMultiaddr(peer)
if err != nil {
errs = append(errs, ValidationError{
Path: path,
Message: fmt.Sprintf("invalid multiaddr: %v", err),
Hint: "expected /ip{4,6}/.../tcp/<port>/p2p/<peerID>",
})
continue
}
// Check for /p2p/ component
if !strings.Contains(peer, "/p2p/") {
errs = append(errs, ValidationError{
Path: path,
Message: "missing /p2p/<peerID> component",
Hint: "expected /ip{4,6}/.../tcp/<port>/p2p/<peerID>",
})
}
// Extract TCP port by parsing the multiaddr string directly
// Look for /tcp/ in the peer string
tcpPortStr := extractTCPPort(peer)
if tcpPortStr == "" {
errs = append(errs, ValidationError{
Path: path,
Message: "missing /tcp/<port> component",
Hint: "expected /ip{4,6}/.../tcp/<port>/p2p/<peerID>",
})
continue
}
tcpPort, err := strconv.Atoi(tcpPortStr)
if err != nil || tcpPort < 1 || tcpPort > 65535 {
errs = append(errs, ValidationError{
Path: path,
Message: fmt.Sprintf("invalid TCP port %s", tcpPortStr),
Hint: "port must be between 1 and 65535",
})
}
if seenPeers[peer] {
errs = append(errs, ValidationError{
Path: path,
Message: "duplicate bootstrap peer",
})
}
seenPeers[peer] = true
}
// Validate http_adv_address
if disc.HttpAdvAddress != "" {
if err := validateHostOrHostPort(disc.HttpAdvAddress); err != nil {
errs = append(errs, ValidationError{
Path: "discovery.http_adv_address",
Message: err.Error(),
Hint: "expected format: host or host:port",
})
}
}
// Validate raft_adv_address
if disc.RaftAdvAddress != "" {
if err := validateHostOrHostPort(disc.RaftAdvAddress); err != nil {
errs = append(errs, ValidationError{
Path: "discovery.raft_adv_address",
Message: err.Error(),
Hint: "expected format: host or host:port",
})
}
}
return errs
}
func (c *Config) validateSecurity() []error {
var errs []error
sec := c.Security
// Validate logging level
if sec.EnableTLS {
if sec.PrivateKeyFile == "" {
errs = append(errs, ValidationError{
Path: "security.private_key_file",
Message: "required when enable_tls is true",
})
} else {
if err := validateFileReadable(sec.PrivateKeyFile); err != nil {
errs = append(errs, ValidationError{
Path: "security.private_key_file",
Message: err.Error(),
})
}
}
if sec.CertificateFile == "" {
errs = append(errs, ValidationError{
Path: "security.certificate_file",
Message: "required when enable_tls is true",
})
} else {
if err := validateFileReadable(sec.CertificateFile); err != nil {
errs = append(errs, ValidationError{
Path: "security.certificate_file",
Message: err.Error(),
})
}
}
}
return errs
}
func (c *Config) validateLogging() []error {
var errs []error
log := c.Logging
// Validate level
validLevels := map[string]bool{"debug": true, "info": true, "warn": true, "error": true}
if !validLevels[log.Level] {
errs = append(errs, ValidationError{
Path: "logging.level",
Message: fmt.Sprintf("invalid value %q", log.Level),
Hint: "allowed values: debug, info, warn, error",
})
}
// Validate format
validFormats := map[string]bool{"json": true, "console": true}
if !validFormats[log.Format] {
errs = append(errs, ValidationError{
Path: "logging.format",
Message: fmt.Sprintf("invalid value %q", log.Format),
Hint: "allowed values: json, console",
})
}
// Validate output_file
if log.OutputFile != "" {
dir := filepath.Dir(log.OutputFile)
if dir != "" && dir != "." {
if err := validateDirWritable(dir); err != nil {
errs = append(errs, ValidationError{
Path: "logging.output_file",
Message: fmt.Sprintf("parent directory not writable: %v", err),
})
}
}
}
return errs
}
func (c *Config) validateCrossFields() []error {
var errs []error
// If node.type is invalid, don't run cross-checks
if c.Node.Type != "bootstrap" && c.Node.Type != "node" {
return errs
}
// Cross-check rqlite_join_address vs node type
if c.Node.Type == "bootstrap" && c.Database.RQLiteJoinAddress != "" {
errs = append(errs, ValidationError{
Path: "database.rqlite_join_address",
Message: "must be empty for bootstrap node type",
})
}
if c.Node.Type == "node" && c.Database.RQLiteJoinAddress == "" {
errs = append(errs, ValidationError{
Path: "database.rqlite_join_address",
Message: "required for non-bootstrap node type",
})
}
return errs
}
// Helper validation functions
func validateDataDir(path string) error {
if path == "" {
return fmt.Errorf("must not be empty")
}
// Expand ~ to home directory
expandedPath := os.ExpandEnv(path)
if strings.HasPrefix(expandedPath, "~") {
home, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("cannot determine home directory: %v", err)
}
expandedPath = filepath.Join(home, expandedPath[1:])
}
if info, err := os.Stat(expandedPath); err == nil {
// Directory exists; check if it's a directory and writable
if !info.IsDir() {
return fmt.Errorf("path exists but is not a directory")
}
// Try to write a test file to check permissions
testFile := filepath.Join(expandedPath, ".write_test")
if err := os.WriteFile(testFile, []byte(""), 0644); err != nil {
return fmt.Errorf("directory not writable: %v", err)
}
os.Remove(testFile)
} else if os.IsNotExist(err) {
// Directory doesn't exist; check if parent is writable
parent := filepath.Dir(expandedPath)
if parent == "" || parent == "." {
parent = "."
}
// Allow parent not existing - it will be created at runtime
if info, err := os.Stat(parent); err != nil {
if !os.IsNotExist(err) {
return fmt.Errorf("parent directory not accessible: %v", err)
}
// Parent doesn't exist either - that's ok, will be created
} else if !info.IsDir() {
return fmt.Errorf("parent path is not a directory")
} else {
// Parent exists, check if writable
if err := validateDirWritable(parent); err != nil {
return fmt.Errorf("parent directory not writable: %v", err)
}
}
} else {
return fmt.Errorf("cannot access path: %v", err)
}
return nil
}
func validateDirWritable(path string) error {
info, err := os.Stat(path)
if err != nil {
return fmt.Errorf("cannot access directory: %v", err)
}
if !info.IsDir() {
return fmt.Errorf("path is not a directory")
}
// Try to write a test file
testFile := filepath.Join(path, ".write_test")
if err := os.WriteFile(testFile, []byte(""), 0644); err != nil {
return fmt.Errorf("directory not writable: %v", err)
}
os.Remove(testFile)
return nil
}
func validateFileReadable(path string) error {
_, err := os.Stat(path)
if err != nil {
return fmt.Errorf("cannot read file: %v", err)
}
return nil
}
func validateHostPort(hostPort string) error {
parts := strings.Split(hostPort, ":")
if len(parts) != 2 {
return fmt.Errorf("expected format host:port")
}
host := parts[0]
port := parts[1]
if host == "" {
return fmt.Errorf("host must not be empty")
}
portNum, err := strconv.Atoi(port)
if err != nil || portNum < 1 || portNum > 65535 {
return fmt.Errorf("port must be a number between 1 and 65535; got %q", port)
}
return nil
}
func validateHostOrHostPort(addr string) error {
// Try to parse as host:port first
if strings.Contains(addr, ":") {
return validateHostPort(addr)
}
// Otherwise just check if it's a valid hostname/IP
if addr == "" {
return fmt.Errorf("address must not be empty")
}
return nil
}
func extractTCPPort(multiaddrStr string) string {
// Look for the /tcp/ protocol code
parts := strings.Split(multiaddrStr, "/")
for i := 0; i < len(parts); i++ {
if parts[i] == "tcp" {
// The port is the next part
if i+1 < len(parts) {
return parts[i+1]
}
break
}
}
return ""
}

409
pkg/config/validate_test.go Normal file
View File

@ -0,0 +1,409 @@
package config
import (
"testing"
"time"
)
func TestValidateNodeType(t *testing.T) {
tests := []struct {
name string
nodeType string
shouldError bool
}{
{"bootstrap", "bootstrap", false},
{"node", "node", false},
{"invalid", "invalid-type", true},
{"empty", "", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Node: NodeConfig{Type: tt.nodeType, ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4001"}, DataDir: ".", MaxConnections: 50},
Database: DatabaseConfig{DataDir: ".", ReplicationFactor: 3, ShardCount: 16, MaxDatabaseSize: 1024, BackupInterval: 1 * time.Hour, RQLitePort: 5001, RQLiteRaftPort: 7001},
Discovery: DiscoveryConfig{BootstrapPeers: []string{"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, DiscoveryInterval: 15 * time.Second, BootstrapPort: 4001, NodeNamespace: "default"},
Logging: LoggingConfig{Level: "info", Format: "console"},
}
errs := cfg.Validate()
if tt.shouldError && len(errs) == 0 {
t.Errorf("expected error, got none")
}
if !tt.shouldError && len(errs) > 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
}
func TestValidateListenAddresses(t *testing.T) {
tests := []struct {
name string
addresses []string
shouldError bool
}{
{"valid single", []string{"/ip4/0.0.0.0/tcp/4001"}, false},
{"valid ipv6", []string{"/ip6/::/tcp/4001"}, false},
{"invalid port", []string{"/ip4/0.0.0.0/tcp/99999"}, true},
{"invalid port zero", []string{"/ip4/0.0.0.0/tcp/0"}, true},
{"invalid multiaddr", []string{"invalid"}, true},
{"empty", []string{}, true},
{"duplicate", []string{"/ip4/0.0.0.0/tcp/4001", "/ip4/0.0.0.0/tcp/4001"}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Node: NodeConfig{Type: "node", ListenAddresses: tt.addresses, DataDir: ".", MaxConnections: 50},
Database: DatabaseConfig{DataDir: ".", ReplicationFactor: 3, ShardCount: 16, MaxDatabaseSize: 1024, BackupInterval: 1 * time.Hour, RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: "localhost:7001"},
Discovery: DiscoveryConfig{BootstrapPeers: []string{"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, DiscoveryInterval: 15 * time.Second, BootstrapPort: 4001, NodeNamespace: "default"},
Logging: LoggingConfig{Level: "info", Format: "console"},
}
errs := cfg.Validate()
if tt.shouldError && len(errs) == 0 {
t.Errorf("expected error, got none")
}
if !tt.shouldError && len(errs) > 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
}
func TestValidateReplicationFactor(t *testing.T) {
tests := []struct {
name string
replication int
shouldError bool
}{
{"valid 1", 1, false},
{"valid 3", 3, false},
{"valid even", 2, false}, // warn but not error
{"invalid zero", 0, true},
{"invalid negative", -1, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Node: NodeConfig{Type: "node", ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4001"}, DataDir: ".", MaxConnections: 50},
Database: DatabaseConfig{DataDir: ".", ReplicationFactor: tt.replication, ShardCount: 16, MaxDatabaseSize: 1024, BackupInterval: 1 * time.Hour, RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: "localhost:7001"},
Discovery: DiscoveryConfig{BootstrapPeers: []string{"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, DiscoveryInterval: 15 * time.Second, BootstrapPort: 4001, NodeNamespace: "default"},
Logging: LoggingConfig{Level: "info", Format: "console"},
}
errs := cfg.Validate()
if tt.shouldError && len(errs) == 0 {
t.Errorf("expected error, got none")
}
if !tt.shouldError && len(errs) > 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
}
func TestValidateRQLitePorts(t *testing.T) {
tests := []struct {
name string
httpPort int
raftPort int
shouldError bool
}{
{"valid different", 5001, 7001, false},
{"invalid same", 5001, 5001, true},
{"invalid http port zero", 0, 7001, true},
{"invalid raft port zero", 5001, 0, true},
{"invalid http port too high", 99999, 7001, true},
{"invalid raft port too high", 5001, 99999, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Node: NodeConfig{Type: "node", ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4001"}, DataDir: ".", MaxConnections: 50},
Database: DatabaseConfig{DataDir: ".", ReplicationFactor: 3, ShardCount: 16, MaxDatabaseSize: 1024, BackupInterval: 1 * time.Hour, RQLitePort: tt.httpPort, RQLiteRaftPort: tt.raftPort, RQLiteJoinAddress: "localhost:7001"},
Discovery: DiscoveryConfig{BootstrapPeers: []string{"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, DiscoveryInterval: 15 * time.Second, BootstrapPort: 4001, NodeNamespace: "default"},
Logging: LoggingConfig{Level: "info", Format: "console"},
}
errs := cfg.Validate()
if tt.shouldError && len(errs) == 0 {
t.Errorf("expected error, got none")
}
if !tt.shouldError && len(errs) > 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
}
func TestValidateRQLiteJoinAddress(t *testing.T) {
tests := []struct {
name string
nodeType string
joinAddr string
shouldError bool
}{
{"node with join", "node", "localhost:7001", false},
{"node without join", "node", "", true},
{"bootstrap with join", "bootstrap", "localhost:7001", true},
{"bootstrap without join", "bootstrap", "", false},
{"invalid join format", "node", "localhost", true},
{"invalid join port", "node", "localhost:99999", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Node: NodeConfig{Type: tt.nodeType, ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4001"}, DataDir: ".", MaxConnections: 50},
Database: DatabaseConfig{DataDir: ".", ReplicationFactor: 3, ShardCount: 16, MaxDatabaseSize: 1024, BackupInterval: 1 * time.Hour, RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: tt.joinAddr},
Discovery: DiscoveryConfig{BootstrapPeers: []string{"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, DiscoveryInterval: 15 * time.Second, BootstrapPort: 4001, NodeNamespace: "default"},
Logging: LoggingConfig{Level: "info", Format: "console"},
}
errs := cfg.Validate()
if tt.shouldError && len(errs) == 0 {
t.Errorf("expected error, got none")
}
if !tt.shouldError && len(errs) > 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
}
func TestValidateBootstrapPeers(t *testing.T) {
validPeer := "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"
tests := []struct {
name string
nodeType string
peers []string
shouldError bool
}{
{"node with peer", "node", []string{validPeer}, false},
{"node without peer", "node", []string{}, true},
{"bootstrap with peer", "bootstrap", []string{validPeer}, false},
{"bootstrap without peer", "bootstrap", []string{}, false},
{"invalid multiaddr", "node", []string{"invalid"}, true},
{"missing p2p", "node", []string{"/ip4/127.0.0.1/tcp/4001"}, true},
{"duplicate peer", "node", []string{validPeer, validPeer}, true},
{"invalid port", "node", []string{"/ip4/127.0.0.1/tcp/99999/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Node: NodeConfig{Type: tt.nodeType, ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4001"}, DataDir: ".", MaxConnections: 50},
Database: DatabaseConfig{DataDir: ".", ReplicationFactor: 3, ShardCount: 16, MaxDatabaseSize: 1024, BackupInterval: 1 * time.Hour, RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: ""},
Discovery: DiscoveryConfig{BootstrapPeers: tt.peers, DiscoveryInterval: 15 * time.Second, BootstrapPort: 4001, NodeNamespace: "default"},
Logging: LoggingConfig{Level: "info", Format: "console"},
}
errs := cfg.Validate()
if tt.shouldError && len(errs) == 0 {
t.Errorf("expected error, got none")
}
if !tt.shouldError && len(errs) > 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
}
func TestValidateLoggingLevel(t *testing.T) {
tests := []struct {
name string
level string
shouldError bool
}{
{"debug", "debug", false},
{"info", "info", false},
{"warn", "warn", false},
{"error", "error", false},
{"invalid", "verbose", true},
{"empty", "", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Node: NodeConfig{Type: "node", ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4001"}, DataDir: ".", MaxConnections: 50},
Database: DatabaseConfig{DataDir: ".", ReplicationFactor: 3, ShardCount: 16, MaxDatabaseSize: 1024, BackupInterval: 1 * time.Hour, RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: "localhost:7001"},
Discovery: DiscoveryConfig{BootstrapPeers: []string{"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, DiscoveryInterval: 15 * time.Second, BootstrapPort: 4001, NodeNamespace: "default"},
Logging: LoggingConfig{Level: tt.level, Format: "console"},
}
errs := cfg.Validate()
if tt.shouldError && len(errs) == 0 {
t.Errorf("expected error, got none")
}
if !tt.shouldError && len(errs) > 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
}
func TestValidateLoggingFormat(t *testing.T) {
tests := []struct {
name string
format string
shouldError bool
}{
{"json", "json", false},
{"console", "console", false},
{"invalid", "text", true},
{"empty", "", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Node: NodeConfig{Type: "node", ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4001"}, DataDir: ".", MaxConnections: 50},
Database: DatabaseConfig{DataDir: ".", ReplicationFactor: 3, ShardCount: 16, MaxDatabaseSize: 1024, BackupInterval: 1 * time.Hour, RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: "localhost:7001"},
Discovery: DiscoveryConfig{BootstrapPeers: []string{"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, DiscoveryInterval: 15 * time.Second, BootstrapPort: 4001, NodeNamespace: "default"},
Logging: LoggingConfig{Level: "info", Format: tt.format},
}
errs := cfg.Validate()
if tt.shouldError && len(errs) == 0 {
t.Errorf("expected error, got none")
}
if !tt.shouldError && len(errs) > 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
}
func TestValidateMaxConnections(t *testing.T) {
tests := []struct {
name string
maxConn int
shouldError bool
}{
{"valid 50", 50, false},
{"valid 1", 1, false},
{"invalid zero", 0, true},
{"invalid negative", -1, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Node: NodeConfig{Type: "node", ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4001"}, DataDir: ".", MaxConnections: tt.maxConn},
Database: DatabaseConfig{DataDir: ".", ReplicationFactor: 3, ShardCount: 16, MaxDatabaseSize: 1024, BackupInterval: 1 * time.Hour, RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: "localhost:7001"},
Discovery: DiscoveryConfig{BootstrapPeers: []string{"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, DiscoveryInterval: 15 * time.Second, BootstrapPort: 4001, NodeNamespace: "default"},
Logging: LoggingConfig{Level: "info", Format: "console"},
}
errs := cfg.Validate()
if tt.shouldError && len(errs) == 0 {
t.Errorf("expected error, got none")
}
if !tt.shouldError && len(errs) > 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
}
func TestValidateDiscoveryInterval(t *testing.T) {
tests := []struct {
name string
interval time.Duration
shouldError bool
}{
{"valid 15s", 15 * time.Second, false},
{"valid 1s", 1 * time.Second, false},
{"invalid zero", 0, true},
{"invalid negative", -5 * time.Second, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Node: NodeConfig{Type: "node", ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4001"}, DataDir: ".", MaxConnections: 50},
Database: DatabaseConfig{DataDir: ".", ReplicationFactor: 3, ShardCount: 16, MaxDatabaseSize: 1024, BackupInterval: 1 * time.Hour, RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: "localhost:7001"},
Discovery: DiscoveryConfig{BootstrapPeers: []string{"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, DiscoveryInterval: tt.interval, BootstrapPort: 4001, NodeNamespace: "default"},
Logging: LoggingConfig{Level: "info", Format: "console"},
}
errs := cfg.Validate()
if tt.shouldError && len(errs) == 0 {
t.Errorf("expected error, got none")
}
if !tt.shouldError && len(errs) > 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
}
func TestValidateBootstrapPort(t *testing.T) {
tests := []struct {
name string
port int
shouldError bool
}{
{"valid 4001", 4001, false},
{"valid 4002", 4002, false},
{"invalid zero", 0, true},
{"invalid too high", 99999, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Node: NodeConfig{Type: "node", ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4001"}, DataDir: ".", MaxConnections: 50},
Database: DatabaseConfig{DataDir: ".", ReplicationFactor: 3, ShardCount: 16, MaxDatabaseSize: 1024, BackupInterval: 1 * time.Hour, RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: "localhost:7001"},
Discovery: DiscoveryConfig{BootstrapPeers: []string{"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj"}, DiscoveryInterval: 15 * time.Second, BootstrapPort: tt.port, NodeNamespace: "default"},
Logging: LoggingConfig{Level: "info", Format: "console"},
}
errs := cfg.Validate()
if tt.shouldError && len(errs) == 0 {
t.Errorf("expected error, got none")
}
if !tt.shouldError && len(errs) > 0 {
t.Errorf("unexpected errors: %v", errs)
}
})
}
}
func TestValidateCompleteConfig(t *testing.T) {
// Test a complete valid config
validCfg := &Config{
Node: NodeConfig{
Type: "node",
ID: "node1",
ListenAddresses: []string{"/ip4/0.0.0.0/tcp/4002"},
DataDir: ".",
MaxConnections: 50,
},
Database: DatabaseConfig{
DataDir: ".",
ReplicationFactor: 3,
ShardCount: 16,
MaxDatabaseSize: 1073741824,
BackupInterval: 24 * time.Hour,
RQLitePort: 5002,
RQLiteRaftPort: 7002,
RQLiteJoinAddress: "127.0.0.1:7001",
},
Discovery: DiscoveryConfig{
BootstrapPeers: []string{
"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWHbcFcrGPXKUrHcxvd8MXEeUzRYyvY8fQcpEBxncSUwhj",
},
DiscoveryInterval: 15 * time.Second,
BootstrapPort: 4001,
HttpAdvAddress: "127.0.0.1",
NodeNamespace: "default",
},
Security: SecurityConfig{
EnableTLS: false,
},
Logging: LoggingConfig{
Level: "info",
Format: "console",
},
}
errs := validCfg.Validate()
if len(errs) > 0 {
t.Errorf("valid config should not have errors: %v", errs)
}
}

19
pkg/config/yaml.go Normal file
View File

@ -0,0 +1,19 @@
package config
import (
"fmt"
"io"
"gopkg.in/yaml.v3"
)
// DecodeStrict decodes YAML from a reader and rejects any unknown fields.
// This ensures the YAML only contains recognized configuration keys.
func DecodeStrict(r io.Reader, out interface{}) error {
decoder := yaml.NewDecoder(r)
decoder.KnownFields(true)
if err := decoder.Decode(out); err != nil {
return fmt.Errorf("invalid config: %w", err)
}
return nil
}

View File

@ -0,0 +1,137 @@
package gateway
import (
"fmt"
"net"
"net/url"
"strconv"
"strings"
"github.com/multiformats/go-multiaddr"
)
// ValidateConfig performs comprehensive validation of gateway configuration.
// It returns aggregated errors, allowing the caller to print all issues at once.
func (c *Config) ValidateConfig() []error {
var errs []error
// Validate listen_addr
if c.ListenAddr == "" {
errs = append(errs, fmt.Errorf("gateway.listen_addr: must not be empty"))
} else {
if err := validateListenAddr(c.ListenAddr); err != nil {
errs = append(errs, fmt.Errorf("gateway.listen_addr: %v", err))
}
}
// Validate client_namespace
if c.ClientNamespace == "" {
errs = append(errs, fmt.Errorf("gateway.client_namespace: must not be empty"))
}
// Validate bootstrap_peers if provided
seenPeers := make(map[string]bool)
for i, peer := range c.BootstrapPeers {
path := fmt.Sprintf("gateway.bootstrap_peers[%d]", i)
_, err := multiaddr.NewMultiaddr(peer)
if err != nil {
errs = append(errs, fmt.Errorf("%s: invalid multiaddr: %v; expected /ip{4,6}/.../tcp/<port>/p2p/<peerID>", path, err))
continue
}
// Check for /p2p/ component
if !strings.Contains(peer, "/p2p/") {
errs = append(errs, fmt.Errorf("%s: missing /p2p/<peerID> component; expected /ip{4,6}/.../tcp/<port>/p2p/<peerID>", path))
}
// Extract TCP port by parsing the multiaddr string directly
tcpPortStr := extractTCPPort(peer)
if tcpPortStr == "" {
errs = append(errs, fmt.Errorf("%s: missing /tcp/<port> component; expected /ip{4,6}/.../tcp/<port>/p2p/<peerID>", path))
continue
}
tcpPort, err := strconv.Atoi(tcpPortStr)
if err != nil || tcpPort < 1 || tcpPort > 65535 {
errs = append(errs, fmt.Errorf("%s: invalid TCP port %s; port must be between 1 and 65535", path, tcpPortStr))
}
if seenPeers[peer] {
errs = append(errs, fmt.Errorf("%s: duplicate bootstrap peer", path))
}
seenPeers[peer] = true
}
// Validate rqlite_dsn if provided
if c.RQLiteDSN != "" {
if err := validateRQLiteDSN(c.RQLiteDSN); err != nil {
errs = append(errs, fmt.Errorf("gateway.rqlite_dsn: %v", err))
}
}
return errs
}
// validateListenAddr checks if a listen address is valid (host:port format)
func validateListenAddr(addr string) error {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return fmt.Errorf("invalid format; expected host:port")
}
portNum, err := strconv.Atoi(port)
if err != nil || portNum < 1 || portNum > 65535 {
return fmt.Errorf("port must be a number between 1 and 65535; got %q", port)
}
// Allow empty host (for wildcard binds like :6001)
if host != "" && net.ParseIP(host) == nil {
// Try as hostname (may fail later during bind, but basic validation)
_, err := net.LookupHost(host)
if err != nil {
// Not an IP; assume it's a valid hostname for now
}
}
return nil
}
// validateRQLiteDSN checks if an RQLite DSN is a valid URL
func validateRQLiteDSN(dsn string) error {
u, err := url.Parse(dsn)
if err != nil {
return fmt.Errorf("invalid URL: %v", err)
}
if u.Scheme != "http" && u.Scheme != "https" {
return fmt.Errorf("scheme must be http or https; got %q", u.Scheme)
}
if u.Host == "" {
return fmt.Errorf("host must not be empty")
}
return nil
}
// extractTCPPort extracts the TCP port from a multiaddr string.
// It assumes the multiaddr is in the format /ip{4,6}/.../tcp/<port>/p2p/<peerID>.
func extractTCPPort(multiaddrStr string) string {
// Find the last /tcp/ component
lastTCPIndex := strings.LastIndex(multiaddrStr, "/tcp/")
if lastTCPIndex == -1 {
return ""
}
// Extract the port part after /tcp/
portPart := multiaddrStr[lastTCPIndex+len("/tcp/"):]
// Find the first / component after the port part
firstSlashIndex := strings.Index(portPart, "/")
if firstSlashIndex == -1 {
return portPart
}
return portPart[:firstSlashIndex]
}

View File

@ -6,6 +6,7 @@ import (
mathrand "math/rand" mathrand "math/rand"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"time" "time"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
@ -384,6 +385,16 @@ func (n *Node) startLibP2P() error {
func (n *Node) loadOrCreateIdentity() (crypto.PrivKey, error) { func (n *Node) loadOrCreateIdentity() (crypto.PrivKey, error) {
identityFile := filepath.Join(n.config.Node.DataDir, "identity.key") identityFile := filepath.Join(n.config.Node.DataDir, "identity.key")
// Expand ~ in data directory path
identityFile = os.ExpandEnv(identityFile)
if strings.HasPrefix(identityFile, "~") {
home, err := os.UserHomeDir()
if err != nil {
return nil, fmt.Errorf("failed to determine home directory: %w", err)
}
identityFile = filepath.Join(home, identityFile[1:])
}
// Try to load existing identity using the shared package // Try to load existing identity using the shared package
if _, err := os.Stat(identityFile); err == nil { if _, err := os.Stat(identityFile); err == nil {
info, err := encryption.LoadIdentity(identityFile) info, err := encryption.LoadIdentity(identityFile)
@ -489,8 +500,19 @@ func (n *Node) Stop() error {
func (n *Node) Start(ctx context.Context) error { func (n *Node) Start(ctx context.Context) error {
n.logger.Info("Starting network node", zap.String("data_dir", n.config.Node.DataDir)) n.logger.Info("Starting network node", zap.String("data_dir", n.config.Node.DataDir))
// Expand ~ in data directory path
dataDir := n.config.Node.DataDir
dataDir = os.ExpandEnv(dataDir)
if strings.HasPrefix(dataDir, "~") {
home, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("failed to determine home directory: %w", err)
}
dataDir = filepath.Join(home, dataDir[1:])
}
// Create data directory // Create data directory
if err := os.MkdirAll(n.config.Node.DataDir, 0755); err != nil { if err := os.MkdirAll(dataDir, 0755); err != nil {
return fmt.Errorf("failed to create data directory: %w", err) return fmt.Errorf("failed to create data directory: %w", err)
} }

View File

@ -69,8 +69,18 @@ func NewRQLiteManager(cfg *config.DatabaseConfig, discoveryCfg *config.Discovery
// Start starts the RQLite node // Start starts the RQLite node
func (r *RQLiteManager) Start(ctx context.Context) error { func (r *RQLiteManager) Start(ctx context.Context) error {
// Expand ~ in data directory path
dataDir := os.ExpandEnv(r.dataDir)
if strings.HasPrefix(dataDir, "~") {
home, err := os.UserHomeDir()
if err != nil {
return fmt.Errorf("failed to determine home directory: %w", err)
}
dataDir = filepath.Join(home, dataDir[1:])
}
// Create data directory // Create data directory
rqliteDataDir := filepath.Join(r.dataDir, "rqlite") rqliteDataDir := filepath.Join(dataDir, "rqlite")
if err := os.MkdirAll(rqliteDataDir, 0755); err != nil { if err := os.MkdirAll(rqliteDataDir, 0755); err != nil {
return fmt.Errorf("failed to create RQLite data directory: %w", err) return fmt.Errorf("failed to create RQLite data directory: %w", err)
} }
@ -100,7 +110,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
} }
// Wait for join target to become reachable to avoid forming a separate cluster (wait indefinitely) // Wait for join target to become reachable to avoid forming a separate cluster (wait indefinitely)
if err := r.waitForJoinTarget(ctx, joinArg, 0); err != nil { if err := r.waitForJoinTarget(ctx, r.config.RQLiteJoinAddress, 0); err != nil {
r.logger.Warn("Join target did not become reachable within timeout; will still attempt to join", r.logger.Warn("Join target did not become reachable within timeout; will still attempt to join",
zap.String("join_address", r.config.RQLiteJoinAddress), zap.String("join_address", r.config.RQLiteJoinAddress),
zap.Error(err)) zap.Error(err))
@ -126,7 +136,7 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
// Start RQLite process (not bound to ctx for graceful Stop handling) // Start RQLite process (not bound to ctx for graceful Stop handling)
r.cmd = exec.Command("rqlited", args...) r.cmd = exec.Command("rqlited", args...)
// Uncomment if you want to see the stdout/stderr of the RQLite process // Enable debug logging of RQLite process to help diagnose issues
// r.cmd.Stdout = os.Stdout // r.cmd.Stdout = os.Stdout
// r.cmd.Stderr = os.Stderr // r.cmd.Stderr = os.Stderr
@ -166,7 +176,15 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
} }
} else { } else {
r.logger.Info("Waiting for RQLite SQL availability (leader discovery)") r.logger.Info("Waiting for RQLite SQL availability (leader discovery)")
if err := r.waitForSQLAvailable(ctx); err != nil { // For joining nodes, wait longer for SQL availability
sqlCtx := ctx
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
// If no deadline in context, create one for SQL availability check
var cancel context.CancelFunc
sqlCtx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
}
if err := r.waitForSQLAvailable(sqlCtx); err != nil {
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
_ = r.cmd.Process.Kill() _ = r.cmd.Process.Kill()
} }
@ -207,7 +225,9 @@ func (r *RQLiteManager) waitForReady(ctx context.Context) error {
url := fmt.Sprintf("http://localhost:%d/status", r.config.RQLitePort) url := fmt.Sprintf("http://localhost:%d/status", r.config.RQLitePort)
client := &http.Client{Timeout: 2 * time.Second} client := &http.Client{Timeout: 2 * time.Second}
for i := 0; i < 30; i++ { // Give joining nodes more time (120 seconds vs 30)
maxAttempts := 30
for i := 0; i < maxAttempts; i++ {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()