diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md deleted file mode 100644 index d235ee2..0000000 --- a/.github/copilot-instructions.md +++ /dev/null @@ -1,46 +0,0 @@ - - -# Network - Distributed P2P Database System - -This is a distributed peer-to-peer network project built with Go and LibP2P. The system provides decentralized database capabilities with consensus and replication. - -## Key Components - -- **LibP2P Network Layer**: Core networking built on LibP2P for P2P communication -- **Distributed Database**: RQLite-based distributed SQLite with Raft consensus -- **Client Library**: Go API for applications to interact with the network -- **Application Isolation**: Each app gets isolated namespaces for data and messaging - -## Development Guidelines - -1. **Architecture Patterns**: Follow the client-server pattern where applications use the client library to interact with the distributed network -2. **Namespacing**: All data (database, storage, pub/sub) is namespaced per application to ensure isolation -3. **Error Handling**: Always check for connection status before performing operations -4. **Async Operations**: Use context.Context for cancellation and timeouts -5. **Logging**: Use structured logging with appropriate log levels - -## Code Style - -- Use standard Go conventions and naming -- Implement interfaces for testability -- Include comprehensive error messages -- Add context parameters to all network operations -- Use dependency injection for components - -## Testing Strategy - -- Unit tests for individual components -- Integration tests for client library -- E2E tests for full network scenarios -- Mock external dependencies (LibP2P, database) - -## Future Applications - -This network is designed to support applications like: - -- Anchat (encrypted messaging) -- Distributed file storage -- IoT data collection -- Social networks - -When implementing applications, they should use the client library rather than directly accessing network internals. 
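The per-application namespacing guideline from the removed instructions is still enforced elsewhere in this change: the pubsub manager (pkg/pubsub, further down) prefixes every topic with the app namespace and strips the prefix again when listing topics. A minimal sketch of that convention, using hypothetical helper names that are not part of the package API:

```go
package main

import (
	"fmt"
	"strings"
)

// namespacedTopic mirrors the pubsub manager's convention of prefixing every
// topic with the application namespace so apps cannot see each other's traffic.
// (Illustrative helper only, not part of the package API.)
func namespacedTopic(namespace, topic string) string {
	return fmt.Sprintf("%s.%s", namespace, topic)
}

// originalTopic reverses the prefixing, as ListTopics does when reporting
// subscribed topics back to the application.
func originalTopic(namespace, namespaced string) (string, bool) {
	prefix := namespace + "."
	if strings.HasPrefix(namespaced, prefix) && len(namespaced) > len(prefix) {
		return namespaced[len(prefix):], true
	}
	return "", false
}

func main() {
	t := namespacedTopic("anchat", "general")
	fmt.Println(t) // anchat.general
	if orig, ok := originalTopic("anchat", t); ok {
		fmt.Println(orig) // general
	}
}
```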
diff --git a/AI_CONTEXT.md b/AI_CONTEXT.md index e84be44..1640375 100644 --- a/AI_CONTEXT.md +++ b/AI_CONTEXT.md @@ -118,10 +118,9 @@ Create a robust, decentralized network platform that enables applications to sea ## Codebase Structure ``` -debros-testing/ +network/ ├── cmd/ # Executables -│ ├── bootstrap/main.go # Bootstrap node (network entry point) -│ ├── node/main.go # Regular network node +│ ├── node/main.go # Network node (bootstrap via flag/auto) │ └── cli/main.go # Command-line interface ├── pkg/ # Core packages │ ├── client/ # Client API and implementations @@ -358,7 +357,7 @@ discovery: database: rqlite_port: 5001 rqlite_raft_port: 7001 - rqlite_join_address: "http://localhost:5001" + rqlite_join_address: "localhost:7001" ``` ## API Reference @@ -366,7 +365,7 @@ database: ### Client Creation ```go -import "network/pkg/client" +import "git.debros.io/DeBros/network/pkg/client" config := client.DefaultClientConfig("my-app") config.BootstrapPeers = []string{"/ip4/127.0.0.1/tcp/4001/p2p/{PEER_ID}"} diff --git a/Makefile b/Makefile index 4b55742..ca428c9 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ # Network - Distributed P2P Database System # Makefile for development and build tasks -.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet +.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports # Build targets build: deps @@ -42,6 +42,11 @@ run-node3: @if [ -z "$(BOOTSTRAP)" ]; then echo "ERROR: Provide BOOTSTRAP multiaddr: make run-node3 BOOTSTRAP=/ip4/127.0.0.1/tcp/4001/p2p/ [HTTP=5003 RAFT=7003 P2P=4003]"; exit 1; fi go run cmd/node/main.go -role node -id node3 -data ./data/node3 -bootstrap $(BOOTSTRAP) -rqlite-http-port $${HTTP:-5003} -rqlite-raft-port $${RAFT:-7003} -p2p-port $${P2P:-4003} -advertise $${ADVERTISE:-localhost} +# Run basic usage example +run-example: + @echo "Running basic usage example..." + go run examples/basic_usage.go + # Show how to run with flags show-bootstrap: @echo "Provide bootstrap via flags, e.g.:" @@ -96,10 +101,20 @@ vet: @echo "Vetting code..." go vet ./... +# Lint alias (lightweight for now) +lint: fmt vet + @echo "Linting complete (fmt + vet)" + +# Clear common development ports +clear-ports: + @echo "Clearing common dev ports (4001/4002, 5001/5002, 7001/7002)..." + @chmod +x scripts/clear-ports.sh || true + @scripts/clear-ports.sh + # Development setup dev-setup: deps @echo "Setting up development environment..." - @mkdir -p data/bootstrap data/node data/node-node2 data/node-node3 + @mkdir -p data/bootstrap data/node data/node2 data/node3 @mkdir -p data/test-bootstrap data/test-node1 data/test-node2 @echo "Development setup complete!" 
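The updated import path and client creation flow shown in the AI_CONTEXT.md hunk above fit together roughly as follows. This sketch assumes DefaultClientConfig returns a *ClientConfig compatible with NewClient (per pkg/client/client.go below) and deliberately shows no other NetworkClient methods:

```go
package main

import (
	"log"

	"git.debros.io/DeBros/network/pkg/client"
)

func main() {
	// Per-app config; the app name doubles as the isolation namespace.
	cfg := client.DefaultClientConfig("my-app")

	// Bootstrap peers are multiaddrs; replace {PEER_ID} with a real peer ID.
	cfg.BootstrapPeers = []string{"/ip4/127.0.0.1/tcp/4001/p2p/{PEER_ID}"}

	c, err := client.NewClient(cfg)
	if err != nil {
		log.Fatalf("failed to create network client: %v", err)
	}
	_ = c // storage, database, and pub/sub operations go through the client
}
```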
@@ -168,6 +183,8 @@ help: @echo " tidy - Tidy dependencies" @echo " fmt - Format code" @echo " vet - Vet code" + @echo " lint - Lint code (fmt + vet)" + @echo " clear-ports - Clear common dev ports" @echo " dev-setup - Setup development environment" @echo " dev-cluster - Show cluster startup commands" @echo " dev - Full development workflow" diff --git a/README.md b/README.md index bcd1113..e4a41e0 100644 --- a/README.md +++ b/README.md @@ -31,12 +31,8 @@ A distributed peer-to-peer network built with Go and LibP2P, providing decentral - [Network Discovery](#network-discovery) - [Updates and Maintenance](#updates-and-maintenance) - [Monitoring and Troubleshooting](#monitoring-and-troubleshooting) -- [Environment Configuration](#environment-configuration) - - [Bootstrap Peers Configuration](#bootstrap-peers-configuration) - - [Setup for Development](#setup-for-development) - - [Configuration Files](#configuration-files-1) - - [Multiple Bootstrap Peers](#multiple-bootstrap-peers) - - [Checking Configuration](#checking-configuration) +- [Configuration](#configuration) + - [Bootstrap and Ports (via flags)](#bootstrap-and-ports-via-flags) - [CLI Commands](#cli-commands) - [Network Operations](#network-operations) - [Storage Operations](#storage-operations) @@ -226,14 +222,14 @@ go run cmd/node/main.go \ make show-bootstrap # Check network health -./bin/cli health +./bin/network-cli health # Test storage operations -./bin/cli storage put test-key "Hello Network" -./bin/cli storage get test-key +./bin/network-cli storage put test-key "Hello Network" +./bin/network-cli storage get test-key # List connected peers -./bin/cli peers +./bin/network-cli peers ``` ## Deployment @@ -334,9 +330,9 @@ sudo systemctl enable debros-node sudo systemctl disable debros-node # Use CLI tools -/opt/debros/bin/cli health -/opt/debros/bin/cli peers -/opt/debros/bin/cli storage put key value +/opt/debros/bin/network-cli health +/opt/debros/bin/network-cli peers +/opt/debros/bin/network-cli storage put key value ``` #### Configuration Files @@ -407,8 +403,8 @@ sudo netstat -tuln | grep -E "(4001|5001|7001)" sudo journalctl -u debros-node.service --since "1 hour ago" # Check network connectivity -/opt/debros/bin/cli health -/opt/debros/bin/cli peers +/opt/debros/bin/network-cli health +/opt/debros/bin/network-cli peers # Check disk usage du -sh /opt/debros/data/* @@ -429,6 +425,41 @@ For more advanced configuration options and development setup, see the sections Examples are shown in Quick Start above for local multi-node on a single machine. +### Environment Variables + +Precedence: CLI flags > Environment variables > Code defaults. Set any of the following in your shell or `.env`: + +- NODE_ID: custom node identifier (e.g. "node2") +- NODE_TYPE: "bootstrap" or "node" +- NODE_LISTEN_ADDRESSES: comma-separated multiaddrs (e.g. "/ip4/0.0.0.0/tcp/4001,/ip4/0.0.0.0/udp/4001/quic") +- DATA_DIR: node data directory (default `./data`) +- MAX_CONNECTIONS: max peer connections (int) + +- DB_DATA_DIR: database data directory (default `./data/db`) +- REPLICATION_FACTOR: int (default 3) +- SHARD_COUNT: int (default 16) +- MAX_DB_SIZE: e.g. "1g", "512m", or bytes +- BACKUP_INTERVAL: Go duration (e.g. 
"24h") +- RQLITE_HTTP_PORT: int (default 5001) +- RQLITE_RAFT_PORT: int (default 7001) +- RQLITE_JOIN_ADDRESS: host:port for Raft join (regular nodes) +- ADVERTISE_MODE: "auto" | "localhost" | "ip" + +- BOOTSTRAP_PEERS: comma-separated multiaddrs for bootstrap peers +- ENABLE_MDNS: true/false +- ENABLE_DHT: true/false +- DHT_PREFIX: string (default `/network/kad/1.0.0`) +- DISCOVERY_INTERVAL: duration (e.g. "5m") + +- ENABLE_TLS: true/false +- PRIVATE_KEY_FILE: path +- CERT_FILE: path +- AUTH_ENABLED: true/false + +- LOG_LEVEL: "debug" | "info" | "warn" | "error" +- LOG_FORMAT: "json" | "console" +- LOG_OUTPUT_FILE: path (empty = stdout) + ## CLI Commands The CLI can still accept `--bootstrap ` to override discovery when needed. @@ -436,32 +467,32 @@ The CLI can still accept `--bootstrap ` to override discovery when ne ### Network Operations ```bash -./bin/cli health # Check network health -./bin/cli status # Get network status -./bin/cli peers # List connected peers +./bin/network-cli health # Check network health +./bin/network-cli status # Get network status +./bin/network-cli peers # List connected peers ``` ### Storage Operations ```bash -./bin/cli storage put # Store data -./bin/cli storage get # Retrieve data -./bin/cli storage list [prefix] # List keys +./bin/network-cli storage put # Store data +./bin/network-cli storage get # Retrieve data +./bin/network-cli storage list [prefix] # List keys ``` ### Database Operations ```bash -./bin/cli query "SELECT * FROM table" # Execute SQL -./bin/cli query "CREATE TABLE users (id INTEGER)" # DDL operations +./bin/network-cli query "SELECT * FROM table" # Execute SQL +./bin/network-cli query "CREATE TABLE users (id INTEGER)" # DDL operations ``` ### Pub/Sub Messaging ```bash -./bin/cli pubsub publish # Send message -./bin/cli pubsub subscribe [duration] # Listen for messages -./bin/cli pubsub topics # List active topics +./bin/network-cli pubsub publish # Send message +./bin/network-cli pubsub subscribe [duration] # Listen for messages +./bin/network-cli pubsub topics # List active topics ``` ### CLI Options @@ -479,8 +510,7 @@ The CLI can still accept `--bootstrap ` to override discovery when ne ``` network/ ├── cmd/ -│ ├── bootstrap/ # Bootstrap node -│ ├── node/ # Regular network node +│ ├── node/ # Network node (bootstrap via flag) │ └── cli/ # Command-line interface ├── pkg/ │ ├── client/ # Client library @@ -489,8 +519,7 @@ network/ │ ├── storage/ # Storage service │ ├── constants/ # Bootstrap configuration │ └── config/ # System configuration -├── scripts/ # Helper scripts (install, security, tests) -├── scripts/ # Helper scripts (install, security, tests) +├── scripts/ # Helper scripts (install, security, tests) ├── bin/ # Built executables ``` @@ -546,8 +575,8 @@ make dev make run-node # Terminal 3: Test with CLI - ./bin/cli health - ./bin/cli peers + ./bin/network-cli health + ./bin/network-cli peers ``` ### Environment Setup @@ -607,7 +636,7 @@ package main import ( "context" "log" - "network/pkg/client" + "git.debros.io/DeBros/network/pkg/client" ) func main() { diff --git a/clear-ports.sh b/clear-ports.sh deleted file mode 100755 index 12bfdb3..0000000 --- a/clear-ports.sh +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env bash -# clear-ports.sh -# Safely terminate any processes listening on specified TCP ports. 
-# Usage: -# ./clear-ports.sh # clears 4001, 5001, 7001 by default -# ./clear-ports.sh 4001 5001 7001 # clears the specified ports - -set -euo pipefail - -# Collect ports from args or use defaults -PORTS=("$@") -if [ ${#PORTS[@]} -eq 0 ]; then - PORTS=(4001 4002 5001 5002 7001 7002) -fi - -echo "Gracefully terminating listeners on: ${PORTS[*]}" -for p in "${PORTS[@]}"; do - PIDS=$(lsof -t -n -P -iTCP:"$p" -sTCP:LISTEN 2>/dev/null || true) - if [ -n "$PIDS" ]; then - echo "Port $p -> PIDs: $PIDS (SIGTERM)" - # shellcheck disable=SC2086 - kill -TERM $PIDS 2>/dev/null || true - else - echo "Port $p -> no listeners" - fi -done - -sleep 1 - -echo "Force killing any remaining listeners..." -for p in "${PORTS[@]}"; do - PIDS=$(lsof -t -n -P -iTCP:"$p" -sTCP:LISTEN 2>/dev/null || true) - if [ -n "$PIDS" ]; then - echo "Port $p -> PIDs: $PIDS (SIGKILL)" - # shellcheck disable=SC2086 - kill -9 $PIDS 2>/dev/null || true - else - echo "Port $p -> none remaining" - fi -done - -echo "\nVerification (should be empty):" -for p in "${PORTS[@]}"; do - echo "--- Port $p ---" - lsof -n -P -iTCP:"$p" -sTCP:LISTEN 2>/dev/null || true - echo -done - diff --git a/cmd/node/main.go b/cmd/node/main.go index a4ed9eb..46faa39 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -88,6 +88,10 @@ func main() { logger.Printf("Starting regular node...") } + // Apply environment variable overrides before applying CLI flags so that + // precedence is: flags > env > defaults + config.ApplyEnvOverrides(cfg) + // Set basic configuration cfg.Node.DataDir = *dataDir cfg.Node.ListenAddresses = []string{ @@ -153,8 +157,11 @@ func main() { } logger.Printf("Using command line bootstrap peer: %s", *bootstrap) } else { - // Use environment-configured bootstrap peers - bootstrapPeers := constants.GetBootstrapPeers() + // Use environment-configured bootstrap peers if provided; otherwise fallback to constants + bootstrapPeers := cfg.Discovery.BootstrapPeers + if len(bootstrapPeers) == 0 { + bootstrapPeers = constants.GetBootstrapPeers() + } if len(bootstrapPeers) > 0 { cfg.Discovery.BootstrapPeers = bootstrapPeers // Use the first bootstrap peer for RQLite join diff --git a/generate-bootstrap-identity.go b/generate-bootstrap-identity.go deleted file mode 100644 index 1641a4c..0000000 --- a/generate-bootstrap-identity.go +++ /dev/null @@ -1,48 +0,0 @@ -package main - -import ( - "crypto/rand" - "fmt" - "os" - "path/filepath" - - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" -) - -func main() { - // Generate a fixed identity - priv, pub, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, rand.Reader) - if err != nil { - panic(err) - } - - // Get peer ID - peerID, err := peer.IDFromPublicKey(pub) - if err != nil { - panic(err) - } - - fmt.Printf("Generated Peer ID: %s\n", peerID.String()) - - // Marshal private key - data, err := crypto.MarshalPrivateKey(priv) - if err != nil { - panic(err) - } - - // Create data directory - dataDir := "./data/bootstrap" - if err := os.MkdirAll(dataDir, 0755); err != nil { - panic(err) - } - - // Save identity - identityFile := filepath.Join(dataDir, "identity.key") - if err := os.WriteFile(identityFile, data, 0600); err != nil { - panic(err) - } - - fmt.Printf("Identity saved to: %s\n", identityFile) - fmt.Printf("Bootstrap address: /ip4/127.0.0.1/tcp/4001/p2p/%s\n", peerID.String()) -} diff --git a/pkg/client/client.go b/pkg/client/client.go index 603e9cf..cba6930 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -84,22 +84,8 @@ func 
NewClient(config *ClientConfig) (NetworkClient, error) { return nil, fmt.Errorf("app name is required") } - // Create zap logger - use different config for quiet mode - var logger *zap.Logger - var err error - if config.QuietMode { - // For quiet mode, only show warnings and errors - zapConfig := zap.NewProductionConfig() - zapConfig.Level = zap.NewAtomicLevelAt(zap.WarnLevel) - // Disable caller info for cleaner output - zapConfig.DisableCaller = true - // Disable stacktrace for cleaner output - zapConfig.DisableStacktrace = true - logger, err = zapConfig.Build() - } else { - // Development logger shows debug/info logs - logger, err = zap.NewDevelopment() - } + // Create zap logger via helper for consistency + logger, err := newClientLogger(config.QuietMode) if err != nil { return nil, fmt.Errorf("failed to create logger: %w", err) } diff --git a/pkg/client/logging.go b/pkg/client/logging.go new file mode 100644 index 0000000..f0404c3 --- /dev/null +++ b/pkg/client/logging.go @@ -0,0 +1,19 @@ +package client + +import ( + "go.uber.org/zap" +) + +// newClientLogger creates a zap.Logger based on quiet mode preference. +// Quiet mode returns a production logger with Warn+ level and reduced noise. +// Non-quiet returns a development logger with debug/info output. +func newClientLogger(quiet bool) (*zap.Logger, error) { + if quiet { + cfg := zap.NewProductionConfig() + cfg.Level = zap.NewAtomicLevelAt(zap.WarnLevel) + cfg.DisableCaller = true + cfg.DisableStacktrace = true + return cfg.Build() + } + return zap.NewDevelopment() +} diff --git a/pkg/config/config.go b/pkg/config/config.go index c91c914..a7a3103 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,6 +1,9 @@ package config import ( + "os" + "strconv" + "strings" "time" "github.com/multiformats/go-multiaddr" @@ -156,3 +159,184 @@ func BootstrapConfig() *Config { } return config } + +// NewConfigFromEnv constructs a config (bootstrap or regular) and applies environment overrides. +// If isBootstrap is true, starts from BootstrapConfig; otherwise from DefaultConfig. +func NewConfigFromEnv(isBootstrap bool) *Config { + var cfg *Config + if isBootstrap { + cfg = BootstrapConfig() + } else { + cfg = DefaultConfig() + } + ApplyEnvOverrides(cfg) + return cfg +} + +// ApplyEnvOverrides mutates cfg based on environment variables. +// Precedence: CLI flags (outside this function) > ENV variables > defaults in code. 
+func ApplyEnvOverrides(cfg *Config) { + // Node + if v := os.Getenv("NODE_ID"); v != "" { + cfg.Node.ID = v + } + if v := os.Getenv("NODE_TYPE"); v != "" { // "bootstrap" or "node" + cfg.Node.Type = strings.ToLower(v) + cfg.Node.IsBootstrap = cfg.Node.Type == "bootstrap" + } + if v := os.Getenv("NODE_LISTEN_ADDRESSES"); v != "" { + parts := splitAndTrim(v) + if len(parts) > 0 { + cfg.Node.ListenAddresses = parts + } + } + if v := os.Getenv("DATA_DIR"); v != "" { + cfg.Node.DataDir = v + } + if v := os.Getenv("MAX_CONNECTIONS"); v != "" { + if n, err := strconv.Atoi(v); err == nil { + cfg.Node.MaxConnections = n + } + } + + // Database + if v := os.Getenv("DB_DATA_DIR"); v != "" { + cfg.Database.DataDir = v + } + if v := os.Getenv("REPLICATION_FACTOR"); v != "" { + if n, err := strconv.Atoi(v); err == nil { + cfg.Database.ReplicationFactor = n + } + } + if v := os.Getenv("SHARD_COUNT"); v != "" { + if n, err := strconv.Atoi(v); err == nil { + cfg.Database.ShardCount = n + } + } + if v := os.Getenv("MAX_DB_SIZE"); v != "" { // bytes + if n, err := parseInt64(v); err == nil { + cfg.Database.MaxDatabaseSize = n + } + } + if v := os.Getenv("BACKUP_INTERVAL"); v != "" { // duration, e.g. 24h + if d, err := time.ParseDuration(v); err == nil { + cfg.Database.BackupInterval = d + } + } + if v := os.Getenv("RQLITE_HTTP_PORT"); v != "" { + if n, err := strconv.Atoi(v); err == nil { + cfg.Database.RQLitePort = n + } + } + if v := os.Getenv("RQLITE_RAFT_PORT"); v != "" { + if n, err := strconv.Atoi(v); err == nil { + cfg.Database.RQLiteRaftPort = n + } + } + if v := os.Getenv("RQLITE_JOIN_ADDRESS"); v != "" { + cfg.Database.RQLiteJoinAddress = v + } + if v := os.Getenv("ADVERTISE_MODE"); v != "" { // auto | localhost | ip + cfg.Database.AdvertiseMode = strings.ToLower(v) + } + + // Discovery + if v := os.Getenv("BOOTSTRAP_PEERS"); v != "" { + parts := splitAndTrim(v) + if len(parts) > 0 { + cfg.Discovery.BootstrapPeers = parts + } + } + if v := os.Getenv("ENABLE_MDNS"); v != "" { + if b, err := parseBool(v); err == nil { + cfg.Discovery.EnableMDNS = b + } + } + if v := os.Getenv("ENABLE_DHT"); v != "" { + if b, err := parseBool(v); err == nil { + cfg.Discovery.EnableDHT = b + } + } + if v := os.Getenv("DHT_PREFIX"); v != "" { + cfg.Discovery.DHTPrefix = v + } + if v := os.Getenv("DISCOVERY_INTERVAL"); v != "" { // e.g. 
5m + if d, err := time.ParseDuration(v); err == nil { + cfg.Discovery.DiscoveryInterval = d + } + } + + // Security + if v := os.Getenv("ENABLE_TLS"); v != "" { + if b, err := parseBool(v); err == nil { + cfg.Security.EnableTLS = b + } + } + if v := os.Getenv("PRIVATE_KEY_FILE"); v != "" { + cfg.Security.PrivateKeyFile = v + } + if v := os.Getenv("CERT_FILE"); v != "" { + cfg.Security.CertificateFile = v + } + if v := os.Getenv("AUTH_ENABLED"); v != "" { + if b, err := parseBool(v); err == nil { + cfg.Security.AuthEnabled = b + } + } + + // Logging + if v := os.Getenv("LOG_LEVEL"); v != "" { + cfg.Logging.Level = strings.ToLower(v) + } + if v := os.Getenv("LOG_FORMAT"); v != "" { + cfg.Logging.Format = strings.ToLower(v) + } + if v := os.Getenv("LOG_OUTPUT_FILE"); v != "" { + cfg.Logging.OutputFile = v + } +} + +// Helpers +func splitAndTrim(csv string) []string { + parts := strings.Split(csv, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + s := strings.TrimSpace(p) + if s != "" { + out = append(out, s) + } + } + return out +} + +func parseBool(s string) (bool, error) { + switch strings.ToLower(strings.TrimSpace(s)) { + case "1", "true", "t", "yes", "y", "on": + return true, nil + case "0", "false", "f", "no", "n", "off": + return false, nil + default: + return strconv.ParseBool(s) + } +} + +func parseInt64(s string) (int64, error) { + // Allow plain int or with optional suffixes k, m, g (base-1024) + s = strings.TrimSpace(strings.ToLower(s)) + mul := int64(1) + if strings.HasSuffix(s, "k") { + mul = 1024 + s = strings.TrimSuffix(s, "k") + } else if strings.HasSuffix(s, "m") { + mul = 1024 * 1024 + s = strings.TrimSuffix(s, "m") + } else if strings.HasSuffix(s, "g") { + mul = 1024 * 1024 * 1024 + s = strings.TrimSuffix(s, "g") + } + n, err := strconv.ParseInt(strings.TrimSpace(s), 10, 64) + if err != nil { + return 0, err + } + return n * mul, nil +} diff --git a/pkg/pubsub/discovery_integration.go b/pkg/pubsub/discovery_integration.go new file mode 100644 index 0000000..02d3bd6 --- /dev/null +++ b/pkg/pubsub/discovery_integration.go @@ -0,0 +1,130 @@ +package pubsub + +import ( + "context" + "time" + + pubsub "github.com/libp2p/go-libp2p-pubsub" +) + +// announceTopicInterest helps with peer discovery by announcing interest in a topic +func (m *Manager) announceTopicInterest(topicName string) { + // Wait a bit for the subscription to be established + time.Sleep(100 * time.Millisecond) + + // Get the topic + m.mu.RLock() + topic, exists := m.topics[topicName] + m.mu.RUnlock() + + if !exists { + return + } + + // For Anchat specifically, be more aggressive about finding peers + if len(m.namespace) > 6 && m.namespace[:6] == "anchat" { + go m.aggressiveTopicPeerDiscovery(topicName, topic) + } else { + // Start a periodic check to monitor topic peer growth + go m.monitorTopicPeers(topicName, topic) + } +} + +// aggressiveTopicPeerDiscovery for Anchat - actively seeks topic peers +func (m *Manager) aggressiveTopicPeerDiscovery(topicName string, topic *pubsub.Topic) { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for i := 0; i < 30; i++ { // Monitor for 30 seconds + <-ticker.C + peers := topic.ListPeers() + + // If we have peers, reduce frequency but keep monitoring + if len(peers) > 0 { + // Switch to normal monitoring once we have peers + go m.monitorTopicPeers(topicName, topic) + return + } + + // For Anchat, try to actively discover and connect to peers on this topic + // This is critical because LibP2P pubsub requires direct connections for 
message propagation + m.forceTopicPeerDiscovery(topicName, topic) + } +} + +// enhancedAnchatTopicDiscovery implements enhanced peer discovery specifically for Anchat +func (m *Manager) enhancedAnchatTopicDiscovery(topicName string, topic *pubsub.Topic) { + // Wait for subscription to be fully established + time.Sleep(200 * time.Millisecond) + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for i := 0; i < 20; i++ { // Monitor for 20 seconds + <-ticker.C + + peers := topic.ListPeers() + if len(peers) > 0 { + // Success! We found topic peers + return + } + + // Try various discovery strategies + if i%3 == 0 { + // Strategy: Send discovery heartbeat + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + discoveryMsg := []byte("ANCHAT_DISCOVERY_PING") + topic.Publish(ctx, discoveryMsg) + cancel() + } + + // Wait a bit and check again + time.Sleep(500 * time.Millisecond) + peers = topic.ListPeers() + if len(peers) > 0 { + return + } + } +} + +// forceTopicPeerDiscovery uses multiple strategies to find and connect to topic peers +func (m *Manager) forceTopicPeerDiscovery(topicName string, topic *pubsub.Topic) { + // Strategy 1: Check if pubsub knows about any peers for this topic + peers := topic.ListPeers() + if len(peers) > 0 { + return // We already have peers + } + + // Strategy 2: Try to actively announce our presence and wait for responses + // Send a ping/heartbeat to the topic to announce our presence + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + // Create a discovery message to announce our presence on this topic + discoveryMsg := []byte("ANCHAT_PEER_DISCOVERY") + topic.Publish(ctx, discoveryMsg) + + // Strategy 3: Wait briefly and check again + time.Sleep(500 * time.Millisecond) + _ = topic.ListPeers() // Check again but we don't need to use the result + + // Note: In LibP2P, topics don't automatically form connections between subscribers + // The underlying network layer needs to ensure peers are connected first + // This is why our enhanced client peer discovery is crucial +} + +// monitorTopicPeers periodically checks topic peer connectivity +func (m *Manager) monitorTopicPeers(topicName string, topic *pubsub.Topic) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for i := 0; i < 6; i++ { // Monitor for 30 seconds + <-ticker.C + peers := topic.ListPeers() + + // If we have peers, we're good + if len(peers) > 0 { + return + } + } +} diff --git a/pkg/pubsub/logging.go b/pkg/pubsub/logging.go new file mode 100644 index 0000000..d221994 --- /dev/null +++ b/pkg/pubsub/logging.go @@ -0,0 +1,17 @@ +package pubsub + +import "go.uber.org/zap" + +// newPubSubLogger creates a zap.Logger for pubsub components. +// Quiet mode can be handled by callers by using production config externally; +// here we default to development logger for richer diagnostics during dev. 
+func newPubSubLogger(quiet bool) (*zap.Logger, error) { + if quiet { + cfg := zap.NewProductionConfig() + cfg.Level = zap.NewAtomicLevelAt(zap.WarnLevel) + cfg.DisableCaller = true + cfg.DisableStacktrace = true + return cfg.Build() + } + return zap.NewDevelopment() +} diff --git a/pkg/pubsub/manager.go b/pkg/pubsub/manager.go index 592b17a..cd911aa 100644 --- a/pkg/pubsub/manager.go +++ b/pkg/pubsub/manager.go @@ -1,332 +1,32 @@ package pubsub import ( - "context" - "fmt" - "sync" - "time" + "sync" - pubsub "github.com/libp2p/go-libp2p-pubsub" + pubsub "github.com/libp2p/go-libp2p-pubsub" ) // Manager handles pub/sub operations type Manager struct { - pubsub *pubsub.PubSub - topics map[string]*pubsub.Topic - subscriptions map[string]*subscription - namespace string - mu sync.RWMutex + pubsub *pubsub.PubSub + topics map[string]*pubsub.Topic + subscriptions map[string]*subscription + namespace string + mu sync.RWMutex } // subscription holds subscription data type subscription struct { - sub *pubsub.Subscription - cancel context.CancelFunc + sub *pubsub.Subscription + cancel func() } // NewManager creates a new pubsub manager func NewManager(ps *pubsub.PubSub, namespace string) *Manager { - return &Manager{ - pubsub: ps, - topics: make(map[string]*pubsub.Topic), - subscriptions: make(map[string]*subscription), - namespace: namespace, - } -} - -// getOrCreateTopic gets an existing topic or creates a new one -func (m *Manager) getOrCreateTopic(topicName string) (*pubsub.Topic, error) { - m.mu.Lock() - defer m.mu.Unlock() - - // Return existing topic if available - if topic, exists := m.topics[topicName]; exists { - return topic, nil - } - - // Join the topic - LibP2P allows multiple clients to join the same topic - topic, err := m.pubsub.Join(topicName) - if err != nil { - return nil, fmt.Errorf("failed to join topic: %w", err) - } - - m.topics[topicName] = topic - return topic, nil -} - -// Subscribe subscribes to a topic -func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHandler) error { - if m.pubsub == nil { - return fmt.Errorf("pubsub not initialized") - } - - namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic) - - // Check if already subscribed - m.mu.Lock() - if _, exists := m.subscriptions[namespacedTopic]; exists { - m.mu.Unlock() - // Already subscribed - this is normal for LibP2P pubsub - return nil - } - m.mu.Unlock() - - // Get or create topic - libp2pTopic, err := m.getOrCreateTopic(namespacedTopic) - if err != nil { - return fmt.Errorf("failed to get topic: %w", err) - } - - // Subscribe to topic - sub, err := libp2pTopic.Subscribe() - if err != nil { - return fmt.Errorf("failed to subscribe to topic: %w", err) - } - - // Create cancellable context for this subscription - subCtx, cancel := context.WithCancel(context.Background()) - - // Store subscription - m.mu.Lock() - m.subscriptions[namespacedTopic] = &subscription{ - sub: sub, - cancel: cancel, - } - m.mu.Unlock() - - // Start message handler goroutine - go func() { - defer func() { - sub.Cancel() - }() - - for { - select { - case <-subCtx.Done(): - return - default: - msg, err := sub.Next(subCtx) - if err != nil { - if subCtx.Err() != nil { - return // Context cancelled - } - continue - } - - // Call the handler - if err := handler(topic, msg.Data); err != nil { - // Log error but continue processing - continue - } - } - } - }() - - // Force peer discovery for this topic - go m.announceTopicInterest(namespacedTopic) - - // For Anchat, also try to actively find topic peers through the 
libp2p pubsub system - if len(m.namespace) > 6 && m.namespace[:6] == "anchat" { - go m.enhancedAnchatTopicDiscovery(namespacedTopic, libp2pTopic) - } - - return nil -} - -// Publish publishes a message to a topic -func (m *Manager) Publish(ctx context.Context, topic string, data []byte) error { - if m.pubsub == nil { - return fmt.Errorf("pubsub not initialized") - } - - namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic) - - // Get or create topic - libp2pTopic, err := m.getOrCreateTopic(namespacedTopic) - if err != nil { - return fmt.Errorf("failed to get topic for publishing: %w", err) - } - - // Publish message - if err := libp2pTopic.Publish(ctx, data); err != nil { - return fmt.Errorf("failed to publish message: %w", err) - } - - return nil -} - -// Unsubscribe unsubscribes from a topic -func (m *Manager) Unsubscribe(ctx context.Context, topic string) error { - m.mu.Lock() - defer m.mu.Unlock() - - namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic) - - if subscription, exists := m.subscriptions[namespacedTopic]; exists { - // Cancel the subscription context to stop the message handler goroutine - subscription.cancel() - delete(m.subscriptions, namespacedTopic) - } - - return nil -} - -// ListTopics returns all subscribed topics -func (m *Manager) ListTopics(ctx context.Context) ([]string, error) { - m.mu.RLock() - defer m.mu.RUnlock() - - var topics []string - prefix := m.namespace + "." - - for topic := range m.subscriptions { - if len(topic) > len(prefix) && topic[:len(prefix)] == prefix { - originalTopic := topic[len(prefix):] - topics = append(topics, originalTopic) - } - } - - return topics, nil -} - -// Close closes all subscriptions and topics -func (m *Manager) Close() error { - m.mu.Lock() - defer m.mu.Unlock() - - // Cancel all subscriptions - for _, sub := range m.subscriptions { - sub.cancel() - } - m.subscriptions = make(map[string]*subscription) - - // Close all topics - for _, topic := range m.topics { - topic.Close() - } - m.topics = make(map[string]*pubsub.Topic) - - return nil -} - -// announceTopicInterest helps with peer discovery by announcing interest in a topic -func (m *Manager) announceTopicInterest(topicName string) { - // Wait a bit for the subscription to be established - time.Sleep(100 * time.Millisecond) - - // Get the topic - m.mu.RLock() - topic, exists := m.topics[topicName] - m.mu.RUnlock() - - if !exists { - return - } - - // For Anchat specifically, be more aggressive about finding peers - if len(m.namespace) > 6 && m.namespace[:6] == "anchat" { - go m.aggressiveTopicPeerDiscovery(topicName, topic) - } else { - // Start a periodic check to monitor topic peer growth - go m.monitorTopicPeers(topicName, topic) - } -} - -// aggressiveTopicPeerDiscovery for Anchat - actively seeks topic peers -func (m *Manager) aggressiveTopicPeerDiscovery(topicName string, topic *pubsub.Topic) { - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for i := 0; i < 30; i++ { // Monitor for 30 seconds - <-ticker.C - peers := topic.ListPeers() - - // If we have peers, reduce frequency but keep monitoring - if len(peers) > 0 { - // Switch to normal monitoring once we have peers - go m.monitorTopicPeers(topicName, topic) - return - } - - // For Anchat, try to actively discover and connect to peers on this topic - // This is critical because LibP2P pubsub requires direct connections for message propagation - m.forceTopicPeerDiscovery(topicName, topic) - } -} - -// enhancedAnchatTopicDiscovery implements enhanced peer discovery specifically for 
Anchat -func (m *Manager) enhancedAnchatTopicDiscovery(topicName string, topic *pubsub.Topic) { - // Wait for subscription to be fully established - time.Sleep(200 * time.Millisecond) - - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - for i := 0; i < 20; i++ { // Monitor for 20 seconds - <-ticker.C - - peers := topic.ListPeers() - if len(peers) > 0 { - // Success! We found topic peers - return - } - - // Try various discovery strategies - if i%3 == 0 { - // Strategy: Send discovery heartbeat - ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - discoveryMsg := []byte("ANCHAT_DISCOVERY_PING") - topic.Publish(ctx, discoveryMsg) - cancel() - } - - // Wait a bit and check again - time.Sleep(500 * time.Millisecond) - peers = topic.ListPeers() - if len(peers) > 0 { - return - } - } -} - -// forceTopicPeerDiscovery uses multiple strategies to find and connect to topic peers -func (m *Manager) forceTopicPeerDiscovery(topicName string, topic *pubsub.Topic) { - // Strategy 1: Check if pubsub knows about any peers for this topic - peers := topic.ListPeers() - if len(peers) > 0 { - return // We already have peers - } - - // Strategy 2: Try to actively announce our presence and wait for responses - // Send a ping/heartbeat to the topic to announce our presence - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - // Create a discovery message to announce our presence on this topic - discoveryMsg := []byte("ANCHAT_PEER_DISCOVERY") - topic.Publish(ctx, discoveryMsg) - - // Strategy 3: Wait briefly and check again - time.Sleep(500 * time.Millisecond) - _ = topic.ListPeers() // Check again but we don't need to use the result - - // Note: In LibP2P, topics don't automatically form connections between subscribers - // The underlying network layer needs to ensure peers are connected first - // This is why our enhanced client peer discovery is crucial -} - -// monitorTopicPeers periodically checks topic peer connectivity -func (m *Manager) monitorTopicPeers(topicName string, topic *pubsub.Topic) { - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for i := 0; i < 6; i++ { // Monitor for 30 seconds - <-ticker.C - peers := topic.ListPeers() - - // If we have peers, we're good - if len(peers) > 0 { - return - } - } + return &Manager { + pubsub: ps, + topics: make(map[string]*pubsub.Topic), + subscriptions: make(map[string]*subscription), + namespace: namespace, + } } diff --git a/pkg/pubsub/publish.go b/pkg/pubsub/publish.go new file mode 100644 index 0000000..d5b2718 --- /dev/null +++ b/pkg/pubsub/publish.go @@ -0,0 +1,28 @@ +package pubsub + +import ( + "context" + "fmt" +) + +// Publish publishes a message to a topic +func (m *Manager) Publish(ctx context.Context, topic string, data []byte) error { + if m.pubsub == nil { + return fmt.Errorf("pubsub not initialized") + } + + namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic) + + // Get or create topic + libp2pTopic, err := m.getOrCreateTopic(namespacedTopic) + if err != nil { + return fmt.Errorf("failed to get topic for publishing: %w", err) + } + + // Publish message + if err := libp2pTopic.Publish(ctx, data); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} diff --git a/pkg/pubsub/subscriptions.go b/pkg/pubsub/subscriptions.go new file mode 100644 index 0000000..fb30a18 --- /dev/null +++ b/pkg/pubsub/subscriptions.go @@ -0,0 +1,140 @@ +package pubsub + +import ( + "context" + "fmt" + pubsub 
"github.com/libp2p/go-libp2p-pubsub" +) + +// Subscribe subscribes to a topic +func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHandler) error { + if m.pubsub == nil { + return fmt.Errorf("pubsub not initialized") + } + + namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic) + + // Check if already subscribed + m.mu.Lock() + if _, exists := m.subscriptions[namespacedTopic]; exists { + m.mu.Unlock() + // Already subscribed - this is normal for LibP2P pubsub + return nil + } + m.mu.Unlock() + + // Get or create topic + libp2pTopic, err := m.getOrCreateTopic(namespacedTopic) + if err != nil { + return fmt.Errorf("failed to get topic: %w", err) + } + + // Subscribe to topic + sub, err := libp2pTopic.Subscribe() + if err != nil { + return fmt.Errorf("failed to subscribe to topic: %w", err) + } + + // Create cancellable context for this subscription + subCtx, cancel := context.WithCancel(context.Background()) + + // Store subscription + m.mu.Lock() + m.subscriptions[namespacedTopic] = &subscription{ + sub: sub, + cancel: cancel, + } + m.mu.Unlock() + + // Start message handler goroutine + go func() { + defer func() { + sub.Cancel() + }() + + for { + select { + case <-subCtx.Done(): + return + default: + msg, err := sub.Next(subCtx) + if err != nil { + if subCtx.Err() != nil { + return // Context cancelled + } + continue + } + + // Call the handler + if err := handler(topic, msg.Data); err != nil { + // Log error but continue processing + continue + } + } + } + }() + + // Force peer discovery for this topic + go m.announceTopicInterest(namespacedTopic) + + // For Anchat, also try to actively find topic peers through the libp2p pubsub system + if len(m.namespace) > 6 && m.namespace[:6] == "anchat" { + go m.enhancedAnchatTopicDiscovery(namespacedTopic, libp2pTopic) + } + + return nil +} + +// Unsubscribe unsubscribes from a topic +func (m *Manager) Unsubscribe(ctx context.Context, topic string) error { + m.mu.Lock() + defer m.mu.Unlock() + + namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic) + + if subscription, exists := m.subscriptions[namespacedTopic]; exists { + // Cancel the subscription context to stop the message handler goroutine + subscription.cancel() + delete(m.subscriptions, namespacedTopic) + } + + return nil +} + +// ListTopics returns all subscribed topics +func (m *Manager) ListTopics(ctx context.Context) ([]string, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + var topics []string + prefix := m.namespace + "." 
+ + for topic := range m.subscriptions { + if len(topic) > len(prefix) && topic[:len(prefix)] == prefix { + originalTopic := topic[len(prefix):] + topics = append(topics, originalTopic) + } + } + + return topics, nil +} + +// Close closes all subscriptions and topics +func (m *Manager) Close() error { + m.mu.Lock() + defer m.mu.Unlock() + + // Cancel all subscriptions + for _, sub := range m.subscriptions { + sub.cancel() + } + m.subscriptions = make(map[string]*subscription) + + // Close all topics + for _, topic := range m.topics { + topic.Close() + } + m.topics = make(map[string]*pubsub.Topic) + + return nil +} diff --git a/pkg/pubsub/topics.go b/pkg/pubsub/topics.go new file mode 100644 index 0000000..d1f6053 --- /dev/null +++ b/pkg/pubsub/topics.go @@ -0,0 +1,27 @@ +package pubsub + +import ( + "fmt" + + pubsub "github.com/libp2p/go-libp2p-pubsub" +) + +// getOrCreateTopic gets an existing topic or creates a new one +func (m *Manager) getOrCreateTopic(topicName string) (*pubsub.Topic, error) { + m.mu.Lock() + defer m.mu.Unlock() + + // Return existing topic if available + if topic, exists := m.topics[topicName]; exists { + return topic, nil + } + + // Join the topic - LibP2P allows multiple clients to join the same topic + topic, err := m.pubsub.Join(topicName) + if err != nil { + return nil, fmt.Errorf("failed to join topic: %w", err) + } + + m.topics[topicName] = topic + return topic, nil +} diff --git a/pkg/storage/kv_ops.go b/pkg/storage/kv_ops.go new file mode 100644 index 0000000..ad46b85 --- /dev/null +++ b/pkg/storage/kv_ops.go @@ -0,0 +1,182 @@ +package storage + +import ( + "database/sql" + "fmt" + + "go.uber.org/zap" +) + +// processRequest processes a storage request and returns a response +func (s *Service) processRequest(req *StorageRequest) *StorageResponse { + switch req.Type { + case MessageTypePut: + return s.handlePut(req) + case MessageTypeGet: + return s.handleGet(req) + case MessageTypeDelete: + return s.handleDelete(req) + case MessageTypeList: + return s.handleList(req) + case MessageTypeExists: + return s.handleExists(req) + default: + return &StorageResponse{ + Success: false, + Error: fmt.Sprintf("unknown message type: %s", req.Type), + } + } +} + +// handlePut stores a key-value pair +func (s *Service) handlePut(req *StorageRequest) *StorageResponse { + s.mu.Lock() + defer s.mu.Unlock() + + // Use REPLACE to handle both insert and update + query := ` + REPLACE INTO kv_storage (namespace, key, value, updated_at) + VALUES (?, ?, ?, CURRENT_TIMESTAMP) + ` + + _, err := s.db.Exec(query, req.Namespace, req.Key, req.Value) + if err != nil { + return &StorageResponse{ + Success: false, + Error: fmt.Sprintf("failed to store key: %v", err), + } + } + + s.logger.Debug("Stored key", zap.String("key", req.Key), zap.String("namespace", req.Namespace)) + return &StorageResponse{Success: true} +} + +// handleGet retrieves a value by key +func (s *Service) handleGet(req *StorageRequest) *StorageResponse { + s.mu.RLock() + defer s.mu.RUnlock() + + query := `SELECT value FROM kv_storage WHERE namespace = ? 
AND key = ?` + + var value []byte + err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&value) + if err != nil { + if err == sql.ErrNoRows { + return &StorageResponse{ + Success: false, + Error: fmt.Sprintf("key not found: %s", req.Key), + } + } + return &StorageResponse{ + Success: false, + Error: fmt.Sprintf("failed to get key: %v", err), + } + } + + return &StorageResponse{ + Success: true, + Value: value, + } +} + +// handleDelete removes a key +func (s *Service) handleDelete(req *StorageRequest) *StorageResponse { + s.mu.Lock() + defer s.mu.Unlock() + + query := `DELETE FROM kv_storage WHERE namespace = ? AND key = ?` + + result, err := s.db.Exec(query, req.Namespace, req.Key) + if err != nil { + return &StorageResponse{ + Success: false, + Error: fmt.Sprintf("failed to delete key: %v", err), + } + } + + rowsAffected, _ := result.RowsAffected() + if rowsAffected == 0 { + return &StorageResponse{ + Success: false, + Error: fmt.Sprintf("key not found: %s", req.Key), + } + } + + s.logger.Debug("Deleted key", zap.String("key", req.Key), zap.String("namespace", req.Namespace)) + return &StorageResponse{Success: true} +} + +// handleList lists keys with a prefix +func (s *Service) handleList(req *StorageRequest) *StorageResponse { + s.mu.RLock() + defer s.mu.RUnlock() + + var query string + var args []interface{} + + if req.Prefix == "" { + // List all keys in namespace + query = `SELECT key FROM kv_storage WHERE namespace = ?` + args = []interface{}{req.Namespace} + } else { + // List keys with prefix + query = `SELECT key FROM kv_storage WHERE namespace = ? AND key LIKE ?` + args = []interface{}{req.Namespace, req.Prefix + "%"} + } + + if req.Limit > 0 { + query += ` LIMIT ?` + args = append(args, req.Limit) + } + + rows, err := s.db.Query(query, args...) + if err != nil { + return &StorageResponse{ + Success: false, + Error: fmt.Sprintf("failed to query keys: %v", err), + } + } + defer rows.Close() + + var keys []string + for rows.Next() { + var key string + if err := rows.Scan(&key); err != nil { + continue + } + keys = append(keys, key) + } + + return &StorageResponse{ + Success: true, + Keys: keys, + } +} + +// handleExists checks if a key exists +func (s *Service) handleExists(req *StorageRequest) *StorageResponse { + s.mu.RLock() + defer s.mu.RUnlock() + + query := `SELECT 1 FROM kv_storage WHERE namespace = ? AND key = ? LIMIT 1` + + var exists int + err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&exists) + if err != nil { + if err == sql.ErrNoRows { + return &StorageResponse{ + Success: true, + Exists: false, + } + } + return &StorageResponse{ + Success: false, + Error: fmt.Sprintf("failed to check key existence: %v", err), + } + } + + return &StorageResponse{ + Success: true, + Exists: true, + } +} diff --git a/pkg/storage/logging.go b/pkg/storage/logging.go new file mode 100644 index 0000000..648af74 --- /dev/null +++ b/pkg/storage/logging.go @@ -0,0 +1,16 @@ +package storage + +import "go.uber.org/zap" + +// newStorageLogger creates a zap.Logger for storage components. +// Callers can pass quiet=true to reduce log verbosity. 
+func newStorageLogger(quiet bool) (*zap.Logger, error) { + if quiet { + cfg := zap.NewProductionConfig() + cfg.Level = zap.NewAtomicLevelAt(zap.WarnLevel) + cfg.DisableCaller = true + cfg.DisableStacktrace = true + return cfg.Build() + } + return zap.NewDevelopment() +} diff --git a/pkg/storage/rqlite_init.go b/pkg/storage/rqlite_init.go new file mode 100644 index 0000000..c339467 --- /dev/null +++ b/pkg/storage/rqlite_init.go @@ -0,0 +1,37 @@ +package storage + +import ( + "fmt" +) + +// initTables creates the necessary tables for key-value storage +func (s *Service) initTables() error { + // Create storage table with namespace support + createTableSQL := ` + CREATE TABLE IF NOT EXISTS kv_storage ( + namespace TEXT NOT NULL, + key TEXT NOT NULL, + value BLOB NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (namespace, key) + ) + ` + + // Create index for faster queries + createIndexSQL := ` + CREATE INDEX IF NOT EXISTS idx_kv_storage_namespace_key + ON kv_storage(namespace, key) + ` + + if _, err := s.db.Exec(createTableSQL); err != nil { + return fmt.Errorf("failed to create storage table: %w", err) + } + + if _, err := s.db.Exec(createIndexSQL); err != nil { + return fmt.Errorf("failed to create storage index: %w", err) + } + + s.logger.Info("Storage tables initialized successfully") + return nil +} diff --git a/pkg/storage/service.go b/pkg/storage/service.go index 8ff43a1..e3062c2 100644 --- a/pkg/storage/service.go +++ b/pkg/storage/service.go @@ -2,11 +2,8 @@ package storage import ( "database/sql" - "fmt" - "io" "sync" - "github.com/libp2p/go-libp2p/core/network" "go.uber.org/zap" ) @@ -24,260 +21,9 @@ func NewService(db *sql.DB, logger *zap.Logger) (*Service, error) { db: db, } - // Initialize storage tables - if err := service.initTables(); err != nil { - return nil, fmt.Errorf("failed to initialize storage tables: %w", err) - } - return service, nil } -// initTables creates the necessary tables for key-value storage -func (s *Service) initTables() error { - // Create storage table with namespace support - createTableSQL := ` - CREATE TABLE IF NOT EXISTS kv_storage ( - namespace TEXT NOT NULL, - key TEXT NOT NULL, - value BLOB NOT NULL, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (namespace, key) - ) - ` - - // Create index for faster queries - createIndexSQL := ` - CREATE INDEX IF NOT EXISTS idx_kv_storage_namespace_key - ON kv_storage(namespace, key) - ` - - if _, err := s.db.Exec(createTableSQL); err != nil { - return fmt.Errorf("failed to create storage table: %w", err) - } - - if _, err := s.db.Exec(createIndexSQL); err != nil { - return fmt.Errorf("failed to create storage index: %w", err) - } - - s.logger.Info("Storage tables initialized successfully") - return nil -} - -// HandleStorageStream handles incoming storage protocol streams -func (s *Service) HandleStorageStream(stream network.Stream) { - defer stream.Close() - - // Read request - data, err := io.ReadAll(stream) - if err != nil { - s.logger.Error("Failed to read storage request", zap.Error(err)) - return - } - - var request StorageRequest - if err := request.Unmarshal(data); err != nil { - s.logger.Error("Failed to unmarshal storage request", zap.Error(err)) - return - } - - // Process request - response := s.processRequest(&request) - - // Send response - responseData, err := response.Marshal() - if err != nil { - s.logger.Error("Failed to marshal storage response", 
zap.Error(err)) - return - } - - if _, err := stream.Write(responseData); err != nil { - s.logger.Error("Failed to write storage response", zap.Error(err)) - return - } - - s.logger.Debug("Handled storage request", - zap.String("type", string(request.Type)), - zap.String("key", request.Key), - zap.String("namespace", request.Namespace), - zap.Bool("success", response.Success), - ) -} - -// processRequest processes a storage request and returns a response -func (s *Service) processRequest(req *StorageRequest) *StorageResponse { - switch req.Type { - case MessageTypePut: - return s.handlePut(req) - case MessageTypeGet: - return s.handleGet(req) - case MessageTypeDelete: - return s.handleDelete(req) - case MessageTypeList: - return s.handleList(req) - case MessageTypeExists: - return s.handleExists(req) - default: - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("unknown message type: %s", req.Type), - } - } -} - -// handlePut stores a key-value pair -func (s *Service) handlePut(req *StorageRequest) *StorageResponse { - s.mu.Lock() - defer s.mu.Unlock() - - // Use REPLACE to handle both insert and update - query := ` - REPLACE INTO kv_storage (namespace, key, value, updated_at) - VALUES (?, ?, ?, CURRENT_TIMESTAMP) - ` - - _, err := s.db.Exec(query, req.Namespace, req.Key, req.Value) - if err != nil { - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("failed to store key: %v", err), - } - } - - s.logger.Debug("Stored key", zap.String("key", req.Key), zap.String("namespace", req.Namespace)) - return &StorageResponse{Success: true} -} - -// handleGet retrieves a value by key -func (s *Service) handleGet(req *StorageRequest) *StorageResponse { - s.mu.RLock() - defer s.mu.RUnlock() - - query := `SELECT value FROM kv_storage WHERE namespace = ? AND key = ?` - - var value []byte - err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&value) - if err != nil { - if err == sql.ErrNoRows { - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("key not found: %s", req.Key), - } - } - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("failed to get key: %v", err), - } - } - - return &StorageResponse{ - Success: true, - Value: value, - } -} - -// handleDelete removes a key -func (s *Service) handleDelete(req *StorageRequest) *StorageResponse { - s.mu.Lock() - defer s.mu.Unlock() - - query := `DELETE FROM kv_storage WHERE namespace = ? AND key = ?` - - result, err := s.db.Exec(query, req.Namespace, req.Key) - if err != nil { - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("failed to delete key: %v", err), - } - } - - rowsAffected, _ := result.RowsAffected() - if rowsAffected == 0 { - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("key not found: %s", req.Key), - } - } - - s.logger.Debug("Deleted key", zap.String("key", req.Key), zap.String("namespace", req.Namespace)) - return &StorageResponse{Success: true} -} - -// handleList lists keys with a prefix -func (s *Service) handleList(req *StorageRequest) *StorageResponse { - s.mu.RLock() - defer s.mu.RUnlock() - - var query string - var args []interface{} - - if req.Prefix == "" { - // List all keys in namespace - query = `SELECT key FROM kv_storage WHERE namespace = ?` - args = []interface{}{req.Namespace} - } else { - // List keys with prefix - query = `SELECT key FROM kv_storage WHERE namespace = ? 
AND key LIKE ?` - args = []interface{}{req.Namespace, req.Prefix + "%"} - } - - if req.Limit > 0 { - query += ` LIMIT ?` - args = append(args, req.Limit) - } - - rows, err := s.db.Query(query, args...) - if err != nil { - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("failed to query keys: %v", err), - } - } - defer rows.Close() - - var keys []string - for rows.Next() { - var key string - if err := rows.Scan(&key); err != nil { - continue - } - keys = append(keys, key) - } - - return &StorageResponse{ - Success: true, - Keys: keys, - } -} - -// handleExists checks if a key exists -func (s *Service) handleExists(req *StorageRequest) *StorageResponse { - s.mu.RLock() - defer s.mu.RUnlock() - - query := `SELECT 1 FROM kv_storage WHERE namespace = ? AND key = ? LIMIT 1` - - var exists int - err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&exists) - if err != nil { - if err == sql.ErrNoRows { - return &StorageResponse{ - Success: true, - Exists: false, - } - } - return &StorageResponse{ - Success: false, - Error: fmt.Sprintf("failed to check key existence: %v", err), - } - } - - return &StorageResponse{ - Success: true, - Exists: true, - } -} - // Close closes the storage service func (s *Service) Close() error { // The database connection is managed elsewhere diff --git a/pkg/storage/stream_handler.go b/pkg/storage/stream_handler.go new file mode 100644 index 0000000..4c38fa1 --- /dev/null +++ b/pkg/storage/stream_handler.go @@ -0,0 +1,48 @@ +package storage + +import ( + "io" + + "github.com/libp2p/go-libp2p/core/network" + "go.uber.org/zap" +) + +// HandleStorageStream handles incoming storage protocol streams +func (s *Service) HandleStorageStream(stream network.Stream) { + defer stream.Close() + + // Read request + data, err := io.ReadAll(stream) + if err != nil { + s.logger.Error("Failed to read storage request", zap.Error(err)) + return + } + + var request StorageRequest + if err := request.Unmarshal(data); err != nil { + s.logger.Error("Failed to unmarshal storage request", zap.Error(err)) + return + } + + // Process request + response := s.processRequest(&request) + + // Send response + responseData, err := response.Marshal() + if err != nil { + s.logger.Error("Failed to marshal storage response", zap.Error(err)) + return + } + + if _, err := stream.Write(responseData); err != nil { + s.logger.Error("Failed to write storage response", zap.Error(err)) + return + } + + s.logger.Debug("Handled storage request", + zap.String("type", string(request.Type)), + zap.String("key", request.Key), + zap.String("namespace", request.Namespace), + zap.Bool("success", response.Success), + ) +}
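Together, config.ApplyEnvOverrides and the cmd/node/main.go change above give the stated precedence of flags > environment > defaults: env overrides are applied first, then flag values overwrite whatever they set. A rough usage sketch, assuming the module path mirrors pkg/client (git.debros.io/DeBros/network) and using only field names that appear in ApplyEnvOverrides:

```go
package main

import (
	"fmt"
	"os"

	"git.debros.io/DeBros/network/pkg/config"
)

func main() {
	// Environment overrides the coded defaults...
	os.Setenv("RQLITE_HTTP_PORT", "5002")
	os.Setenv("LOG_LEVEL", "warn")

	cfg := config.NewConfigFromEnv(false) // regular node (not bootstrap)
	fmt.Println(cfg.Database.RQLitePort)  // 5002, from the environment

	// ...and a CLI flag applied afterwards overrides the environment,
	// mirroring the ordering in cmd/node/main.go.
	cfg.Database.RQLitePort = 5010
	fmt.Println(cfg.Database.RQLitePort) // 5010, from the flag
}
```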