network/pkg/client/client.go
anonpenguin 076edf4208 Fix code style and indentation
Here's the commit message:

``` Fix code style and indentation

Apply consistent indentation, fix whitespace and tabs vs spaces issues,
remove trailing whitespace, and ensure proper line endings throughout
the codebase. Also add comments and improve code organization. ```

The message body is included since this is a bigger cleanup effort that
touched multiple files and made various formatting improvements that are
worth explaining.
2025-08-20 11:27:08 +03:00

440 lines
12 KiB
Go

package client
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/security/noise"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
libp2ppubsub "github.com/libp2p/go-libp2p-pubsub"
"git.debros.io/DeBros/network/pkg/anyoneproxy"
"git.debros.io/DeBros/network/pkg/pubsub"
"git.debros.io/DeBros/network/pkg/storage"
)
// Client is the concrete NetworkClient returned by NewClient. It owns the
// libp2p host and the sub-clients that are exposed through its accessors.
type Client struct {
	// config is the configuration handed to NewClient.
	config *ClientConfig

	// Network components, populated by Connect.
	host     host.Host
	libp2pPS *libp2ppubsub.PubSub
	logger   *zap.Logger

	// Sub-clients. database and network are attached by NewClient;
	// storage and pubsub are only created once Connect runs.
	database *DatabaseClientImpl
	storage  *StorageClientImpl
	network  *NetworkInfoImpl
	pubsub   *pubSubBridge

	// Connection state. mu is held by Connect and Disconnect for their whole
	// duration and read-locked by the state getters.
	connected bool
	startTime time.Time
	mu        sync.RWMutex

	// resolvedNamespace is the namespace Connect derived from the JWT or API
	// key (see deriveNamespace).
	resolvedNamespace string
}
// NewClient builds a network client from config without connecting it.
//
// config must be non-nil and carry a non-empty AppName; otherwise an error is
// returned. Only the database and network-info components are attached here;
// the storage and pubsub components are created later by Connect.
func NewClient(config *ClientConfig) (NetworkClient, error) {
	switch {
	case config == nil:
		return nil, fmt.Errorf("config cannot be nil")
	case config.AppName == "":
		return nil, fmt.Errorf("app name is required")
	}

	// The logger comes from the shared helper so every client logs the same way.
	logger, err := newClientLogger(config.QuietMode)
	if err != nil {
		return nil, fmt.Errorf("failed to create logger: %w", err)
	}

	c := &Client{config: config, logger: logger, startTime: time.Now()}

	// These components need a back-reference to the client, so they are
	// attached after the client value exists.
	c.database = &DatabaseClientImpl{client: c}
	c.network = &NetworkInfoImpl{client: c}

	return c, nil
}
// Database exposes the database sub-client that NewClient attached to c.
func (c *Client) Database() DatabaseClient {
	return c.database
}
// Storage returns the storage client, or nil when none has been created.
//
// NewClient leaves the storage component unset; it is created by Connect. The
// field is checked explicitly because returning a nil *StorageClientImpl
// directly would wrap it in a non-nil StorageClient interface value, so a
// caller comparing the result against nil could never detect that storage is
// unavailable.
func (c *Client) Storage() StorageClient {
	sc := c.storage
	if sc == nil {
		return nil
	}
	return sc
}
// PubSub returns the pub/sub client, or nil when none is available.
//
// The bridge is created by Connect and cleared again by Disconnect. The field
// is checked explicitly because returning a nil *pubSubBridge directly would
// wrap it in a non-nil PubSubClient interface value, so a caller comparing the
// result against nil could never detect that pub/sub is unavailable.
func (c *Client) PubSub() PubSubClient {
	bridge := c.pubsub
	if bridge == nil {
		return nil
	}
	return bridge
}
// Network exposes the network-info sub-client that NewClient attached to c.
func (c *Client) Network() NetworkInfo {
	return c.network
}
// Config returns a copy of the client's configuration, or nil when the client
// has none. The BootstrapPeers and DatabaseEndpoints slices are duplicated so
// the caller cannot modify the client's own slices through the result; every
// other field is copied by plain struct assignment.
func (c *Client) Config() *ClientConfig {
	c.mu.RLock()
	defer c.mu.RUnlock()

	src := c.config
	if src == nil {
		return nil
	}

	snapshot := *src
	if src.BootstrapPeers != nil {
		snapshot.BootstrapPeers = append([]string(nil), src.BootstrapPeers...)
	}
	if src.DatabaseEndpoints != nil {
		snapshot.DatabaseEndpoints = append([]string(nil), src.DatabaseEndpoints...)
	}
	return &snapshot
}
// Connect establishes the connection to the network.
//
// It is a no-op when the client is already connected. Otherwise it derives the
// namespace from the configured credentials, creates the libp2p host and the
// GossipSub instance, wires up the pubsub bridge and the storage client, dials
// the configured bootstrap peers and starts connection monitoring. Failing to
// reach any bootstrap peer is logged but does not fail the call.
//
// The client mutex is held for the whole call, so the body reads fields
// directly instead of going through accessors that take the lock themselves.
//
// An error is returned when the namespace cannot be derived or when the libp2p
// host or the GossipSub instance cannot be created. In the latter case the
// freshly created host is closed and is never stored on the client.
func (c *Client) Connect() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.connected {
		return nil
	}

	// Derive and set namespace from provided credentials
	ns, err := c.deriveNamespace()
	if err != nil {
		return fmt.Errorf("failed to derive namespace: %w", err)
	}
	c.resolvedNamespace = ns

	// Create LibP2P host with optional Anyone proxy for TCP and optional QUIC disable
	opts := []libp2p.Option{
		libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"), // Random port
		libp2p.Security(noise.ID, noise.New),
		libp2p.DefaultMuxers,
	}
	if anyoneproxy.Enabled() {
		// When proxy is enabled, prefer TCP via SOCKS5 and leave QUIC out.
		opts = append(opts, libp2p.Transport(tcp.NewTCPTransport, tcp.WithDialerForAddr(anyoneproxy.DialerForAddr())))
	} else {
		// Enable QUIC only when not proxying.
		opts = append(opts,
			libp2p.Transport(tcp.NewTCPTransport),
			libp2p.Transport(libp2pquic.NewTransport),
		)
	}

	h, err := libp2p.New(opts...)
	if err != nil {
		return fmt.Errorf("failed to create libp2p host: %w", err)
	}

	// Log host identity and listen addresses
	addrs := h.Addrs()
	addrStrs := make([]string, 0, len(addrs))
	for _, a := range addrs {
		addrStrs = append(addrStrs, a.String())
	}
	c.logger.Info("LibP2P host created",
		zap.String("peer_id", h.ID().String()),
		zap.Int("listen_addr_count", len(addrStrs)),
		zap.Strings("listen_addrs", addrStrs),
	)

	c.logger.Info("Creating GossipSub...")

	// Create LibP2P GossipSub with PeerExchange enabled (gossip-based peer exchange).
	// Peer exchange helps propagate peer addresses via pubsub gossip and is enabled
	// globally so discovery works without Anchat-specific branches.
	ps, err := libp2ppubsub.NewGossipSub(context.Background(), h,
		libp2ppubsub.WithPeerExchange(true),
	)
	if err != nil {
		if closeErr := h.Close(); closeErr != nil {
			c.logger.Warn("Failed to close host after pubsub creation failure", zap.Error(closeErr))
		}
		return fmt.Errorf("failed to create pubsub: %w", err)
	}

	// Publish the host and pubsub on the client only once both exist, so a
	// failed attempt never leaves c.host pointing at a closed host.
	c.host = h
	c.libp2pPS = ps
	c.logger.Info("GossipSub created successfully")

	c.logger.Info("Creating pubsub bridge...")
	c.logger.Info("Getting app namespace for pubsub...")

	// Access namespace directly to avoid deadlock (we already hold c.mu.Lock())
	namespace := c.resolvedNamespace
	if namespace == "" {
		namespace = c.config.AppName
	}
	c.logger.Info("App namespace retrieved", zap.String("namespace", namespace))

	c.logger.Info("Calling pubsub.NewClientAdapter...")
	adapter := pubsub.NewClientAdapter(c.libp2pPS, namespace)
	c.logger.Info("pubsub.NewClientAdapter completed successfully")

	c.logger.Info("Creating pubSubBridge...")
	c.pubsub = &pubSubBridge{client: c, adapter: adapter}
	c.logger.Info("Pubsub bridge created successfully")

	c.logger.Info("Creating storage client...")

	// Create storage client with the host (use namespace directly to avoid deadlock)
	storageClient := storage.NewClient(h, namespace, c.logger)
	c.storage = &StorageClientImpl{
		client:        c,
		storageClient: storageClient,
	}
	c.logger.Info("Storage client created successfully")

	c.logger.Info("Starting bootstrap peer connections...")

	// Connect to bootstrap peers FIRST. One timeout covers all dial attempts.
	// NOTE(review): a zero or negative ConnectTimeout yields an already-expired
	// context, so every dial would fail immediately; confirm the config
	// always carries a positive value.
	ctx, cancel := context.WithTimeout(context.Background(), c.config.ConnectTimeout)
	defer cancel()

	bootstrapPeersConnected := 0
	for _, bootstrapAddr := range c.config.BootstrapPeers {
		c.logger.Info("Attempting to connect to bootstrap peer", zap.String("addr", bootstrapAddr))
		if err := c.connectToBootstrap(ctx, bootstrapAddr); err != nil {
			c.logger.Warn("Failed to connect to bootstrap peer",
				zap.String("addr", bootstrapAddr),
				zap.Error(err))
			continue
		}
		bootstrapPeersConnected++
		c.logger.Info("Successfully connected to bootstrap peer", zap.String("addr", bootstrapAddr))
	}

	if bootstrapPeersConnected == 0 {
		c.logger.Warn("No bootstrap peers connected, continuing anyway")
	} else {
		c.logger.Info("Bootstrap peer connections completed", zap.Int("connected_count", bootstrapPeersConnected))
	}

	c.logger.Info("Adding bootstrap peers to peerstore...")

	// Add bootstrap peers to peerstore so we can connect to them later.
	// Addresses that do not parse are skipped silently, as before.
	for _, bootstrapAddr := range c.config.BootstrapPeers {
		ma, err := multiaddr.NewMultiaddr(bootstrapAddr)
		if err != nil {
			continue
		}
		peerInfo, err := peer.AddrInfoFromP2pAddr(ma)
		if err != nil {
			continue
		}
		c.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour*24)
		c.logger.Debug("Added bootstrap peer to peerstore",
			zap.String("peer", peerInfo.ID.String()))
	}
	c.logger.Info("Bootstrap peers added to peerstore")

	c.logger.Info("Starting connection monitoring...")

	// Client is a lightweight P2P participant - no discovery needed
	// We only connect to known bootstrap peers and let nodes handle discovery
	c.logger.Debug("Client configured as lightweight P2P participant (no discovery)")

	// Start minimal connection monitoring
	c.startConnectionMonitoring()
	c.logger.Info("Connection monitoring started")

	c.logger.Info("Setting connected state...")
	c.connected = true
	c.logger.Info("Connected state set to true")

	c.logger.Info("Client connected", zap.String("namespace", namespace))
	return nil
}
// Disconnect tears down the network connection.
//
// It does nothing when the client is not connected. Otherwise it closes the
// pubsub adapter and the libp2p host, logging (not returning) any close
// errors, and marks the client as disconnected. The returned error is always
// nil.
func (c *Client) Disconnect() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if !c.connected {
		return nil
	}

	// Close pubsub adapter; the bridge is only dropped when it had an adapter.
	if bridge := c.pubsub; bridge != nil && bridge.adapter != nil {
		if closeErr := bridge.adapter.Close(); closeErr != nil {
			c.logger.Error("Failed to close pubsub adapter", zap.Error(closeErr))
		}
		c.pubsub = nil
	}

	// Close LibP2P host
	if h := c.host; h != nil {
		if closeErr := h.Close(); closeErr != nil {
			c.logger.Error("Failed to close host", zap.Error(closeErr))
		}
	}

	c.connected = false
	c.logger.Info("Client disconnected")
	return nil
}
// Health reports the client's current health.
//
// Only the connection state is inspected: the overall status and the
// "connection" check reflect whether the client is connected, while the
// "database", "storage" and "pubsub" checks are always reported as "ok" and
// the response time is a fixed placeholder. The returned error is always nil.
func (c *Client) Health() (*HealthStatus, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	overall, connection := "healthy", "ok"
	if !c.connected {
		overall, connection = "unhealthy", "disconnected"
	}

	report := &HealthStatus{
		Status: overall,
		Checks: map[string]string{
			"connection": connection,
			"database":   "ok",
			"storage":    "ok",
			"pubsub":     "ok",
		},
		LastUpdated:  time.Now(),
		ResponseTime: 10 * time.Millisecond, // Simulated
	}
	return report, nil
}
// isConnected reports, under the read lock, whether the connected flag is set.
func (c *Client) isConnected() bool {
	c.mu.RLock()
	connected := c.connected
	c.mu.RUnlock()
	return connected
}
// getAppNamespace returns the namespace the client operates in: the resolved
// namespace when one has been set, otherwise the configured AppName. It takes
// the read lock, so it must not be called while c.mu is held for writing.
func (c *Client) getAppNamespace() string {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if ns := c.resolvedNamespace; ns != "" {
		return ns
	}
	return c.config.AppName
}
// requireAccess checks that the caller may act on behalf of this client.
//
// Internal contexts (per IsInternalContext) are always allowed. Otherwise the
// configuration must carry a non-blank API key or JWT, and any non-empty
// namespace override stored in ctx under the storage or pubsub override key
// must equal the client's own namespace. Only the presence of credentials is
// checked here, not their validity.
func (c *Client) requireAccess(ctx context.Context) error {
	// Allow internal system operations to bypass authentication
	if IsInternalContext(ctx) {
		return nil
	}

	cfg := c.Config()
	if cfg == nil {
		return fmt.Errorf("access denied: API key or JWT required")
	}
	hasAPIKey := strings.TrimSpace(cfg.APIKey) != ""
	hasJWT := strings.TrimSpace(cfg.JWT) != ""
	if !hasAPIKey && !hasJWT {
		return fmt.Errorf("access denied: API key or JWT required")
	}

	ns := c.getAppNamespace()

	// Both override keys follow the same rule; non-string or empty values are ignored.
	overrideKeys := []any{storage.CtxKeyNamespaceOverride, pubsub.CtxKeyNamespaceOverride}
	for _, key := range overrideKeys {
		override, isString := ctx.Value(key).(string)
		if isString && override != "" && override != ns {
			return fmt.Errorf("access denied: namespace mismatch")
		}
	}
	return nil
}
// deriveNamespace picks the namespace for this client from its credentials.
//
// A non-blank JWT is consulted first, then a non-blank API key; the first one
// that yields a non-empty namespace wins. A parse error from either credential
// is returned immediately rather than falling through to the next source.
// When neither yields a namespace, the configured AppName is used.
func (c *Client) deriveNamespace() (string, error) {
	cfg := c.config

	// Prefer JWT claim {"Namespace": "..."}
	if strings.TrimSpace(cfg.JWT) != "" {
		switch ns, err := parseJWTNamespace(cfg.JWT); {
		case err != nil:
			return "", err
		case ns != "":
			return ns, nil
		}
	}

	// Fallback to API key format ak_<random>:<namespace>
	if strings.TrimSpace(cfg.APIKey) != "" {
		switch ns, err := parseAPIKeyNamespace(cfg.APIKey); {
		case err != nil:
			return "", err
		case ns != "":
			return ns, nil
		}
	}

	return cfg.AppName, nil
}
// parseJWTNamespace extracts the "Namespace" claim from a JWT without
// verifying its signature.
//
// The token must contain at least one '.'; the segment after the first dot is
// decoded as unpadded base64url and parsed as JSON. The claim value is
// returned with surrounding whitespace trimmed, and is empty when the claim is
// absent. An error is returned when the token has no dot, the payload is not
// valid base64url, or the payload is not valid JSON.
func parseJWTNamespace(token string) (string, error) {
	_, afterHeader, hasDot := strings.Cut(token, ".")
	if !hasDot {
		return "", fmt.Errorf("invalid JWT format")
	}
	// The payload ends at the next dot, or at the end of the token if there is none.
	segment, _, _ := strings.Cut(afterHeader, ".")

	// Decode base64url (raw, no padding)
	raw, err := base64.RawURLEncoding.DecodeString(segment)
	if err != nil {
		return "", fmt.Errorf("failed to decode JWT payload: %w", err)
	}

	// Only the one claim of interest is decoded.
	claims := struct {
		Namespace string `json:"Namespace"`
	}{}
	if err := json.Unmarshal(raw, &claims); err != nil {
		return "", fmt.Errorf("failed to parse JWT claims: %w", err)
	}

	return strings.TrimSpace(claims.Namespace), nil
}
// parseAPIKeyNamespace returns the namespace part of an API key of the form
// ak_<random>:<namespace>.
//
// The key is trimmed first and must then contain exactly one ':'. The text
// before the colon is not inspected (the ak_ prefix is allowed but not
// required); the text after it, trimmed, is the namespace. An error is
// returned for an empty key, a key without exactly one colon, or an empty
// namespace.
func parseAPIKeyNamespace(key string) (string, error) {
	trimmed := strings.TrimSpace(key)
	if trimmed == "" {
		return "", fmt.Errorf("invalid API key: empty")
	}

	// Exactly one separator: a colon must be present and none may follow it.
	_, rawNS, found := strings.Cut(trimmed, ":")
	if !found || strings.Contains(rawNS, ":") {
		return "", fmt.Errorf("invalid API key format: expected ak_<random>:<namespace>")
	}

	if ns := strings.TrimSpace(rawNS); ns != "" {
		return ns, nil
	}
	return "", fmt.Errorf("invalid API key: empty namespace")
}