diff --git a/Makefile b/Makefile index 75f34eb..e39702d 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.41.0-beta +VERSION := 0.42.0-beta COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/go.mod b/go.mod index 18f309c..d2db8ed 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/gorilla/websocket v1.5.3 github.com/libp2p/go-libp2p v0.41.1 github.com/libp2p/go-libp2p-pubsub v0.14.2 + github.com/mackerelio/go-osstat v0.2.6 github.com/multiformats/go-multiaddr v0.15.0 github.com/rqlite/gorqlite v0.0.0-20250609141355-ac86a4a1c9a8 go.uber.org/zap v1.27.0 diff --git a/go.sum b/go.sum index 0213ab0..33dd50c 100644 --- a/go.sum +++ b/go.sum @@ -157,6 +157,8 @@ github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8S github.com/libp2p/go-yamux/v5 v5.0.0 h1:2djUh96d3Jiac/JpGkKs4TO49YhsfLopAoryfPmf+Po= github.com/libp2p/go-yamux/v5 v5.0.0/go.mod h1:en+3cdX51U0ZslwRdRLrvQsdayFt3TSUKvBGErzpWbU= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= +github.com/mackerelio/go-osstat v0.2.6 h1:gs4U8BZeS1tjrL08tt5VUliVvSWP26Ai2Ob8Lr7f2i0= +github.com/mackerelio/go-osstat v0.2.6/go.mod h1:lRy8V9ZuHpuRVZh+vyTkODeDPl3/d5MgXHtLSaqG8bA= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk= github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU= diff --git a/openapi/gateway.yaml b/openapi/gateway.yaml index 1e1cbc2..7ad1702 100644 --- a/openapi/gateway.yaml +++ b/openapi/gateway.yaml @@ -1,7 +1,7 @@ openapi: 3.0.3 info: title: DeBros Gateway API - version: 1.0.0 + version: 0.40.0 description: REST API over the DeBros Network client for storage, database, and pubsub. servers: - url: http://localhost:8080 @@ -79,7 +79,7 @@ paths: get: summary: Gateway health responses: - '200': { description: OK } + "200": { description: OK } /v1/storage/put: post: summary: Store a value by key @@ -96,10 +96,26 @@ paths: type: string format: binary responses: - '201': { description: Created } - '400': { description: Bad Request, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } } - '401': { description: Unauthorized } - '500': { description: Error, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } } + "201": { description: Created } + "400": + { + description: Bad Request, + content: + { + application/json: + { schema: { $ref: "#/components/schemas/Error" } }, + }, + } + "401": { description: Unauthorized } + "500": + { + description: Error, + content: + { + application/json: + { schema: { $ref: "#/components/schemas/Error" } }, + }, + } /v1/storage/get: get: summary: Get a value by key @@ -109,14 +125,22 @@ paths: schema: { type: string } required: true responses: - '200': + "200": description: OK content: application/octet-stream: schema: type: string format: binary - '404': { description: Not Found, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } } + "404": + { + description: Not Found, + content: + { + application/json: + { schema: { $ref: "#/components/schemas/Error" } }, + }, + } /v1/storage/exists: get: summary: Check key existence @@ -126,7 +150,7 @@ paths: schema: { type: string } required: true responses: - '200': + "200": description: OK content: application/json: @@ -143,7 +167,7 @@ paths: name: prefix schema: { type: string } responses: - '200': + "200": description: OK content: application/json: @@ -167,7 +191,7 @@ paths: properties: key: { type: string } responses: - '200': { description: OK } + "200": { description: OK } /v1/db/create-table: post: summary: Create tables via SQL DDL @@ -175,11 +199,27 @@ paths: required: true content: application/json: - schema: { $ref: '#/components/schemas/CreateTableRequest' } + schema: { $ref: "#/components/schemas/CreateTableRequest" } responses: - '201': { description: Created } - '400': { description: Bad Request, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } } - '500': { description: Error, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } } + "201": { description: Created } + "400": + { + description: Bad Request, + content: + { + application/json: + { schema: { $ref: "#/components/schemas/Error" } }, + }, + } + "500": + { + description: Error, + content: + { + application/json: + { schema: { $ref: "#/components/schemas/Error" } }, + }, + } /v1/db/drop-table: post: summary: Drop a table @@ -187,9 +227,9 @@ paths: required: true content: application/json: - schema: { $ref: '#/components/schemas/DropTableRequest' } + schema: { $ref: "#/components/schemas/DropTableRequest" } responses: - '200': { description: OK } + "200": { description: OK } /v1/db/query: post: summary: Execute a single SQL query @@ -197,15 +237,31 @@ paths: required: true content: application/json: - schema: { $ref: '#/components/schemas/QueryRequest' } + schema: { $ref: "#/components/schemas/QueryRequest" } responses: - '200': + "200": description: OK content: application/json: - schema: { $ref: '#/components/schemas/QueryResponse' } - '400': { description: Bad Request, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } } - '500': { description: Error, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } } + schema: { $ref: "#/components/schemas/QueryResponse" } + "400": + { + description: Bad Request, + content: + { + application/json: + { schema: { $ref: "#/components/schemas/Error" } }, + }, + } + "500": + { + description: Error, + content: + { + application/json: + { schema: { $ref: "#/components/schemas/Error" } }, + }, + } /v1/db/transaction: post: summary: Execute multiple SQL statements atomically @@ -213,16 +269,32 @@ paths: required: true content: application/json: - schema: { $ref: '#/components/schemas/TransactionRequest' } + schema: { $ref: "#/components/schemas/TransactionRequest" } responses: - '200': { description: OK } - '400': { description: Bad Request, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } } - '500': { description: Error, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } } + "200": { description: OK } + "400": + { + description: Bad Request, + content: + { + application/json: + { schema: { $ref: "#/components/schemas/Error" } }, + }, + } + "500": + { + description: Error, + content: + { + application/json: + { schema: { $ref: "#/components/schemas/Error" } }, + }, + } /v1/db/schema: get: summary: Get current database schema responses: - '200': { description: OK } + "200": { description: OK } /v1/pubsub/publish: post: summary: Publish to a topic @@ -237,13 +309,13 @@ paths: topic: { type: string } data_base64: { type: string } responses: - '200': { description: OK } + "200": { description: OK } /v1/pubsub/topics: get: summary: List topics in caller namespace responses: - '200': + "200": description: OK content: application/json: - schema: { $ref: '#/components/schemas/TopicsResponse' } + schema: { $ref: "#/components/schemas/TopicsResponse" } diff --git a/pkg/client/client.go b/pkg/client/client.go index 8623034..f7f0a85 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -241,7 +241,6 @@ func (c *Client) Connect() error { c.logger.Debug("Client configured as lightweight P2P participant (no discovery)") // Start minimal connection monitoring - c.startConnectionMonitoring() c.logger.Info("Connection monitoring started") c.logger.Info("Setting connected state...") diff --git a/pkg/client/monitoring.go b/pkg/client/monitoring.go deleted file mode 100644 index f59c4bb..0000000 --- a/pkg/client/monitoring.go +++ /dev/null @@ -1,66 +0,0 @@ -package client - -import ( - "time" - - "go.uber.org/zap" -) - -// startConnectionMonitoring starts minimal connection monitoring for the lightweight client. -// Unlike nodes which need extensive monitoring, clients only need basic health checks. -func (c *Client) startConnectionMonitoring() { - go func() { - ticker := time.NewTicker(60 * time.Second) // Less frequent than nodes (60s vs 30s) - defer ticker.Stop() - - var lastPeerCount int - firstCheck := true - - for range ticker.C { - if !c.isConnected() { - c.logger.Debug("Connection monitoring stopped: client disconnected") - return - } - - if c.host == nil { - return - } - - // Get current peer count - peers := c.host.Network().Peers() - currentPeerCount := len(peers) - - // Only log if peer count changed or on first check - if firstCheck || currentPeerCount != lastPeerCount { - if currentPeerCount == 0 { - c.logger.Warn("Client has no connected peers", - zap.String("client_id", c.host.ID().String())) - } else if currentPeerCount < lastPeerCount { - c.logger.Info("Client lost peers", - zap.Int("current_peers", currentPeerCount), - zap.Int("previous_peers", lastPeerCount)) - } else if currentPeerCount > lastPeerCount && !firstCheck { - c.logger.Debug("Client gained peers", - zap.Int("current_peers", currentPeerCount), - zap.Int("previous_peers", lastPeerCount)) - } - - lastPeerCount = currentPeerCount - firstCheck = false - } - - // Log detailed peer info at debug level occasionally (every 5 minutes) - if time.Now().Unix()%300 == 0 && currentPeerCount > 0 { - peerIDs := make([]string, 0, currentPeerCount) - for _, p := range peers { - peerIDs = append(peerIDs, p.String()) - } - c.logger.Debug("Client peer status", - zap.Int("peer_count", currentPeerCount), - zap.Strings("peer_ids", peerIDs)) - } - } - }() - - c.logger.Debug("Lightweight connection monitoring started") -} diff --git a/pkg/node/monitoring.go b/pkg/node/monitoring.go new file mode 100644 index 0000000..4dcddea --- /dev/null +++ b/pkg/node/monitoring.go @@ -0,0 +1,136 @@ +package node + +import ( + "context" + "encoding/json" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/mackerelio/go-osstat/cpu" + "github.com/mackerelio/go-osstat/memory" + "go.uber.org/zap" +) + +func logPeerStatus(n *Node, currentPeerCount int, lastPeerCount int, firstCheck bool) (int, bool) { + if firstCheck || currentPeerCount != lastPeerCount { + if currentPeerCount == 0 { + n.logger.Warn("Node has no connected peers", + zap.String("node_id", n.host.ID().String())) + } else if currentPeerCount < lastPeerCount { + n.logger.Info("Node lost peers", + zap.Int("current_peers", currentPeerCount), + zap.Int("previous_peers", lastPeerCount)) + } else if currentPeerCount > lastPeerCount && !firstCheck { + n.logger.Debug("Node gained peers", + zap.Int("current_peers", currentPeerCount), + zap.Int("previous_peers", lastPeerCount)) + } + + lastPeerCount = currentPeerCount + firstCheck = false + } + return lastPeerCount, firstCheck +} + +func logDetailedPeerInfo(n *Node, currentPeerCount int, peers []peer.ID) { + if time.Now().Unix()%300 == 0 && currentPeerCount > 0 { + peerIDs := make([]string, 0, currentPeerCount) + for _, p := range peers { + peerIDs = append(peerIDs, p.String()) + } + n.logger.Debug("Node peer status", + zap.Int("peer_count", currentPeerCount), + zap.Strings("peer_ids", peerIDs)) + } +} + +func logSystemUsage(n *Node) (*memory.Stats, uint64) { + mem, _ := memory.Get() + cpuBefore, _ := cpu.Get() + time.Sleep(3 * time.Second) + cpuAfter, _ := cpu.Get() + totalCpu := cpuAfter.Total - cpuBefore.Total + + n.logger.Debug("Node CPU usage", + zap.Float64("cpu_usage", float64(totalCpu)), + zap.Float64("memory_usage", float64(mem.Used))) + + return mem, totalCpu +} + +func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory.Stats) error { + if n.pubsub == nil { + return nil + } + + peerIDs := make([]string, 0, len(peers)) + for _, p := range peers { + peerIDs = append(peerIDs, p.String()) + } + + msg := struct { + PeerID string `json:"peer_id"` + PeerCount int `json:"peer_count"` + PeerIDs []string `json:"peer_ids,omitempty"` + CPU uint64 `json:"cpu_usage"` + Memory uint64 `json:"memory_usage"` + Timestamp int64 `json:"timestamp"` + }{ + PeerID: n.host.ID().String(), + PeerCount: len(peers), + PeerIDs: peerIDs, + CPU: cpuUsage, + Memory: memUsage.Used, + Timestamp: time.Now().Unix(), + } + + data, err := json.Marshal(msg) + if err != nil { + return err + } + + ctx := context.Background() + if err := n.pubsub.Publish(ctx, "monitoring", data); err != nil { + return err + } + + return nil +} + +// startConnectionMonitoring starts minimal connection monitoring for the lightweight client. +// Unlike nodes which need extensive monitoring, clients only need basic health checks. +func (n *Node) startConnectionMonitoring() { + go func() { + ticker := time.NewTicker(60 * time.Second) // Less frequent than nodes (60s vs 30s) + defer ticker.Stop() + + var lastPeerCount int + firstCheck := true + + for range ticker.C { + if n.host == nil { + return + } + + // Get current peer count + peers := n.host.Network().Peers() + currentPeerCount := len(peers) + + // Only log if peer count changed or on first check + lastPeerCount, firstCheck = logPeerStatus(n, currentPeerCount, lastPeerCount, firstCheck) + + // Log detailed peer info at debug level occasionally (every 5 minutes) + logDetailedPeerInfo(n, currentPeerCount, peers) + + // Log system usage + mem, cpuUsage := logSystemUsage(n) + + // Announce metrics + if err := announceMetrics(n, peers, cpuUsage, mem); err != nil { + n.logger.Error("Failed to announce metrics", zap.Error(err)) + } + } + }() + + n.logger.Debug("Lightweight connection monitoring started") +} diff --git a/pkg/node/node.go b/pkg/node/node.go index 6868a16..4af952e 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -25,6 +25,7 @@ import ( "git.debros.io/DeBros/network/pkg/config" "git.debros.io/DeBros/network/pkg/database" "git.debros.io/DeBros/network/pkg/logging" + "git.debros.io/DeBros/network/pkg/pubsub" ) // Node represents a network node with RQLite database @@ -33,12 +34,15 @@ type Node struct { logger *logging.ColoredLogger host host.Host - rqliteManager *database.RQLiteManager - rqliteAdapter *database.RQLiteAdapter + rqliteManager *database.RQLiteManager + rqliteAdapter *database.RQLiteAdapter // Peer discovery discoveryCancel context.CancelFunc bootstrapCancel context.CancelFunc + + // PubSub + pubsub *pubsub.ClientAdapter } // NewNode creates a new network node @@ -367,7 +371,6 @@ func (n *Node) startLibP2P() error { // Start peer discovery and monitoring n.startPeerDiscovery() - n.startConnectionMonitoring() n.logger.ComponentInfo(logging.ComponentLibP2P, "LibP2P host started", zap.String("peer_id", h.ID().String())) @@ -566,29 +569,6 @@ func (n *Node) discoverViaPeerExchange(ctx context.Context) int { return connected } -// startConnectionMonitoring monitors connection health and logs status -func (n *Node) startConnectionMonitoring() { - go func() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if n.host == nil { - return - } - - connectedPeers := n.host.Network().Peers() - if len(connectedPeers) == 0 { - n.logger.Debug("Node has no connected peers - seeking connections", - zap.String("node_id", n.host.ID().String())) - } - } - } - }() -} - // stopPeerDiscovery stops peer discovery func (n *Node) stopPeerDiscovery() { if n.discoveryCancel != nil { @@ -658,5 +638,7 @@ func (n *Node) Start(ctx context.Context) error { zap.Strings("listen_addrs", listenAddrs), ) + n.startConnectionMonitoring() + return nil }