mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-10-06 13:49:07 +00:00
refactor: split pubsub manager into focused modules and add env config support
This commit is contained in:
parent
27f2460bf2
commit
e037773ece
46
.github/copilot-instructions.md
vendored
46
.github/copilot-instructions.md
vendored
@ -1,46 +0,0 @@
|
|||||||
<!-- Use this file to provide workspace-specific custom instructions to Copilot. For more details, visit https://code.visualstudio.com/docs/copilot/copilot-customization#_use-a-githubcopilotinstructionsmd-file -->
|
|
||||||
|
|
||||||
# Network - Distributed P2P Database System
|
|
||||||
|
|
||||||
This is a distributed peer-to-peer network project built with Go and LibP2P. The system provides decentralized database capabilities with consensus and replication.
|
|
||||||
|
|
||||||
## Key Components
|
|
||||||
|
|
||||||
- **LibP2P Network Layer**: Core networking built on LibP2P for P2P communication
|
|
||||||
- **Distributed Database**: RQLite-based distributed SQLite with Raft consensus
|
|
||||||
- **Client Library**: Go API for applications to interact with the network
|
|
||||||
- **Application Isolation**: Each app gets isolated namespaces for data and messaging
|
|
||||||
|
|
||||||
## Development Guidelines
|
|
||||||
|
|
||||||
1. **Architecture Patterns**: Follow the client-server pattern where applications use the client library to interact with the distributed network
|
|
||||||
2. **Namespacing**: All data (database, storage, pub/sub) is namespaced per application to ensure isolation
|
|
||||||
3. **Error Handling**: Always check for connection status before performing operations
|
|
||||||
4. **Async Operations**: Use context.Context for cancellation and timeouts
|
|
||||||
5. **Logging**: Use structured logging with appropriate log levels
|
|
||||||
|
|
||||||
## Code Style
|
|
||||||
|
|
||||||
- Use standard Go conventions and naming
|
|
||||||
- Implement interfaces for testability
|
|
||||||
- Include comprehensive error messages
|
|
||||||
- Add context parameters to all network operations
|
|
||||||
- Use dependency injection for components
|
|
||||||
|
|
||||||
## Testing Strategy
|
|
||||||
|
|
||||||
- Unit tests for individual components
|
|
||||||
- Integration tests for client library
|
|
||||||
- E2E tests for full network scenarios
|
|
||||||
- Mock external dependencies (LibP2P, database)
|
|
||||||
|
|
||||||
## Future Applications
|
|
||||||
|
|
||||||
This network is designed to support applications like:
|
|
||||||
|
|
||||||
- Anchat (encrypted messaging)
|
|
||||||
- Distributed file storage
|
|
||||||
- IoT data collection
|
|
||||||
- Social networks
|
|
||||||
|
|
||||||
When implementing applications, they should use the client library rather than directly accessing network internals.
|
|
@ -118,10 +118,9 @@ Create a robust, decentralized network platform that enables applications to sea
|
|||||||
## Codebase Structure
|
## Codebase Structure
|
||||||
|
|
||||||
```
|
```
|
||||||
debros-testing/
|
network/
|
||||||
├── cmd/ # Executables
|
├── cmd/ # Executables
|
||||||
│ ├── bootstrap/main.go # Bootstrap node (network entry point)
|
│ ├── node/main.go # Network node (bootstrap via flag/auto)
|
||||||
│ ├── node/main.go # Regular network node
|
|
||||||
│ └── cli/main.go # Command-line interface
|
│ └── cli/main.go # Command-line interface
|
||||||
├── pkg/ # Core packages
|
├── pkg/ # Core packages
|
||||||
│ ├── client/ # Client API and implementations
|
│ ├── client/ # Client API and implementations
|
||||||
@ -358,7 +357,7 @@ discovery:
|
|||||||
database:
|
database:
|
||||||
rqlite_port: 5001
|
rqlite_port: 5001
|
||||||
rqlite_raft_port: 7001
|
rqlite_raft_port: 7001
|
||||||
rqlite_join_address: "http://localhost:5001"
|
rqlite_join_address: "localhost:7001"
|
||||||
```
|
```
|
||||||
|
|
||||||
## API Reference
|
## API Reference
|
||||||
@ -366,7 +365,7 @@ database:
|
|||||||
### Client Creation
|
### Client Creation
|
||||||
|
|
||||||
```go
|
```go
|
||||||
import "network/pkg/client"
|
import "git.debros.io/DeBros/network/pkg/client"
|
||||||
|
|
||||||
config := client.DefaultClientConfig("my-app")
|
config := client.DefaultClientConfig("my-app")
|
||||||
config.BootstrapPeers = []string{"/ip4/127.0.0.1/tcp/4001/p2p/{PEER_ID}"}
|
config.BootstrapPeers = []string{"/ip4/127.0.0.1/tcp/4001/p2p/{PEER_ID}"}
|
||||||
|
21
Makefile
21
Makefile
@ -1,7 +1,7 @@
|
|||||||
# Network - Distributed P2P Database System
|
# Network - Distributed P2P Database System
|
||||||
# Makefile for development and build tasks
|
# Makefile for development and build tasks
|
||||||
|
|
||||||
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet
|
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
|
||||||
|
|
||||||
# Build targets
|
# Build targets
|
||||||
build: deps
|
build: deps
|
||||||
@ -42,6 +42,11 @@ run-node3:
|
|||||||
@if [ -z "$(BOOTSTRAP)" ]; then echo "ERROR: Provide BOOTSTRAP multiaddr: make run-node3 BOOTSTRAP=/ip4/127.0.0.1/tcp/4001/p2p/<ID> [HTTP=5003 RAFT=7003 P2P=4003]"; exit 1; fi
|
@if [ -z "$(BOOTSTRAP)" ]; then echo "ERROR: Provide BOOTSTRAP multiaddr: make run-node3 BOOTSTRAP=/ip4/127.0.0.1/tcp/4001/p2p/<ID> [HTTP=5003 RAFT=7003 P2P=4003]"; exit 1; fi
|
||||||
go run cmd/node/main.go -role node -id node3 -data ./data/node3 -bootstrap $(BOOTSTRAP) -rqlite-http-port $${HTTP:-5003} -rqlite-raft-port $${RAFT:-7003} -p2p-port $${P2P:-4003} -advertise $${ADVERTISE:-localhost}
|
go run cmd/node/main.go -role node -id node3 -data ./data/node3 -bootstrap $(BOOTSTRAP) -rqlite-http-port $${HTTP:-5003} -rqlite-raft-port $${RAFT:-7003} -p2p-port $${P2P:-4003} -advertise $${ADVERTISE:-localhost}
|
||||||
|
|
||||||
|
# Run basic usage example
|
||||||
|
run-example:
|
||||||
|
@echo "Running basic usage example..."
|
||||||
|
go run examples/basic_usage.go
|
||||||
|
|
||||||
# Show how to run with flags
|
# Show how to run with flags
|
||||||
show-bootstrap:
|
show-bootstrap:
|
||||||
@echo "Provide bootstrap via flags, e.g.:"
|
@echo "Provide bootstrap via flags, e.g.:"
|
||||||
@ -96,10 +101,20 @@ vet:
|
|||||||
@echo "Vetting code..."
|
@echo "Vetting code..."
|
||||||
go vet ./...
|
go vet ./...
|
||||||
|
|
||||||
|
# Lint alias (lightweight for now)
|
||||||
|
lint: fmt vet
|
||||||
|
@echo "Linting complete (fmt + vet)"
|
||||||
|
|
||||||
|
# Clear common development ports
|
||||||
|
clear-ports:
|
||||||
|
@echo "Clearing common dev ports (4001/4002, 5001/5002, 7001/7002)..."
|
||||||
|
@chmod +x scripts/clear-ports.sh || true
|
||||||
|
@scripts/clear-ports.sh
|
||||||
|
|
||||||
# Development setup
|
# Development setup
|
||||||
dev-setup: deps
|
dev-setup: deps
|
||||||
@echo "Setting up development environment..."
|
@echo "Setting up development environment..."
|
||||||
@mkdir -p data/bootstrap data/node data/node-node2 data/node-node3
|
@mkdir -p data/bootstrap data/node data/node2 data/node3
|
||||||
@mkdir -p data/test-bootstrap data/test-node1 data/test-node2
|
@mkdir -p data/test-bootstrap data/test-node1 data/test-node2
|
||||||
@echo "Development setup complete!"
|
@echo "Development setup complete!"
|
||||||
|
|
||||||
@ -168,6 +183,8 @@ help:
|
|||||||
@echo " tidy - Tidy dependencies"
|
@echo " tidy - Tidy dependencies"
|
||||||
@echo " fmt - Format code"
|
@echo " fmt - Format code"
|
||||||
@echo " vet - Vet code"
|
@echo " vet - Vet code"
|
||||||
|
@echo " lint - Lint code (fmt + vet)"
|
||||||
|
@echo " clear-ports - Clear common dev ports"
|
||||||
@echo " dev-setup - Setup development environment"
|
@echo " dev-setup - Setup development environment"
|
||||||
@echo " dev-cluster - Show cluster startup commands"
|
@echo " dev-cluster - Show cluster startup commands"
|
||||||
@echo " dev - Full development workflow"
|
@echo " dev - Full development workflow"
|
||||||
|
93
README.md
93
README.md
@ -31,12 +31,8 @@ A distributed peer-to-peer network built with Go and LibP2P, providing decentral
|
|||||||
- [Network Discovery](#network-discovery)
|
- [Network Discovery](#network-discovery)
|
||||||
- [Updates and Maintenance](#updates-and-maintenance)
|
- [Updates and Maintenance](#updates-and-maintenance)
|
||||||
- [Monitoring and Troubleshooting](#monitoring-and-troubleshooting)
|
- [Monitoring and Troubleshooting](#monitoring-and-troubleshooting)
|
||||||
- [Environment Configuration](#environment-configuration)
|
- [Configuration](#configuration)
|
||||||
- [Bootstrap Peers Configuration](#bootstrap-peers-configuration)
|
- [Bootstrap and Ports (via flags)](#bootstrap-and-ports-via-flags)
|
||||||
- [Setup for Development](#setup-for-development)
|
|
||||||
- [Configuration Files](#configuration-files-1)
|
|
||||||
- [Multiple Bootstrap Peers](#multiple-bootstrap-peers)
|
|
||||||
- [Checking Configuration](#checking-configuration)
|
|
||||||
- [CLI Commands](#cli-commands)
|
- [CLI Commands](#cli-commands)
|
||||||
- [Network Operations](#network-operations)
|
- [Network Operations](#network-operations)
|
||||||
- [Storage Operations](#storage-operations)
|
- [Storage Operations](#storage-operations)
|
||||||
@ -226,14 +222,14 @@ go run cmd/node/main.go \
|
|||||||
make show-bootstrap
|
make show-bootstrap
|
||||||
|
|
||||||
# Check network health
|
# Check network health
|
||||||
./bin/cli health
|
./bin/network-cli health
|
||||||
|
|
||||||
# Test storage operations
|
# Test storage operations
|
||||||
./bin/cli storage put test-key "Hello Network"
|
./bin/network-cli storage put test-key "Hello Network"
|
||||||
./bin/cli storage get test-key
|
./bin/network-cli storage get test-key
|
||||||
|
|
||||||
# List connected peers
|
# List connected peers
|
||||||
./bin/cli peers
|
./bin/network-cli peers
|
||||||
```
|
```
|
||||||
|
|
||||||
## Deployment
|
## Deployment
|
||||||
@ -334,9 +330,9 @@ sudo systemctl enable debros-node
|
|||||||
sudo systemctl disable debros-node
|
sudo systemctl disable debros-node
|
||||||
|
|
||||||
# Use CLI tools
|
# Use CLI tools
|
||||||
/opt/debros/bin/cli health
|
/opt/debros/bin/network-cli health
|
||||||
/opt/debros/bin/cli peers
|
/opt/debros/bin/network-cli peers
|
||||||
/opt/debros/bin/cli storage put key value
|
/opt/debros/bin/network-cli storage put key value
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Configuration Files
|
#### Configuration Files
|
||||||
@ -407,8 +403,8 @@ sudo netstat -tuln | grep -E "(4001|5001|7001)"
|
|||||||
sudo journalctl -u debros-node.service --since "1 hour ago"
|
sudo journalctl -u debros-node.service --since "1 hour ago"
|
||||||
|
|
||||||
# Check network connectivity
|
# Check network connectivity
|
||||||
/opt/debros/bin/cli health
|
/opt/debros/bin/network-cli health
|
||||||
/opt/debros/bin/cli peers
|
/opt/debros/bin/network-cli peers
|
||||||
|
|
||||||
# Check disk usage
|
# Check disk usage
|
||||||
du -sh /opt/debros/data/*
|
du -sh /opt/debros/data/*
|
||||||
@ -429,6 +425,41 @@ For more advanced configuration options and development setup, see the sections
|
|||||||
|
|
||||||
Examples are shown in Quick Start above for local multi-node on a single machine.
|
Examples are shown in Quick Start above for local multi-node on a single machine.
|
||||||
|
|
||||||
|
### Environment Variables
|
||||||
|
|
||||||
|
Precedence: CLI flags > Environment variables > Code defaults. Set any of the following in your shell or `.env`:
|
||||||
|
|
||||||
|
- NODE_ID: custom node identifier (e.g. "node2")
|
||||||
|
- NODE_TYPE: "bootstrap" or "node"
|
||||||
|
- NODE_LISTEN_ADDRESSES: comma-separated multiaddrs (e.g. "/ip4/0.0.0.0/tcp/4001,/ip4/0.0.0.0/udp/4001/quic")
|
||||||
|
- DATA_DIR: node data directory (default `./data`)
|
||||||
|
- MAX_CONNECTIONS: max peer connections (int)
|
||||||
|
|
||||||
|
- DB_DATA_DIR: database data directory (default `./data/db`)
|
||||||
|
- REPLICATION_FACTOR: int (default 3)
|
||||||
|
- SHARD_COUNT: int (default 16)
|
||||||
|
- MAX_DB_SIZE: e.g. "1g", "512m", or bytes
|
||||||
|
- BACKUP_INTERVAL: Go duration (e.g. "24h")
|
||||||
|
- RQLITE_HTTP_PORT: int (default 5001)
|
||||||
|
- RQLITE_RAFT_PORT: int (default 7001)
|
||||||
|
- RQLITE_JOIN_ADDRESS: host:port for Raft join (regular nodes)
|
||||||
|
- ADVERTISE_MODE: "auto" | "localhost" | "ip"
|
||||||
|
|
||||||
|
- BOOTSTRAP_PEERS: comma-separated multiaddrs for bootstrap peers
|
||||||
|
- ENABLE_MDNS: true/false
|
||||||
|
- ENABLE_DHT: true/false
|
||||||
|
- DHT_PREFIX: string (default `/network/kad/1.0.0`)
|
||||||
|
- DISCOVERY_INTERVAL: duration (e.g. "5m")
|
||||||
|
|
||||||
|
- ENABLE_TLS: true/false
|
||||||
|
- PRIVATE_KEY_FILE: path
|
||||||
|
- CERT_FILE: path
|
||||||
|
- AUTH_ENABLED: true/false
|
||||||
|
|
||||||
|
- LOG_LEVEL: "debug" | "info" | "warn" | "error"
|
||||||
|
- LOG_FORMAT: "json" | "console"
|
||||||
|
- LOG_OUTPUT_FILE: path (empty = stdout)
|
||||||
|
|
||||||
## CLI Commands
|
## CLI Commands
|
||||||
|
|
||||||
The CLI can still accept `--bootstrap <multiaddr>` to override discovery when needed.
|
The CLI can still accept `--bootstrap <multiaddr>` to override discovery when needed.
|
||||||
@ -436,32 +467,32 @@ The CLI can still accept `--bootstrap <multiaddr>` to override discovery when ne
|
|||||||
### Network Operations
|
### Network Operations
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
./bin/cli health # Check network health
|
./bin/network-cli health # Check network health
|
||||||
./bin/cli status # Get network status
|
./bin/network-cli status # Get network status
|
||||||
./bin/cli peers # List connected peers
|
./bin/network-cli peers # List connected peers
|
||||||
```
|
```
|
||||||
|
|
||||||
### Storage Operations
|
### Storage Operations
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
./bin/cli storage put <key> <value> # Store data
|
./bin/network-cli storage put <key> <value> # Store data
|
||||||
./bin/cli storage get <key> # Retrieve data
|
./bin/network-cli storage get <key> # Retrieve data
|
||||||
./bin/cli storage list [prefix] # List keys
|
./bin/network-cli storage list [prefix] # List keys
|
||||||
```
|
```
|
||||||
|
|
||||||
### Database Operations
|
### Database Operations
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
./bin/cli query "SELECT * FROM table" # Execute SQL
|
./bin/network-cli query "SELECT * FROM table" # Execute SQL
|
||||||
./bin/cli query "CREATE TABLE users (id INTEGER)" # DDL operations
|
./bin/network-cli query "CREATE TABLE users (id INTEGER)" # DDL operations
|
||||||
```
|
```
|
||||||
|
|
||||||
### Pub/Sub Messaging
|
### Pub/Sub Messaging
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
./bin/cli pubsub publish <topic> <message> # Send message
|
./bin/network-cli pubsub publish <topic> <message> # Send message
|
||||||
./bin/cli pubsub subscribe <topic> [duration] # Listen for messages
|
./bin/network-cli pubsub subscribe <topic> [duration] # Listen for messages
|
||||||
./bin/cli pubsub topics # List active topics
|
./bin/network-cli pubsub topics # List active topics
|
||||||
```
|
```
|
||||||
|
|
||||||
### CLI Options
|
### CLI Options
|
||||||
@ -479,8 +510,7 @@ The CLI can still accept `--bootstrap <multiaddr>` to override discovery when ne
|
|||||||
```
|
```
|
||||||
network/
|
network/
|
||||||
├── cmd/
|
├── cmd/
|
||||||
│ ├── bootstrap/ # Bootstrap node
|
│ ├── node/ # Network node (bootstrap via flag)
|
||||||
│ ├── node/ # Regular network node
|
|
||||||
│ └── cli/ # Command-line interface
|
│ └── cli/ # Command-line interface
|
||||||
├── pkg/
|
├── pkg/
|
||||||
│ ├── client/ # Client library
|
│ ├── client/ # Client library
|
||||||
@ -490,7 +520,6 @@ network/
|
|||||||
│ ├── constants/ # Bootstrap configuration
|
│ ├── constants/ # Bootstrap configuration
|
||||||
│ └── config/ # System configuration
|
│ └── config/ # System configuration
|
||||||
├── scripts/ # Helper scripts (install, security, tests)
|
├── scripts/ # Helper scripts (install, security, tests)
|
||||||
├── scripts/ # Helper scripts (install, security, tests)
|
|
||||||
├── bin/ # Built executables
|
├── bin/ # Built executables
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -546,8 +575,8 @@ make dev
|
|||||||
make run-node
|
make run-node
|
||||||
|
|
||||||
# Terminal 3: Test with CLI
|
# Terminal 3: Test with CLI
|
||||||
./bin/cli health
|
./bin/network-cli health
|
||||||
./bin/cli peers
|
./bin/network-cli peers
|
||||||
```
|
```
|
||||||
|
|
||||||
### Environment Setup
|
### Environment Setup
|
||||||
@ -607,7 +636,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
"log"
|
||||||
"network/pkg/client"
|
"git.debros.io/DeBros/network/pkg/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -1,48 +0,0 @@
|
|||||||
#!/usr/bin/env bash
|
|
||||||
# clear-ports.sh
|
|
||||||
# Safely terminate any processes listening on specified TCP ports.
|
|
||||||
# Usage:
|
|
||||||
# ./clear-ports.sh # clears 4001, 5001, 7001 by default
|
|
||||||
# ./clear-ports.sh 4001 5001 7001 # clears the specified ports
|
|
||||||
|
|
||||||
set -euo pipefail
|
|
||||||
|
|
||||||
# Collect ports from args or use defaults
|
|
||||||
PORTS=("$@")
|
|
||||||
if [ ${#PORTS[@]} -eq 0 ]; then
|
|
||||||
PORTS=(4001 4002 5001 5002 7001 7002)
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "Gracefully terminating listeners on: ${PORTS[*]}"
|
|
||||||
for p in "${PORTS[@]}"; do
|
|
||||||
PIDS=$(lsof -t -n -P -iTCP:"$p" -sTCP:LISTEN 2>/dev/null || true)
|
|
||||||
if [ -n "$PIDS" ]; then
|
|
||||||
echo "Port $p -> PIDs: $PIDS (SIGTERM)"
|
|
||||||
# shellcheck disable=SC2086
|
|
||||||
kill -TERM $PIDS 2>/dev/null || true
|
|
||||||
else
|
|
||||||
echo "Port $p -> no listeners"
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
|
|
||||||
sleep 1
|
|
||||||
|
|
||||||
echo "Force killing any remaining listeners..."
|
|
||||||
for p in "${PORTS[@]}"; do
|
|
||||||
PIDS=$(lsof -t -n -P -iTCP:"$p" -sTCP:LISTEN 2>/dev/null || true)
|
|
||||||
if [ -n "$PIDS" ]; then
|
|
||||||
echo "Port $p -> PIDs: $PIDS (SIGKILL)"
|
|
||||||
# shellcheck disable=SC2086
|
|
||||||
kill -9 $PIDS 2>/dev/null || true
|
|
||||||
else
|
|
||||||
echo "Port $p -> none remaining"
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
|
|
||||||
echo "\nVerification (should be empty):"
|
|
||||||
for p in "${PORTS[@]}"; do
|
|
||||||
echo "--- Port $p ---"
|
|
||||||
lsof -n -P -iTCP:"$p" -sTCP:LISTEN 2>/dev/null || true
|
|
||||||
echo
|
|
||||||
done
|
|
||||||
|
|
@ -88,6 +88,10 @@ func main() {
|
|||||||
logger.Printf("Starting regular node...")
|
logger.Printf("Starting regular node...")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply environment variable overrides before applying CLI flags so that
|
||||||
|
// precedence is: flags > env > defaults
|
||||||
|
config.ApplyEnvOverrides(cfg)
|
||||||
|
|
||||||
// Set basic configuration
|
// Set basic configuration
|
||||||
cfg.Node.DataDir = *dataDir
|
cfg.Node.DataDir = *dataDir
|
||||||
cfg.Node.ListenAddresses = []string{
|
cfg.Node.ListenAddresses = []string{
|
||||||
@ -153,8 +157,11 @@ func main() {
|
|||||||
}
|
}
|
||||||
logger.Printf("Using command line bootstrap peer: %s", *bootstrap)
|
logger.Printf("Using command line bootstrap peer: %s", *bootstrap)
|
||||||
} else {
|
} else {
|
||||||
// Use environment-configured bootstrap peers
|
// Use environment-configured bootstrap peers if provided; otherwise fallback to constants
|
||||||
bootstrapPeers := constants.GetBootstrapPeers()
|
bootstrapPeers := cfg.Discovery.BootstrapPeers
|
||||||
|
if len(bootstrapPeers) == 0 {
|
||||||
|
bootstrapPeers = constants.GetBootstrapPeers()
|
||||||
|
}
|
||||||
if len(bootstrapPeers) > 0 {
|
if len(bootstrapPeers) > 0 {
|
||||||
cfg.Discovery.BootstrapPeers = bootstrapPeers
|
cfg.Discovery.BootstrapPeers = bootstrapPeers
|
||||||
// Use the first bootstrap peer for RQLite join
|
// Use the first bootstrap peer for RQLite join
|
||||||
|
@ -1,48 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/rand"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p/core/crypto"
|
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
// Generate a fixed identity
|
|
||||||
priv, pub, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, rand.Reader)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get peer ID
|
|
||||||
peerID, err := peer.IDFromPublicKey(pub)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("Generated Peer ID: %s\n", peerID.String())
|
|
||||||
|
|
||||||
// Marshal private key
|
|
||||||
data, err := crypto.MarshalPrivateKey(priv)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create data directory
|
|
||||||
dataDir := "./data/bootstrap"
|
|
||||||
if err := os.MkdirAll(dataDir, 0755); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save identity
|
|
||||||
identityFile := filepath.Join(dataDir, "identity.key")
|
|
||||||
if err := os.WriteFile(identityFile, data, 0600); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("Identity saved to: %s\n", identityFile)
|
|
||||||
fmt.Printf("Bootstrap address: /ip4/127.0.0.1/tcp/4001/p2p/%s\n", peerID.String())
|
|
||||||
}
|
|
@ -84,22 +84,8 @@ func NewClient(config *ClientConfig) (NetworkClient, error) {
|
|||||||
return nil, fmt.Errorf("app name is required")
|
return nil, fmt.Errorf("app name is required")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create zap logger - use different config for quiet mode
|
// Create zap logger via helper for consistency
|
||||||
var logger *zap.Logger
|
logger, err := newClientLogger(config.QuietMode)
|
||||||
var err error
|
|
||||||
if config.QuietMode {
|
|
||||||
// For quiet mode, only show warnings and errors
|
|
||||||
zapConfig := zap.NewProductionConfig()
|
|
||||||
zapConfig.Level = zap.NewAtomicLevelAt(zap.WarnLevel)
|
|
||||||
// Disable caller info for cleaner output
|
|
||||||
zapConfig.DisableCaller = true
|
|
||||||
// Disable stacktrace for cleaner output
|
|
||||||
zapConfig.DisableStacktrace = true
|
|
||||||
logger, err = zapConfig.Build()
|
|
||||||
} else {
|
|
||||||
// Development logger shows debug/info logs
|
|
||||||
logger, err = zap.NewDevelopment()
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create logger: %w", err)
|
return nil, fmt.Errorf("failed to create logger: %w", err)
|
||||||
}
|
}
|
||||||
|
19
pkg/client/logging.go
Normal file
19
pkg/client/logging.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// newClientLogger creates a zap.Logger based on quiet mode preference.
|
||||||
|
// Quiet mode returns a production logger with Warn+ level and reduced noise.
|
||||||
|
// Non-quiet returns a development logger with debug/info output.
|
||||||
|
func newClientLogger(quiet bool) (*zap.Logger, error) {
|
||||||
|
if quiet {
|
||||||
|
cfg := zap.NewProductionConfig()
|
||||||
|
cfg.Level = zap.NewAtomicLevelAt(zap.WarnLevel)
|
||||||
|
cfg.DisableCaller = true
|
||||||
|
cfg.DisableStacktrace = true
|
||||||
|
return cfg.Build()
|
||||||
|
}
|
||||||
|
return zap.NewDevelopment()
|
||||||
|
}
|
@ -1,6 +1,9 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
@ -156,3 +159,184 @@ func BootstrapConfig() *Config {
|
|||||||
}
|
}
|
||||||
return config
|
return config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewConfigFromEnv constructs a config (bootstrap or regular) and applies environment overrides.
|
||||||
|
// If isBootstrap is true, starts from BootstrapConfig; otherwise from DefaultConfig.
|
||||||
|
func NewConfigFromEnv(isBootstrap bool) *Config {
|
||||||
|
var cfg *Config
|
||||||
|
if isBootstrap {
|
||||||
|
cfg = BootstrapConfig()
|
||||||
|
} else {
|
||||||
|
cfg = DefaultConfig()
|
||||||
|
}
|
||||||
|
ApplyEnvOverrides(cfg)
|
||||||
|
return cfg
|
||||||
|
}
|
||||||
|
|
||||||
|
// ApplyEnvOverrides mutates cfg based on environment variables.
|
||||||
|
// Precedence: CLI flags (outside this function) > ENV variables > defaults in code.
|
||||||
|
func ApplyEnvOverrides(cfg *Config) {
|
||||||
|
// Node
|
||||||
|
if v := os.Getenv("NODE_ID"); v != "" {
|
||||||
|
cfg.Node.ID = v
|
||||||
|
}
|
||||||
|
if v := os.Getenv("NODE_TYPE"); v != "" { // "bootstrap" or "node"
|
||||||
|
cfg.Node.Type = strings.ToLower(v)
|
||||||
|
cfg.Node.IsBootstrap = cfg.Node.Type == "bootstrap"
|
||||||
|
}
|
||||||
|
if v := os.Getenv("NODE_LISTEN_ADDRESSES"); v != "" {
|
||||||
|
parts := splitAndTrim(v)
|
||||||
|
if len(parts) > 0 {
|
||||||
|
cfg.Node.ListenAddresses = parts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v := os.Getenv("DATA_DIR"); v != "" {
|
||||||
|
cfg.Node.DataDir = v
|
||||||
|
}
|
||||||
|
if v := os.Getenv("MAX_CONNECTIONS"); v != "" {
|
||||||
|
if n, err := strconv.Atoi(v); err == nil {
|
||||||
|
cfg.Node.MaxConnections = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Database
|
||||||
|
if v := os.Getenv("DB_DATA_DIR"); v != "" {
|
||||||
|
cfg.Database.DataDir = v
|
||||||
|
}
|
||||||
|
if v := os.Getenv("REPLICATION_FACTOR"); v != "" {
|
||||||
|
if n, err := strconv.Atoi(v); err == nil {
|
||||||
|
cfg.Database.ReplicationFactor = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v := os.Getenv("SHARD_COUNT"); v != "" {
|
||||||
|
if n, err := strconv.Atoi(v); err == nil {
|
||||||
|
cfg.Database.ShardCount = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v := os.Getenv("MAX_DB_SIZE"); v != "" { // bytes
|
||||||
|
if n, err := parseInt64(v); err == nil {
|
||||||
|
cfg.Database.MaxDatabaseSize = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v := os.Getenv("BACKUP_INTERVAL"); v != "" { // duration, e.g. 24h
|
||||||
|
if d, err := time.ParseDuration(v); err == nil {
|
||||||
|
cfg.Database.BackupInterval = d
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v := os.Getenv("RQLITE_HTTP_PORT"); v != "" {
|
||||||
|
if n, err := strconv.Atoi(v); err == nil {
|
||||||
|
cfg.Database.RQLitePort = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v := os.Getenv("RQLITE_RAFT_PORT"); v != "" {
|
||||||
|
if n, err := strconv.Atoi(v); err == nil {
|
||||||
|
cfg.Database.RQLiteRaftPort = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v := os.Getenv("RQLITE_JOIN_ADDRESS"); v != "" {
|
||||||
|
cfg.Database.RQLiteJoinAddress = v
|
||||||
|
}
|
||||||
|
if v := os.Getenv("ADVERTISE_MODE"); v != "" { // auto | localhost | ip
|
||||||
|
cfg.Database.AdvertiseMode = strings.ToLower(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Discovery
|
||||||
|
if v := os.Getenv("BOOTSTRAP_PEERS"); v != "" {
|
||||||
|
parts := splitAndTrim(v)
|
||||||
|
if len(parts) > 0 {
|
||||||
|
cfg.Discovery.BootstrapPeers = parts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v := os.Getenv("ENABLE_MDNS"); v != "" {
|
||||||
|
if b, err := parseBool(v); err == nil {
|
||||||
|
cfg.Discovery.EnableMDNS = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v := os.Getenv("ENABLE_DHT"); v != "" {
|
||||||
|
if b, err := parseBool(v); err == nil {
|
||||||
|
cfg.Discovery.EnableDHT = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v := os.Getenv("DHT_PREFIX"); v != "" {
|
||||||
|
cfg.Discovery.DHTPrefix = v
|
||||||
|
}
|
||||||
|
if v := os.Getenv("DISCOVERY_INTERVAL"); v != "" { // e.g. 5m
|
||||||
|
if d, err := time.ParseDuration(v); err == nil {
|
||||||
|
cfg.Discovery.DiscoveryInterval = d
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Security
|
||||||
|
if v := os.Getenv("ENABLE_TLS"); v != "" {
|
||||||
|
if b, err := parseBool(v); err == nil {
|
||||||
|
cfg.Security.EnableTLS = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v := os.Getenv("PRIVATE_KEY_FILE"); v != "" {
|
||||||
|
cfg.Security.PrivateKeyFile = v
|
||||||
|
}
|
||||||
|
if v := os.Getenv("CERT_FILE"); v != "" {
|
||||||
|
cfg.Security.CertificateFile = v
|
||||||
|
}
|
||||||
|
if v := os.Getenv("AUTH_ENABLED"); v != "" {
|
||||||
|
if b, err := parseBool(v); err == nil {
|
||||||
|
cfg.Security.AuthEnabled = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Logging
|
||||||
|
if v := os.Getenv("LOG_LEVEL"); v != "" {
|
||||||
|
cfg.Logging.Level = strings.ToLower(v)
|
||||||
|
}
|
||||||
|
if v := os.Getenv("LOG_FORMAT"); v != "" {
|
||||||
|
cfg.Logging.Format = strings.ToLower(v)
|
||||||
|
}
|
||||||
|
if v := os.Getenv("LOG_OUTPUT_FILE"); v != "" {
|
||||||
|
cfg.Logging.OutputFile = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helpers
|
||||||
|
func splitAndTrim(csv string) []string {
|
||||||
|
parts := strings.Split(csv, ",")
|
||||||
|
out := make([]string, 0, len(parts))
|
||||||
|
for _, p := range parts {
|
||||||
|
s := strings.TrimSpace(p)
|
||||||
|
if s != "" {
|
||||||
|
out = append(out, s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseBool(s string) (bool, error) {
|
||||||
|
switch strings.ToLower(strings.TrimSpace(s)) {
|
||||||
|
case "1", "true", "t", "yes", "y", "on":
|
||||||
|
return true, nil
|
||||||
|
case "0", "false", "f", "no", "n", "off":
|
||||||
|
return false, nil
|
||||||
|
default:
|
||||||
|
return strconv.ParseBool(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseInt64(s string) (int64, error) {
|
||||||
|
// Allow plain int or with optional suffixes k, m, g (base-1024)
|
||||||
|
s = strings.TrimSpace(strings.ToLower(s))
|
||||||
|
mul := int64(1)
|
||||||
|
if strings.HasSuffix(s, "k") {
|
||||||
|
mul = 1024
|
||||||
|
s = strings.TrimSuffix(s, "k")
|
||||||
|
} else if strings.HasSuffix(s, "m") {
|
||||||
|
mul = 1024 * 1024
|
||||||
|
s = strings.TrimSuffix(s, "m")
|
||||||
|
} else if strings.HasSuffix(s, "g") {
|
||||||
|
mul = 1024 * 1024 * 1024
|
||||||
|
s = strings.TrimSuffix(s, "g")
|
||||||
|
}
|
||||||
|
n, err := strconv.ParseInt(strings.TrimSpace(s), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return n * mul, nil
|
||||||
|
}
|
||||||
|
130
pkg/pubsub/discovery_integration.go
Normal file
130
pkg/pubsub/discovery_integration.go
Normal file
@ -0,0 +1,130 @@
|
|||||||
|
package pubsub
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
|
)
|
||||||
|
|
||||||
|
// announceTopicInterest helps with peer discovery by announcing interest in a topic
|
||||||
|
func (m *Manager) announceTopicInterest(topicName string) {
|
||||||
|
// Wait a bit for the subscription to be established
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// Get the topic
|
||||||
|
m.mu.RLock()
|
||||||
|
topic, exists := m.topics[topicName]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// For Anchat specifically, be more aggressive about finding peers
|
||||||
|
if len(m.namespace) > 6 && m.namespace[:6] == "anchat" {
|
||||||
|
go m.aggressiveTopicPeerDiscovery(topicName, topic)
|
||||||
|
} else {
|
||||||
|
// Start a periodic check to monitor topic peer growth
|
||||||
|
go m.monitorTopicPeers(topicName, topic)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// aggressiveTopicPeerDiscovery for Anchat - actively seeks topic peers
|
||||||
|
func (m *Manager) aggressiveTopicPeerDiscovery(topicName string, topic *pubsub.Topic) {
|
||||||
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for i := 0; i < 30; i++ { // Monitor for 30 seconds
|
||||||
|
<-ticker.C
|
||||||
|
peers := topic.ListPeers()
|
||||||
|
|
||||||
|
// If we have peers, reduce frequency but keep monitoring
|
||||||
|
if len(peers) > 0 {
|
||||||
|
// Switch to normal monitoring once we have peers
|
||||||
|
go m.monitorTopicPeers(topicName, topic)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// For Anchat, try to actively discover and connect to peers on this topic
|
||||||
|
// This is critical because LibP2P pubsub requires direct connections for message propagation
|
||||||
|
m.forceTopicPeerDiscovery(topicName, topic)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// enhancedAnchatTopicDiscovery implements enhanced peer discovery specifically for Anchat
|
||||||
|
func (m *Manager) enhancedAnchatTopicDiscovery(topicName string, topic *pubsub.Topic) {
|
||||||
|
// Wait for subscription to be fully established
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for i := 0; i < 20; i++ { // Monitor for 20 seconds
|
||||||
|
<-ticker.C
|
||||||
|
|
||||||
|
peers := topic.ListPeers()
|
||||||
|
if len(peers) > 0 {
|
||||||
|
// Success! We found topic peers
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try various discovery strategies
|
||||||
|
if i%3 == 0 {
|
||||||
|
// Strategy: Send discovery heartbeat
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||||
|
discoveryMsg := []byte("ANCHAT_DISCOVERY_PING")
|
||||||
|
topic.Publish(ctx, discoveryMsg)
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait a bit and check again
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
peers = topic.ListPeers()
|
||||||
|
if len(peers) > 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// forceTopicPeerDiscovery uses multiple strategies to find and connect to topic peers
|
||||||
|
func (m *Manager) forceTopicPeerDiscovery(topicName string, topic *pubsub.Topic) {
|
||||||
|
// Strategy 1: Check if pubsub knows about any peers for this topic
|
||||||
|
peers := topic.ListPeers()
|
||||||
|
if len(peers) > 0 {
|
||||||
|
return // We already have peers
|
||||||
|
}
|
||||||
|
|
||||||
|
// Strategy 2: Try to actively announce our presence and wait for responses
|
||||||
|
// Send a ping/heartbeat to the topic to announce our presence
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Create a discovery message to announce our presence on this topic
|
||||||
|
discoveryMsg := []byte("ANCHAT_PEER_DISCOVERY")
|
||||||
|
topic.Publish(ctx, discoveryMsg)
|
||||||
|
|
||||||
|
// Strategy 3: Wait briefly and check again
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
_ = topic.ListPeers() // Check again but we don't need to use the result
|
||||||
|
|
||||||
|
// Note: In LibP2P, topics don't automatically form connections between subscribers
|
||||||
|
// The underlying network layer needs to ensure peers are connected first
|
||||||
|
// This is why our enhanced client peer discovery is crucial
|
||||||
|
}
|
||||||
|
|
||||||
|
// monitorTopicPeers periodically checks topic peer connectivity
|
||||||
|
func (m *Manager) monitorTopicPeers(topicName string, topic *pubsub.Topic) {
|
||||||
|
ticker := time.NewTicker(5 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for i := 0; i < 6; i++ { // Monitor for 30 seconds
|
||||||
|
<-ticker.C
|
||||||
|
peers := topic.ListPeers()
|
||||||
|
|
||||||
|
// If we have peers, we're good
|
||||||
|
if len(peers) > 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
17
pkg/pubsub/logging.go
Normal file
17
pkg/pubsub/logging.go
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
package pubsub
|
||||||
|
|
||||||
|
import "go.uber.org/zap"
|
||||||
|
|
||||||
|
// newPubSubLogger creates a zap.Logger for pubsub components.
|
||||||
|
// Quiet mode can be handled by callers by using production config externally;
|
||||||
|
// here we default to development logger for richer diagnostics during dev.
|
||||||
|
func newPubSubLogger(quiet bool) (*zap.Logger, error) {
|
||||||
|
if quiet {
|
||||||
|
cfg := zap.NewProductionConfig()
|
||||||
|
cfg.Level = zap.NewAtomicLevelAt(zap.WarnLevel)
|
||||||
|
cfg.DisableCaller = true
|
||||||
|
cfg.DisableStacktrace = true
|
||||||
|
return cfg.Build()
|
||||||
|
}
|
||||||
|
return zap.NewDevelopment()
|
||||||
|
}
|
@ -1,10 +1,7 @@
|
|||||||
package pubsub
|
package pubsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
)
|
)
|
||||||
@ -21,312 +18,15 @@ type Manager struct {
|
|||||||
// subscription holds subscription data
|
// subscription holds subscription data
|
||||||
type subscription struct {
|
type subscription struct {
|
||||||
sub *pubsub.Subscription
|
sub *pubsub.Subscription
|
||||||
cancel context.CancelFunc
|
cancel func()
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewManager creates a new pubsub manager
|
// NewManager creates a new pubsub manager
|
||||||
func NewManager(ps *pubsub.PubSub, namespace string) *Manager {
|
func NewManager(ps *pubsub.PubSub, namespace string) *Manager {
|
||||||
return &Manager{
|
return &Manager {
|
||||||
pubsub: ps,
|
pubsub: ps,
|
||||||
topics: make(map[string]*pubsub.Topic),
|
topics: make(map[string]*pubsub.Topic),
|
||||||
subscriptions: make(map[string]*subscription),
|
subscriptions: make(map[string]*subscription),
|
||||||
namespace: namespace,
|
namespace: namespace,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getOrCreateTopic gets an existing topic or creates a new one
|
|
||||||
func (m *Manager) getOrCreateTopic(topicName string) (*pubsub.Topic, error) {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
|
|
||||||
// Return existing topic if available
|
|
||||||
if topic, exists := m.topics[topicName]; exists {
|
|
||||||
return topic, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Join the topic - LibP2P allows multiple clients to join the same topic
|
|
||||||
topic, err := m.pubsub.Join(topicName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to join topic: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
m.topics[topicName] = topic
|
|
||||||
return topic, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribe subscribes to a topic
|
|
||||||
func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
|
|
||||||
if m.pubsub == nil {
|
|
||||||
return fmt.Errorf("pubsub not initialized")
|
|
||||||
}
|
|
||||||
|
|
||||||
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic)
|
|
||||||
|
|
||||||
// Check if already subscribed
|
|
||||||
m.mu.Lock()
|
|
||||||
if _, exists := m.subscriptions[namespacedTopic]; exists {
|
|
||||||
m.mu.Unlock()
|
|
||||||
// Already subscribed - this is normal for LibP2P pubsub
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
m.mu.Unlock()
|
|
||||||
|
|
||||||
// Get or create topic
|
|
||||||
libp2pTopic, err := m.getOrCreateTopic(namespacedTopic)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get topic: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribe to topic
|
|
||||||
sub, err := libp2pTopic.Subscribe()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to subscribe to topic: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create cancellable context for this subscription
|
|
||||||
subCtx, cancel := context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
// Store subscription
|
|
||||||
m.mu.Lock()
|
|
||||||
m.subscriptions[namespacedTopic] = &subscription{
|
|
||||||
sub: sub,
|
|
||||||
cancel: cancel,
|
|
||||||
}
|
|
||||||
m.mu.Unlock()
|
|
||||||
|
|
||||||
// Start message handler goroutine
|
|
||||||
go func() {
|
|
||||||
defer func() {
|
|
||||||
sub.Cancel()
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-subCtx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
msg, err := sub.Next(subCtx)
|
|
||||||
if err != nil {
|
|
||||||
if subCtx.Err() != nil {
|
|
||||||
return // Context cancelled
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Call the handler
|
|
||||||
if err := handler(topic, msg.Data); err != nil {
|
|
||||||
// Log error but continue processing
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Force peer discovery for this topic
|
|
||||||
go m.announceTopicInterest(namespacedTopic)
|
|
||||||
|
|
||||||
// For Anchat, also try to actively find topic peers through the libp2p pubsub system
|
|
||||||
if len(m.namespace) > 6 && m.namespace[:6] == "anchat" {
|
|
||||||
go m.enhancedAnchatTopicDiscovery(namespacedTopic, libp2pTopic)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publish publishes a message to a topic
|
|
||||||
func (m *Manager) Publish(ctx context.Context, topic string, data []byte) error {
|
|
||||||
if m.pubsub == nil {
|
|
||||||
return fmt.Errorf("pubsub not initialized")
|
|
||||||
}
|
|
||||||
|
|
||||||
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic)
|
|
||||||
|
|
||||||
// Get or create topic
|
|
||||||
libp2pTopic, err := m.getOrCreateTopic(namespacedTopic)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get topic for publishing: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publish message
|
|
||||||
if err := libp2pTopic.Publish(ctx, data); err != nil {
|
|
||||||
return fmt.Errorf("failed to publish message: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unsubscribe unsubscribes from a topic
|
|
||||||
func (m *Manager) Unsubscribe(ctx context.Context, topic string) error {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
|
|
||||||
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic)
|
|
||||||
|
|
||||||
if subscription, exists := m.subscriptions[namespacedTopic]; exists {
|
|
||||||
// Cancel the subscription context to stop the message handler goroutine
|
|
||||||
subscription.cancel()
|
|
||||||
delete(m.subscriptions, namespacedTopic)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListTopics returns all subscribed topics
|
|
||||||
func (m *Manager) ListTopics(ctx context.Context) ([]string, error) {
|
|
||||||
m.mu.RLock()
|
|
||||||
defer m.mu.RUnlock()
|
|
||||||
|
|
||||||
var topics []string
|
|
||||||
prefix := m.namespace + "."
|
|
||||||
|
|
||||||
for topic := range m.subscriptions {
|
|
||||||
if len(topic) > len(prefix) && topic[:len(prefix)] == prefix {
|
|
||||||
originalTopic := topic[len(prefix):]
|
|
||||||
topics = append(topics, originalTopic)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return topics, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes all subscriptions and topics
|
|
||||||
func (m *Manager) Close() error {
|
|
||||||
m.mu.Lock()
|
|
||||||
defer m.mu.Unlock()
|
|
||||||
|
|
||||||
// Cancel all subscriptions
|
|
||||||
for _, sub := range m.subscriptions {
|
|
||||||
sub.cancel()
|
|
||||||
}
|
|
||||||
m.subscriptions = make(map[string]*subscription)
|
|
||||||
|
|
||||||
// Close all topics
|
|
||||||
for _, topic := range m.topics {
|
|
||||||
topic.Close()
|
|
||||||
}
|
|
||||||
m.topics = make(map[string]*pubsub.Topic)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// announceTopicInterest helps with peer discovery by announcing interest in a topic
|
|
||||||
func (m *Manager) announceTopicInterest(topicName string) {
|
|
||||||
// Wait a bit for the subscription to be established
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
|
||||||
// Get the topic
|
|
||||||
m.mu.RLock()
|
|
||||||
topic, exists := m.topics[topicName]
|
|
||||||
m.mu.RUnlock()
|
|
||||||
|
|
||||||
if !exists {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// For Anchat specifically, be more aggressive about finding peers
|
|
||||||
if len(m.namespace) > 6 && m.namespace[:6] == "anchat" {
|
|
||||||
go m.aggressiveTopicPeerDiscovery(topicName, topic)
|
|
||||||
} else {
|
|
||||||
// Start a periodic check to monitor topic peer growth
|
|
||||||
go m.monitorTopicPeers(topicName, topic)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// aggressiveTopicPeerDiscovery for Anchat - actively seeks topic peers
|
|
||||||
func (m *Manager) aggressiveTopicPeerDiscovery(topicName string, topic *pubsub.Topic) {
|
|
||||||
ticker := time.NewTicker(1 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for i := 0; i < 30; i++ { // Monitor for 30 seconds
|
|
||||||
<-ticker.C
|
|
||||||
peers := topic.ListPeers()
|
|
||||||
|
|
||||||
// If we have peers, reduce frequency but keep monitoring
|
|
||||||
if len(peers) > 0 {
|
|
||||||
// Switch to normal monitoring once we have peers
|
|
||||||
go m.monitorTopicPeers(topicName, topic)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// For Anchat, try to actively discover and connect to peers on this topic
|
|
||||||
// This is critical because LibP2P pubsub requires direct connections for message propagation
|
|
||||||
m.forceTopicPeerDiscovery(topicName, topic)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// enhancedAnchatTopicDiscovery implements enhanced peer discovery specifically for Anchat
|
|
||||||
func (m *Manager) enhancedAnchatTopicDiscovery(topicName string, topic *pubsub.Topic) {
|
|
||||||
// Wait for subscription to be fully established
|
|
||||||
time.Sleep(200 * time.Millisecond)
|
|
||||||
|
|
||||||
ticker := time.NewTicker(1 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for i := 0; i < 20; i++ { // Monitor for 20 seconds
|
|
||||||
<-ticker.C
|
|
||||||
|
|
||||||
peers := topic.ListPeers()
|
|
||||||
if len(peers) > 0 {
|
|
||||||
// Success! We found topic peers
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try various discovery strategies
|
|
||||||
if i%3 == 0 {
|
|
||||||
// Strategy: Send discovery heartbeat
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
|
||||||
discoveryMsg := []byte("ANCHAT_DISCOVERY_PING")
|
|
||||||
topic.Publish(ctx, discoveryMsg)
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait a bit and check again
|
|
||||||
time.Sleep(500 * time.Millisecond)
|
|
||||||
peers = topic.ListPeers()
|
|
||||||
if len(peers) > 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// forceTopicPeerDiscovery uses multiple strategies to find and connect to topic peers
|
|
||||||
func (m *Manager) forceTopicPeerDiscovery(topicName string, topic *pubsub.Topic) {
|
|
||||||
// Strategy 1: Check if pubsub knows about any peers for this topic
|
|
||||||
peers := topic.ListPeers()
|
|
||||||
if len(peers) > 0 {
|
|
||||||
return // We already have peers
|
|
||||||
}
|
|
||||||
|
|
||||||
// Strategy 2: Try to actively announce our presence and wait for responses
|
|
||||||
// Send a ping/heartbeat to the topic to announce our presence
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// Create a discovery message to announce our presence on this topic
|
|
||||||
discoveryMsg := []byte("ANCHAT_PEER_DISCOVERY")
|
|
||||||
topic.Publish(ctx, discoveryMsg)
|
|
||||||
|
|
||||||
// Strategy 3: Wait briefly and check again
|
|
||||||
time.Sleep(500 * time.Millisecond)
|
|
||||||
_ = topic.ListPeers() // Check again but we don't need to use the result
|
|
||||||
|
|
||||||
// Note: In LibP2P, topics don't automatically form connections between subscribers
|
|
||||||
// The underlying network layer needs to ensure peers are connected first
|
|
||||||
// This is why our enhanced client peer discovery is crucial
|
|
||||||
}
|
|
||||||
|
|
||||||
// monitorTopicPeers periodically checks topic peer connectivity
|
|
||||||
func (m *Manager) monitorTopicPeers(topicName string, topic *pubsub.Topic) {
|
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for i := 0; i < 6; i++ { // Monitor for 30 seconds
|
|
||||||
<-ticker.C
|
|
||||||
peers := topic.ListPeers()
|
|
||||||
|
|
||||||
// If we have peers, we're good
|
|
||||||
if len(peers) > 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
28
pkg/pubsub/publish.go
Normal file
28
pkg/pubsub/publish.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
package pubsub
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Publish publishes a message to a topic
|
||||||
|
func (m *Manager) Publish(ctx context.Context, topic string, data []byte) error {
|
||||||
|
if m.pubsub == nil {
|
||||||
|
return fmt.Errorf("pubsub not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic)
|
||||||
|
|
||||||
|
// Get or create topic
|
||||||
|
libp2pTopic, err := m.getOrCreateTopic(namespacedTopic)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get topic for publishing: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish message
|
||||||
|
if err := libp2pTopic.Publish(ctx, data); err != nil {
|
||||||
|
return fmt.Errorf("failed to publish message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
140
pkg/pubsub/subscriptions.go
Normal file
140
pkg/pubsub/subscriptions.go
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
package pubsub
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Subscribe subscribes to a topic
|
||||||
|
func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
|
||||||
|
if m.pubsub == nil {
|
||||||
|
return fmt.Errorf("pubsub not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic)
|
||||||
|
|
||||||
|
// Check if already subscribed
|
||||||
|
m.mu.Lock()
|
||||||
|
if _, exists := m.subscriptions[namespacedTopic]; exists {
|
||||||
|
m.mu.Unlock()
|
||||||
|
// Already subscribed - this is normal for LibP2P pubsub
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
// Get or create topic
|
||||||
|
libp2pTopic, err := m.getOrCreateTopic(namespacedTopic)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get topic: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe to topic
|
||||||
|
sub, err := libp2pTopic.Subscribe()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to subscribe to topic: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create cancellable context for this subscription
|
||||||
|
subCtx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// Store subscription
|
||||||
|
m.mu.Lock()
|
||||||
|
m.subscriptions[namespacedTopic] = &subscription{
|
||||||
|
sub: sub,
|
||||||
|
cancel: cancel,
|
||||||
|
}
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
// Start message handler goroutine
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
sub.Cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-subCtx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
msg, err := sub.Next(subCtx)
|
||||||
|
if err != nil {
|
||||||
|
if subCtx.Err() != nil {
|
||||||
|
return // Context cancelled
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the handler
|
||||||
|
if err := handler(topic, msg.Data); err != nil {
|
||||||
|
// Log error but continue processing
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Force peer discovery for this topic
|
||||||
|
go m.announceTopicInterest(namespacedTopic)
|
||||||
|
|
||||||
|
// For Anchat, also try to actively find topic peers through the libp2p pubsub system
|
||||||
|
if len(m.namespace) > 6 && m.namespace[:6] == "anchat" {
|
||||||
|
go m.enhancedAnchatTopicDiscovery(namespacedTopic, libp2pTopic)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribe unsubscribes from a topic
|
||||||
|
func (m *Manager) Unsubscribe(ctx context.Context, topic string) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
namespacedTopic := fmt.Sprintf("%s.%s", m.namespace, topic)
|
||||||
|
|
||||||
|
if subscription, exists := m.subscriptions[namespacedTopic]; exists {
|
||||||
|
// Cancel the subscription context to stop the message handler goroutine
|
||||||
|
subscription.cancel()
|
||||||
|
delete(m.subscriptions, namespacedTopic)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListTopics returns all subscribed topics
|
||||||
|
func (m *Manager) ListTopics(ctx context.Context) ([]string, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
|
||||||
|
var topics []string
|
||||||
|
prefix := m.namespace + "."
|
||||||
|
|
||||||
|
for topic := range m.subscriptions {
|
||||||
|
if len(topic) > len(prefix) && topic[:len(prefix)] == prefix {
|
||||||
|
originalTopic := topic[len(prefix):]
|
||||||
|
topics = append(topics, originalTopic)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return topics, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes all subscriptions and topics
|
||||||
|
func (m *Manager) Close() error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
// Cancel all subscriptions
|
||||||
|
for _, sub := range m.subscriptions {
|
||||||
|
sub.cancel()
|
||||||
|
}
|
||||||
|
m.subscriptions = make(map[string]*subscription)
|
||||||
|
|
||||||
|
// Close all topics
|
||||||
|
for _, topic := range m.topics {
|
||||||
|
topic.Close()
|
||||||
|
}
|
||||||
|
m.topics = make(map[string]*pubsub.Topic)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
27
pkg/pubsub/topics.go
Normal file
27
pkg/pubsub/topics.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
package pubsub
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
|
)
|
||||||
|
|
||||||
|
// getOrCreateTopic gets an existing topic or creates a new one
|
||||||
|
func (m *Manager) getOrCreateTopic(topicName string) (*pubsub.Topic, error) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
// Return existing topic if available
|
||||||
|
if topic, exists := m.topics[topicName]; exists {
|
||||||
|
return topic, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Join the topic - LibP2P allows multiple clients to join the same topic
|
||||||
|
topic, err := m.pubsub.Join(topicName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to join topic: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.topics[topicName] = topic
|
||||||
|
return topic, nil
|
||||||
|
}
|
182
pkg/storage/kv_ops.go
Normal file
182
pkg/storage/kv_ops.go
Normal file
@ -0,0 +1,182 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// processRequest processes a storage request and returns a response
|
||||||
|
func (s *Service) processRequest(req *StorageRequest) *StorageResponse {
|
||||||
|
switch req.Type {
|
||||||
|
case MessageTypePut:
|
||||||
|
return s.handlePut(req)
|
||||||
|
case MessageTypeGet:
|
||||||
|
return s.handleGet(req)
|
||||||
|
case MessageTypeDelete:
|
||||||
|
return s.handleDelete(req)
|
||||||
|
case MessageTypeList:
|
||||||
|
return s.handleList(req)
|
||||||
|
case MessageTypeExists:
|
||||||
|
return s.handleExists(req)
|
||||||
|
default:
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("unknown message type: %s", req.Type),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handlePut stores a key-value pair
|
||||||
|
func (s *Service) handlePut(req *StorageRequest) *StorageResponse {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
// Use REPLACE to handle both insert and update
|
||||||
|
query := `
|
||||||
|
REPLACE INTO kv_storage (namespace, key, value, updated_at)
|
||||||
|
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
|
||||||
|
`
|
||||||
|
|
||||||
|
_, err := s.db.Exec(query, req.Namespace, req.Key, req.Value)
|
||||||
|
if err != nil {
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to store key: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Debug("Stored key", zap.String("key", req.Key), zap.String("namespace", req.Namespace))
|
||||||
|
return &StorageResponse{Success: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleGet retrieves a value by key
|
||||||
|
func (s *Service) handleGet(req *StorageRequest) *StorageResponse {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
query := `SELECT value FROM kv_storage WHERE namespace = ? AND key = ?`
|
||||||
|
|
||||||
|
var value []byte
|
||||||
|
err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&value)
|
||||||
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("key not found: %s", req.Key),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to get key: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: true,
|
||||||
|
Value: value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleDelete removes a key
|
||||||
|
func (s *Service) handleDelete(req *StorageRequest) *StorageResponse {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
query := `DELETE FROM kv_storage WHERE namespace = ? AND key = ?`
|
||||||
|
|
||||||
|
result, err := s.db.Exec(query, req.Namespace, req.Key)
|
||||||
|
if err != nil {
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to delete key: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rowsAffected, _ := result.RowsAffected()
|
||||||
|
if rowsAffected == 0 {
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("key not found: %s", req.Key),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Debug("Deleted key", zap.String("key", req.Key), zap.String("namespace", req.Namespace))
|
||||||
|
return &StorageResponse{Success: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleList lists keys with a prefix
|
||||||
|
func (s *Service) handleList(req *StorageRequest) *StorageResponse {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
var query string
|
||||||
|
var args []interface{}
|
||||||
|
|
||||||
|
if req.Prefix == "" {
|
||||||
|
// List all keys in namespace
|
||||||
|
query = `SELECT key FROM kv_storage WHERE namespace = ?`
|
||||||
|
args = []interface{}{req.Namespace}
|
||||||
|
} else {
|
||||||
|
// List keys with prefix
|
||||||
|
query = `SELECT key FROM kv_storage WHERE namespace = ? AND key LIKE ?`
|
||||||
|
args = []interface{}{req.Namespace, req.Prefix + "%"}
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.Limit > 0 {
|
||||||
|
query += ` LIMIT ?`
|
||||||
|
args = append(args, req.Limit)
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := s.db.Query(query, args...)
|
||||||
|
if err != nil {
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to query keys: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var keys []string
|
||||||
|
for rows.Next() {
|
||||||
|
var key string
|
||||||
|
if err := rows.Scan(&key); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
keys = append(keys, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: true,
|
||||||
|
Keys: keys,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleExists checks if a key exists
|
||||||
|
func (s *Service) handleExists(req *StorageRequest) *StorageResponse {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
query := `SELECT 1 FROM kv_storage WHERE namespace = ? AND key = ? LIMIT 1`
|
||||||
|
|
||||||
|
var exists int
|
||||||
|
err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&exists)
|
||||||
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: true,
|
||||||
|
Exists: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to check key existence: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &StorageResponse{
|
||||||
|
Success: true,
|
||||||
|
Exists: true,
|
||||||
|
}
|
||||||
|
}
|
16
pkg/storage/logging.go
Normal file
16
pkg/storage/logging.go
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import "go.uber.org/zap"
|
||||||
|
|
||||||
|
// newStorageLogger creates a zap.Logger for storage components.
|
||||||
|
// Callers can pass quiet=true to reduce log verbosity.
|
||||||
|
func newStorageLogger(quiet bool) (*zap.Logger, error) {
|
||||||
|
if quiet {
|
||||||
|
cfg := zap.NewProductionConfig()
|
||||||
|
cfg.Level = zap.NewAtomicLevelAt(zap.WarnLevel)
|
||||||
|
cfg.DisableCaller = true
|
||||||
|
cfg.DisableStacktrace = true
|
||||||
|
return cfg.Build()
|
||||||
|
}
|
||||||
|
return zap.NewDevelopment()
|
||||||
|
}
|
37
pkg/storage/rqlite_init.go
Normal file
37
pkg/storage/rqlite_init.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// initTables creates the necessary tables for key-value storage
|
||||||
|
func (s *Service) initTables() error {
|
||||||
|
// Create storage table with namespace support
|
||||||
|
createTableSQL := `
|
||||||
|
CREATE TABLE IF NOT EXISTS kv_storage (
|
||||||
|
namespace TEXT NOT NULL,
|
||||||
|
key TEXT NOT NULL,
|
||||||
|
value BLOB NOT NULL,
|
||||||
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
PRIMARY KEY (namespace, key)
|
||||||
|
)
|
||||||
|
`
|
||||||
|
|
||||||
|
// Create index for faster queries
|
||||||
|
createIndexSQL := `
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_kv_storage_namespace_key
|
||||||
|
ON kv_storage(namespace, key)
|
||||||
|
`
|
||||||
|
|
||||||
|
if _, err := s.db.Exec(createTableSQL); err != nil {
|
||||||
|
return fmt.Errorf("failed to create storage table: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := s.db.Exec(createIndexSQL); err != nil {
|
||||||
|
return fmt.Errorf("failed to create storage index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Info("Storage tables initialized successfully")
|
||||||
|
return nil
|
||||||
|
}
|
@ -2,11 +2,8 @@ package storage
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p/core/network"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -24,260 +21,9 @@ func NewService(db *sql.DB, logger *zap.Logger) (*Service, error) {
|
|||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize storage tables
|
|
||||||
if err := service.initTables(); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to initialize storage tables: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return service, nil
|
return service, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// initTables creates the necessary tables for key-value storage
|
|
||||||
func (s *Service) initTables() error {
|
|
||||||
// Create storage table with namespace support
|
|
||||||
createTableSQL := `
|
|
||||||
CREATE TABLE IF NOT EXISTS kv_storage (
|
|
||||||
namespace TEXT NOT NULL,
|
|
||||||
key TEXT NOT NULL,
|
|
||||||
value BLOB NOT NULL,
|
|
||||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (namespace, key)
|
|
||||||
)
|
|
||||||
`
|
|
||||||
|
|
||||||
// Create index for faster queries
|
|
||||||
createIndexSQL := `
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_kv_storage_namespace_key
|
|
||||||
ON kv_storage(namespace, key)
|
|
||||||
`
|
|
||||||
|
|
||||||
if _, err := s.db.Exec(createTableSQL); err != nil {
|
|
||||||
return fmt.Errorf("failed to create storage table: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := s.db.Exec(createIndexSQL); err != nil {
|
|
||||||
return fmt.Errorf("failed to create storage index: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.logger.Info("Storage tables initialized successfully")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// HandleStorageStream handles incoming storage protocol streams
|
|
||||||
func (s *Service) HandleStorageStream(stream network.Stream) {
|
|
||||||
defer stream.Close()
|
|
||||||
|
|
||||||
// Read request
|
|
||||||
data, err := io.ReadAll(stream)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Failed to read storage request", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var request StorageRequest
|
|
||||||
if err := request.Unmarshal(data); err != nil {
|
|
||||||
s.logger.Error("Failed to unmarshal storage request", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process request
|
|
||||||
response := s.processRequest(&request)
|
|
||||||
|
|
||||||
// Send response
|
|
||||||
responseData, err := response.Marshal()
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Failed to marshal storage response", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := stream.Write(responseData); err != nil {
|
|
||||||
s.logger.Error("Failed to write storage response", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.logger.Debug("Handled storage request",
|
|
||||||
zap.String("type", string(request.Type)),
|
|
||||||
zap.String("key", request.Key),
|
|
||||||
zap.String("namespace", request.Namespace),
|
|
||||||
zap.Bool("success", response.Success),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// processRequest processes a storage request and returns a response
|
|
||||||
func (s *Service) processRequest(req *StorageRequest) *StorageResponse {
|
|
||||||
switch req.Type {
|
|
||||||
case MessageTypePut:
|
|
||||||
return s.handlePut(req)
|
|
||||||
case MessageTypeGet:
|
|
||||||
return s.handleGet(req)
|
|
||||||
case MessageTypeDelete:
|
|
||||||
return s.handleDelete(req)
|
|
||||||
case MessageTypeList:
|
|
||||||
return s.handleList(req)
|
|
||||||
case MessageTypeExists:
|
|
||||||
return s.handleExists(req)
|
|
||||||
default:
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: false,
|
|
||||||
Error: fmt.Sprintf("unknown message type: %s", req.Type),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handlePut stores a key-value pair
|
|
||||||
func (s *Service) handlePut(req *StorageRequest) *StorageResponse {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
// Use REPLACE to handle both insert and update
|
|
||||||
query := `
|
|
||||||
REPLACE INTO kv_storage (namespace, key, value, updated_at)
|
|
||||||
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
|
|
||||||
`
|
|
||||||
|
|
||||||
_, err := s.db.Exec(query, req.Namespace, req.Key, req.Value)
|
|
||||||
if err != nil {
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: false,
|
|
||||||
Error: fmt.Sprintf("failed to store key: %v", err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.logger.Debug("Stored key", zap.String("key", req.Key), zap.String("namespace", req.Namespace))
|
|
||||||
return &StorageResponse{Success: true}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleGet retrieves a value by key
|
|
||||||
func (s *Service) handleGet(req *StorageRequest) *StorageResponse {
|
|
||||||
s.mu.RLock()
|
|
||||||
defer s.mu.RUnlock()
|
|
||||||
|
|
||||||
query := `SELECT value FROM kv_storage WHERE namespace = ? AND key = ?`
|
|
||||||
|
|
||||||
var value []byte
|
|
||||||
err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&value)
|
|
||||||
if err != nil {
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: false,
|
|
||||||
Error: fmt.Sprintf("key not found: %s", req.Key),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: false,
|
|
||||||
Error: fmt.Sprintf("failed to get key: %v", err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: true,
|
|
||||||
Value: value,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleDelete removes a key
|
|
||||||
func (s *Service) handleDelete(req *StorageRequest) *StorageResponse {
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
query := `DELETE FROM kv_storage WHERE namespace = ? AND key = ?`
|
|
||||||
|
|
||||||
result, err := s.db.Exec(query, req.Namespace, req.Key)
|
|
||||||
if err != nil {
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: false,
|
|
||||||
Error: fmt.Sprintf("failed to delete key: %v", err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rowsAffected, _ := result.RowsAffected()
|
|
||||||
if rowsAffected == 0 {
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: false,
|
|
||||||
Error: fmt.Sprintf("key not found: %s", req.Key),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.logger.Debug("Deleted key", zap.String("key", req.Key), zap.String("namespace", req.Namespace))
|
|
||||||
return &StorageResponse{Success: true}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleList lists keys with a prefix
|
|
||||||
func (s *Service) handleList(req *StorageRequest) *StorageResponse {
|
|
||||||
s.mu.RLock()
|
|
||||||
defer s.mu.RUnlock()
|
|
||||||
|
|
||||||
var query string
|
|
||||||
var args []interface{}
|
|
||||||
|
|
||||||
if req.Prefix == "" {
|
|
||||||
// List all keys in namespace
|
|
||||||
query = `SELECT key FROM kv_storage WHERE namespace = ?`
|
|
||||||
args = []interface{}{req.Namespace}
|
|
||||||
} else {
|
|
||||||
// List keys with prefix
|
|
||||||
query = `SELECT key FROM kv_storage WHERE namespace = ? AND key LIKE ?`
|
|
||||||
args = []interface{}{req.Namespace, req.Prefix + "%"}
|
|
||||||
}
|
|
||||||
|
|
||||||
if req.Limit > 0 {
|
|
||||||
query += ` LIMIT ?`
|
|
||||||
args = append(args, req.Limit)
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := s.db.Query(query, args...)
|
|
||||||
if err != nil {
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: false,
|
|
||||||
Error: fmt.Sprintf("failed to query keys: %v", err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
var keys []string
|
|
||||||
for rows.Next() {
|
|
||||||
var key string
|
|
||||||
if err := rows.Scan(&key); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
keys = append(keys, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: true,
|
|
||||||
Keys: keys,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleExists checks if a key exists
|
|
||||||
func (s *Service) handleExists(req *StorageRequest) *StorageResponse {
|
|
||||||
s.mu.RLock()
|
|
||||||
defer s.mu.RUnlock()
|
|
||||||
|
|
||||||
query := `SELECT 1 FROM kv_storage WHERE namespace = ? AND key = ? LIMIT 1`
|
|
||||||
|
|
||||||
var exists int
|
|
||||||
err := s.db.QueryRow(query, req.Namespace, req.Key).Scan(&exists)
|
|
||||||
if err != nil {
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: true,
|
|
||||||
Exists: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: false,
|
|
||||||
Error: fmt.Sprintf("failed to check key existence: %v", err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &StorageResponse{
|
|
||||||
Success: true,
|
|
||||||
Exists: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes the storage service
|
// Close closes the storage service
|
||||||
func (s *Service) Close() error {
|
func (s *Service) Close() error {
|
||||||
// The database connection is managed elsewhere
|
// The database connection is managed elsewhere
|
||||||
|
48
pkg/storage/stream_handler.go
Normal file
48
pkg/storage/stream_handler.go
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p/core/network"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HandleStorageStream handles incoming storage protocol streams
|
||||||
|
func (s *Service) HandleStorageStream(stream network.Stream) {
|
||||||
|
defer stream.Close()
|
||||||
|
|
||||||
|
// Read request
|
||||||
|
data, err := io.ReadAll(stream)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to read storage request", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var request StorageRequest
|
||||||
|
if err := request.Unmarshal(data); err != nil {
|
||||||
|
s.logger.Error("Failed to unmarshal storage request", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process request
|
||||||
|
response := s.processRequest(&request)
|
||||||
|
|
||||||
|
// Send response
|
||||||
|
responseData, err := response.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Error("Failed to marshal storage response", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := stream.Write(responseData); err != nil {
|
||||||
|
s.logger.Error("Failed to write storage response", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Debug("Handled storage request",
|
||||||
|
zap.String("type", string(request.Type)),
|
||||||
|
zap.String("key", request.Key),
|
||||||
|
zap.String("namespace", request.Namespace),
|
||||||
|
zap.Bool("success", response.Success),
|
||||||
|
)
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user