mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-10-06 08:19:07 +00:00
Here's the commit message: ``` Fix code style and indentation Apply consistent indentation, fix whitespace and tabs vs spaces issues, remove trailing whitespace, and ensure proper line endings throughout the codebase. Also add comments and improve code organization. ``` The message body is included since this is a bigger cleanup effort that touched multiple files and made various formatting improvements that are worth explaining.
655 lines
17 KiB
Go
655 lines
17 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.debros.io/DeBros/network/pkg/storage"
|
|
|
|
"git.debros.io/DeBros/network/pkg/anyoneproxy"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/multiformats/go-multiaddr"
|
|
"github.com/rqlite/gorqlite"
|
|
)
|
|
|
|
// DatabaseClientImpl implements DatabaseClient
|
|
type DatabaseClientImpl struct {
|
|
client *Client
|
|
connection *gorqlite.Connection
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// checkConnection verifies the client is connected
|
|
func (d *DatabaseClientImpl) checkConnection() error {
|
|
if !d.client.isConnected() {
|
|
return fmt.Errorf("client not connected")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// withRetry executes an operation with retry logic
|
|
func (d *DatabaseClientImpl) withRetry(operation func(*gorqlite.Connection) error) error {
|
|
maxRetries := 3
|
|
var lastErr error
|
|
|
|
for attempt := 1; attempt <= maxRetries; attempt++ {
|
|
conn, err := d.getRQLiteConnection()
|
|
if err != nil {
|
|
lastErr = err
|
|
d.clearConnection()
|
|
continue
|
|
}
|
|
|
|
if err := operation(conn); err != nil {
|
|
lastErr = err
|
|
d.clearConnection()
|
|
continue
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("operation failed after %d attempts. Last error: %w", maxRetries, lastErr)
|
|
}
|
|
|
|
// Query executes a SQL query
|
|
func (d *DatabaseClientImpl) Query(ctx context.Context, sql string, args ...interface{}) (*QueryResult, error) {
|
|
if err := d.checkConnection(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := d.client.requireAccess(ctx); err != nil {
|
|
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
// Determine if this is a read or write operation
|
|
isWriteOperation := d.isWriteOperation(sql)
|
|
|
|
// Retry logic for resilient querying
|
|
maxRetries := 3
|
|
var lastErr error
|
|
|
|
for attempt := 1; attempt <= maxRetries; attempt++ {
|
|
// Get RQLite connection (tries multiple nodes)
|
|
conn, err := d.getRQLiteConnection()
|
|
if err != nil {
|
|
lastErr = err
|
|
|
|
// Clear any cached connection and try again
|
|
d.clearConnection()
|
|
continue
|
|
}
|
|
|
|
if isWriteOperation {
|
|
// Execute write operation with parameters
|
|
_, err := conn.WriteOneParameterized(gorqlite.ParameterizedStatement{
|
|
Query: sql,
|
|
Arguments: args,
|
|
})
|
|
if err != nil {
|
|
lastErr = err
|
|
d.clearConnection()
|
|
continue
|
|
}
|
|
|
|
// For write operations, return empty result set
|
|
return &QueryResult{
|
|
Columns: []string{"affected"},
|
|
Rows: [][]interface{}{{"success"}},
|
|
Count: 1,
|
|
}, nil
|
|
} else {
|
|
// Execute read operation with parameters
|
|
result, err := conn.QueryOneParameterized(gorqlite.ParameterizedStatement{
|
|
Query: sql,
|
|
Arguments: args,
|
|
})
|
|
if err != nil {
|
|
lastErr = err
|
|
d.clearConnection()
|
|
continue
|
|
}
|
|
|
|
// Convert gorqlite.QueryResult to our QueryResult
|
|
columns := result.Columns()
|
|
numRows := int(result.NumRows())
|
|
|
|
queryResult := &QueryResult{
|
|
Columns: columns,
|
|
Rows: make([][]interface{}, 0, numRows),
|
|
Count: result.NumRows(),
|
|
}
|
|
|
|
// Iterate through rows
|
|
for result.Next() {
|
|
row, err := result.Slice()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
queryResult.Rows = append(queryResult.Rows, row)
|
|
}
|
|
|
|
return queryResult, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("query failed after %d attempts. Last error: %w", maxRetries, lastErr)
|
|
}
|
|
|
|
// isWriteOperation determines if a SQL statement is a write operation
|
|
func (d *DatabaseClientImpl) isWriteOperation(sql string) bool {
|
|
// Convert to uppercase for comparison
|
|
sqlUpper := strings.ToUpper(strings.TrimSpace(sql))
|
|
|
|
// List of write operation keywords
|
|
writeKeywords := []string{
|
|
"INSERT", "UPDATE", "DELETE", "CREATE", "DROP", "ALTER",
|
|
"TRUNCATE", "REPLACE", "MERGE", "PRAGMA",
|
|
}
|
|
|
|
for _, keyword := range writeKeywords {
|
|
if strings.HasPrefix(sqlUpper, keyword) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// clearConnection clears the cached connection to force reconnection
|
|
func (d *DatabaseClientImpl) clearConnection() {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
d.connection = nil
|
|
}
|
|
|
|
// getRQLiteConnection returns a connection to RQLite, creating one if needed
|
|
func (d *DatabaseClientImpl) getRQLiteConnection() (*gorqlite.Connection, error) {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
// Always try to get a fresh connection to handle leadership changes
|
|
// and node failures gracefully
|
|
return d.connectToAvailableNode()
|
|
}
|
|
|
|
// getRQLiteNodes returns a list of RQLite node URLs with precedence:
|
|
// 1) client config DatabaseEndpoints
|
|
// 2) RQLITE_NODES env (comma/space separated)
|
|
// 3) library defaults via DefaultDatabaseEndpoints()
|
|
func (d *DatabaseClientImpl) getRQLiteNodes() []string {
|
|
// 1) Prefer explicit configuration on the client
|
|
if d.client != nil && d.client.config != nil && len(d.client.config.DatabaseEndpoints) > 0 {
|
|
return dedupeStrings(normalizeEndpoints(d.client.config.DatabaseEndpoints))
|
|
}
|
|
|
|
// 3) Fallback to library defaults derived from bootstrap peers
|
|
return DefaultDatabaseEndpoints()
|
|
}
|
|
|
|
// normalizeEndpoints is now imported from defaults.go
|
|
|
|
func hasPort(hostport string) bool {
|
|
// cheap check for :port suffix (IPv6 with brackets handled by url.Parse earlier)
|
|
if i := strings.LastIndex(hostport, ":"); i > -1 && i < len(hostport)-1 {
|
|
// ensure the segment after ':' is numeric-ish
|
|
for _, c := range hostport[i+1:] {
|
|
if c < '0' || c > '9' {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// connectToAvailableNode tries to connect to any available RQLite node
|
|
func (d *DatabaseClientImpl) connectToAvailableNode() (*gorqlite.Connection, error) {
|
|
// Get RQLite nodes from environment or use defaults
|
|
rqliteNodes := d.getRQLiteNodes()
|
|
|
|
var lastErr error
|
|
|
|
for _, rqliteURL := range rqliteNodes {
|
|
var conn *gorqlite.Connection
|
|
var err error
|
|
// If Anyone proxy is enabled, build a proxy-aware HTTP client
|
|
if anyoneproxy.Enabled() {
|
|
httpClient := anyoneproxy.NewHTTPClient()
|
|
conn, err = gorqlite.OpenWithClient(rqliteURL, httpClient)
|
|
} else {
|
|
conn, err = gorqlite.Open(rqliteURL)
|
|
}
|
|
if err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
|
|
// Test the connection with a simple query to ensure it's working
|
|
// and the node has leadership or can serve reads
|
|
if err := d.testConnection(conn); err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
|
|
d.connection = conn
|
|
return conn, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("failed to connect to any RQLite instance. Last error: %w", lastErr)
|
|
}
|
|
|
|
// testConnection performs a health check on the RQLite connection
|
|
func (d *DatabaseClientImpl) testConnection(conn *gorqlite.Connection) error {
|
|
// Try a simple read query first (works even without leadership)
|
|
result, err := conn.QueryOne("SELECT 1")
|
|
if err != nil {
|
|
return fmt.Errorf("read test failed: %w", err)
|
|
}
|
|
|
|
// Check if we got a valid result
|
|
if result.NumRows() == 0 {
|
|
return fmt.Errorf("read test returned no rows")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Transaction executes multiple queries in a transaction
|
|
func (d *DatabaseClientImpl) Transaction(ctx context.Context, queries []string) error {
|
|
if !d.client.isConnected() {
|
|
return fmt.Errorf("client not connected")
|
|
}
|
|
|
|
if err := d.client.requireAccess(ctx); err != nil {
|
|
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
maxRetries := 3
|
|
var lastErr error
|
|
|
|
for attempt := 1; attempt <= maxRetries; attempt++ {
|
|
// Get RQLite connection
|
|
conn, err := d.getRQLiteConnection()
|
|
if err != nil {
|
|
lastErr = err
|
|
d.clearConnection()
|
|
continue
|
|
}
|
|
|
|
// Execute all queries in the transaction
|
|
success := true
|
|
for _, query := range queries {
|
|
_, err := conn.WriteOne(query)
|
|
if err != nil {
|
|
lastErr = err
|
|
success = false
|
|
d.clearConnection()
|
|
break
|
|
}
|
|
}
|
|
|
|
if success {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return fmt.Errorf("transaction failed after %d attempts. Last error: %w", maxRetries, lastErr)
|
|
}
|
|
|
|
// CreateTable creates a new table
|
|
func (d *DatabaseClientImpl) CreateTable(ctx context.Context, schema string) error {
|
|
if err := d.checkConnection(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := d.client.requireAccess(ctx); err != nil {
|
|
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
return d.withRetry(func(conn *gorqlite.Connection) error {
|
|
_, err := conn.WriteOne(schema)
|
|
return err
|
|
})
|
|
}
|
|
|
|
// DropTable drops a table
|
|
func (d *DatabaseClientImpl) DropTable(ctx context.Context, tableName string) error {
|
|
if err := d.checkConnection(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return d.withRetry(func(conn *gorqlite.Connection) error {
|
|
dropSQL := fmt.Sprintf("DROP TABLE IF EXISTS %s", tableName)
|
|
_, err := conn.WriteOne(dropSQL)
|
|
return err
|
|
})
|
|
}
|
|
|
|
// GetSchema returns schema information
|
|
func (d *DatabaseClientImpl) GetSchema(ctx context.Context) (*SchemaInfo, error) {
|
|
if !d.client.isConnected() {
|
|
return nil, fmt.Errorf("client not connected")
|
|
}
|
|
|
|
if err := d.client.requireAccess(ctx); err != nil {
|
|
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
// Get RQLite connection
|
|
conn, err := d.getRQLiteConnection()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get RQLite connection: %w", err)
|
|
}
|
|
|
|
// Query for all tables
|
|
result, err := conn.QueryOne("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query table list: %w", err)
|
|
}
|
|
|
|
schema := &SchemaInfo{
|
|
Tables: make([]TableInfo, 0),
|
|
}
|
|
|
|
// Iterate through tables
|
|
for result.Next() {
|
|
row, err := result.Slice()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get table row: %w", err)
|
|
}
|
|
|
|
if len(row) > 0 {
|
|
tableName := fmt.Sprintf("%v", row[0])
|
|
|
|
// Get column information for this table
|
|
columnResult, err := conn.QueryOne(fmt.Sprintf("PRAGMA table_info(%s)", tableName))
|
|
if err != nil {
|
|
continue // Skip this table if we can't get column info
|
|
}
|
|
|
|
tableInfo := TableInfo{
|
|
Name: tableName,
|
|
Columns: make([]ColumnInfo, 0),
|
|
}
|
|
|
|
// Parse column information
|
|
for columnResult.Next() {
|
|
colRow, err := columnResult.Slice()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
if len(colRow) >= 6 {
|
|
columnInfo := ColumnInfo{
|
|
Name: fmt.Sprintf("%v", colRow[1]), // name
|
|
Type: fmt.Sprintf("%v", colRow[2]), // type
|
|
Nullable: fmt.Sprintf("%v", colRow[3]) == "0", // notnull (0 = nullable, 1 = not null)
|
|
}
|
|
tableInfo.Columns = append(tableInfo.Columns, columnInfo)
|
|
}
|
|
}
|
|
|
|
schema.Tables = append(schema.Tables, tableInfo)
|
|
}
|
|
}
|
|
|
|
return schema, nil
|
|
}
|
|
|
|
// StorageClientImpl implements StorageClient using distributed storage
|
|
type StorageClientImpl struct {
|
|
client *Client
|
|
storageClient *storage.Client
|
|
}
|
|
|
|
// Get retrieves a value by key
|
|
func (s *StorageClientImpl) Get(ctx context.Context, key string) ([]byte, error) {
|
|
if !s.client.isConnected() {
|
|
return nil, fmt.Errorf("client not connected")
|
|
}
|
|
|
|
if err := s.client.requireAccess(ctx); err != nil {
|
|
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
return s.storageClient.Get(ctx, key)
|
|
}
|
|
|
|
// Put stores a value by key
|
|
func (s *StorageClientImpl) Put(ctx context.Context, key string, value []byte) error {
|
|
if !s.client.isConnected() {
|
|
return fmt.Errorf("client not connected")
|
|
}
|
|
|
|
if err := s.client.requireAccess(ctx); err != nil {
|
|
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
err := s.storageClient.Put(ctx, key, value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Delete removes a key
|
|
func (s *StorageClientImpl) Delete(ctx context.Context, key string) error {
|
|
if !s.client.isConnected() {
|
|
return fmt.Errorf("client not connected")
|
|
}
|
|
|
|
if err := s.client.requireAccess(ctx); err != nil {
|
|
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
err := s.storageClient.Delete(ctx, key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// List returns keys with a given prefix
|
|
func (s *StorageClientImpl) List(ctx context.Context, prefix string, limit int) ([]string, error) {
|
|
if !s.client.isConnected() {
|
|
return nil, fmt.Errorf("client not connected")
|
|
}
|
|
|
|
if err := s.client.requireAccess(ctx); err != nil {
|
|
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
return s.storageClient.List(ctx, prefix, limit)
|
|
}
|
|
|
|
// Exists checks if a key exists
|
|
func (s *StorageClientImpl) Exists(ctx context.Context, key string) (bool, error) {
|
|
if !s.client.isConnected() {
|
|
return false, fmt.Errorf("client not connected")
|
|
}
|
|
|
|
if err := s.client.requireAccess(ctx); err != nil {
|
|
return false, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
return s.storageClient.Exists(ctx, key)
|
|
}
|
|
|
|
// NetworkInfoImpl implements NetworkInfo
|
|
type NetworkInfoImpl struct {
|
|
client *Client
|
|
}
|
|
|
|
// GetPeers returns information about connected peers
|
|
func (n *NetworkInfoImpl) GetPeers(ctx context.Context) ([]PeerInfo, error) {
|
|
if !n.client.isConnected() {
|
|
return nil, fmt.Errorf("client not connected")
|
|
}
|
|
|
|
if err := n.client.requireAccess(ctx); err != nil {
|
|
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
// Get peers from LibP2P host
|
|
host := n.client.host
|
|
if host == nil {
|
|
return nil, fmt.Errorf("no host available")
|
|
}
|
|
|
|
// Get connected peers
|
|
connectedPeers := host.Network().Peers()
|
|
peers := make([]PeerInfo, 0, len(connectedPeers)+1) // +1 for self
|
|
|
|
// Add connected peers
|
|
for _, peerID := range connectedPeers {
|
|
// Get peer addresses
|
|
peerInfo := host.Peerstore().PeerInfo(peerID)
|
|
|
|
// Convert multiaddrs to strings
|
|
addrs := make([]string, len(peerInfo.Addrs))
|
|
for i, addr := range peerInfo.Addrs {
|
|
addrs[i] = addr.String()
|
|
}
|
|
|
|
peers = append(peers, PeerInfo{
|
|
ID: peerID.String(),
|
|
Addresses: addrs,
|
|
Connected: true,
|
|
LastSeen: time.Now(), // LibP2P doesn't track last seen, so use current time
|
|
})
|
|
}
|
|
|
|
// Add self node
|
|
selfPeerInfo := host.Peerstore().PeerInfo(host.ID())
|
|
selfAddrs := make([]string, len(selfPeerInfo.Addrs))
|
|
for i, addr := range selfPeerInfo.Addrs {
|
|
selfAddrs[i] = addr.String()
|
|
}
|
|
|
|
// Insert self node at the beginning of the list
|
|
selfPeer := PeerInfo{
|
|
ID: host.ID().String(),
|
|
Addresses: selfAddrs,
|
|
Connected: true,
|
|
LastSeen: time.Now(),
|
|
}
|
|
|
|
// Prepend self to the list
|
|
peers = append([]PeerInfo{selfPeer}, peers...)
|
|
|
|
return peers, nil
|
|
}
|
|
|
|
// GetStatus returns network status
|
|
func (n *NetworkInfoImpl) GetStatus(ctx context.Context) (*NetworkStatus, error) {
|
|
if !n.client.isConnected() {
|
|
return nil, fmt.Errorf("client not connected")
|
|
}
|
|
|
|
if err := n.client.requireAccess(ctx); err != nil {
|
|
return nil, fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
host := n.client.host
|
|
if host == nil {
|
|
return nil, fmt.Errorf("no host available")
|
|
}
|
|
|
|
// Get actual network status
|
|
connectedPeers := host.Network().Peers()
|
|
|
|
// Try to get database size from RQLite (optional - don't fail if unavailable)
|
|
var dbSize int64 = 0
|
|
dbClient := n.client.database
|
|
if conn, err := dbClient.getRQLiteConnection(); err == nil {
|
|
// Query database size (rough estimate)
|
|
if result, err := conn.QueryOne("SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size()"); err == nil {
|
|
for result.Next() {
|
|
if row, err := result.Slice(); err == nil && len(row) > 0 {
|
|
if size, ok := row[0].(int64); ok {
|
|
dbSize = size
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return &NetworkStatus{
|
|
NodeID: host.ID().String(),
|
|
Connected: true,
|
|
PeerCount: len(connectedPeers),
|
|
DatabaseSize: dbSize,
|
|
Uptime: time.Since(n.client.startTime),
|
|
}, nil
|
|
}
|
|
|
|
// ConnectToPeer connects to a specific peer
|
|
func (n *NetworkInfoImpl) ConnectToPeer(ctx context.Context, peerAddr string) error {
|
|
if !n.client.isConnected() {
|
|
return fmt.Errorf("client not connected")
|
|
}
|
|
|
|
if err := n.client.requireAccess(ctx); err != nil {
|
|
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
host := n.client.host
|
|
if host == nil {
|
|
return fmt.Errorf("no host available")
|
|
}
|
|
|
|
// Parse the multiaddr
|
|
ma, err := multiaddr.NewMultiaddr(peerAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid multiaddr: %w", err)
|
|
}
|
|
|
|
// Extract peer info
|
|
peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to extract peer info: %w", err)
|
|
}
|
|
|
|
// Connect to the peer
|
|
if err := host.Connect(ctx, *peerInfo); err != nil {
|
|
return fmt.Errorf("failed to connect to peer: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DisconnectFromPeer disconnects from a specific peer
|
|
func (n *NetworkInfoImpl) DisconnectFromPeer(ctx context.Context, peerID string) error {
|
|
if !n.client.isConnected() {
|
|
return fmt.Errorf("client not connected")
|
|
}
|
|
|
|
if err := n.client.requireAccess(ctx); err != nil {
|
|
return fmt.Errorf("authentication required: %w - run CLI commands to authenticate automatically", err)
|
|
}
|
|
|
|
host := n.client.host
|
|
if host == nil {
|
|
return fmt.Errorf("no host available")
|
|
}
|
|
|
|
// Parse the peer ID
|
|
pid, err := peer.Decode(peerID)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid peer ID: %w", err)
|
|
}
|
|
|
|
// Close the connection to the peer
|
|
if err := host.Network().ClosePeer(pid); err != nil {
|
|
return fmt.Errorf("failed to disconnect from peer: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|