Bump version to 0.42.0-beta and add node monitoring

Add detailed connection and system resource monitoring for nodes using a
new node monitoring package. Remove previous client-only monitoring.
Update openapi specs formatting and add new OS stat dependency.
This commit is contained in:
anonpenguin 2025-09-13 07:35:48 +03:00
parent 44ab3eb66d
commit 6b2512a983
8 changed files with 250 additions and 124 deletions

View File

@ -21,7 +21,7 @@ test-e2e:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
VERSION := 0.41.0-beta
VERSION := 0.42.0-beta
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

1
go.mod
View File

@ -9,6 +9,7 @@ require (
github.com/gorilla/websocket v1.5.3
github.com/libp2p/go-libp2p v0.41.1
github.com/libp2p/go-libp2p-pubsub v0.14.2
github.com/mackerelio/go-osstat v0.2.6
github.com/multiformats/go-multiaddr v0.15.0
github.com/rqlite/gorqlite v0.0.0-20250609141355-ac86a4a1c9a8
go.uber.org/zap v1.27.0

2
go.sum
View File

@ -157,6 +157,8 @@ github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8S
github.com/libp2p/go-yamux/v5 v5.0.0 h1:2djUh96d3Jiac/JpGkKs4TO49YhsfLopAoryfPmf+Po=
github.com/libp2p/go-yamux/v5 v5.0.0/go.mod h1:en+3cdX51U0ZslwRdRLrvQsdayFt3TSUKvBGErzpWbU=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
github.com/mackerelio/go-osstat v0.2.6 h1:gs4U8BZeS1tjrL08tt5VUliVvSWP26Ai2Ob8Lr7f2i0=
github.com/mackerelio/go-osstat v0.2.6/go.mod h1:lRy8V9ZuHpuRVZh+vyTkODeDPl3/d5MgXHtLSaqG8bA=
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU=

View File

@ -1,7 +1,7 @@
openapi: 3.0.3
info:
title: DeBros Gateway API
version: 1.0.0
version: 0.40.0
description: REST API over the DeBros Network client for storage, database, and pubsub.
servers:
- url: http://localhost:8080
@ -79,7 +79,7 @@ paths:
get:
summary: Gateway health
responses:
'200': { description: OK }
"200": { description: OK }
/v1/storage/put:
post:
summary: Store a value by key
@ -96,10 +96,26 @@ paths:
type: string
format: binary
responses:
'201': { description: Created }
'400': { description: Bad Request, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } }
'401': { description: Unauthorized }
'500': { description: Error, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } }
"201": { description: Created }
"400":
{
description: Bad Request,
content:
{
application/json:
{ schema: { $ref: "#/components/schemas/Error" } },
},
}
"401": { description: Unauthorized }
"500":
{
description: Error,
content:
{
application/json:
{ schema: { $ref: "#/components/schemas/Error" } },
},
}
/v1/storage/get:
get:
summary: Get a value by key
@ -109,14 +125,22 @@ paths:
schema: { type: string }
required: true
responses:
'200':
"200":
description: OK
content:
application/octet-stream:
schema:
type: string
format: binary
'404': { description: Not Found, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } }
"404":
{
description: Not Found,
content:
{
application/json:
{ schema: { $ref: "#/components/schemas/Error" } },
},
}
/v1/storage/exists:
get:
summary: Check key existence
@ -126,7 +150,7 @@ paths:
schema: { type: string }
required: true
responses:
'200':
"200":
description: OK
content:
application/json:
@ -143,7 +167,7 @@ paths:
name: prefix
schema: { type: string }
responses:
'200':
"200":
description: OK
content:
application/json:
@ -167,7 +191,7 @@ paths:
properties:
key: { type: string }
responses:
'200': { description: OK }
"200": { description: OK }
/v1/db/create-table:
post:
summary: Create tables via SQL DDL
@ -175,11 +199,27 @@ paths:
required: true
content:
application/json:
schema: { $ref: '#/components/schemas/CreateTableRequest' }
schema: { $ref: "#/components/schemas/CreateTableRequest" }
responses:
'201': { description: Created }
'400': { description: Bad Request, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } }
'500': { description: Error, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } }
"201": { description: Created }
"400":
{
description: Bad Request,
content:
{
application/json:
{ schema: { $ref: "#/components/schemas/Error" } },
},
}
"500":
{
description: Error,
content:
{
application/json:
{ schema: { $ref: "#/components/schemas/Error" } },
},
}
/v1/db/drop-table:
post:
summary: Drop a table
@ -187,9 +227,9 @@ paths:
required: true
content:
application/json:
schema: { $ref: '#/components/schemas/DropTableRequest' }
schema: { $ref: "#/components/schemas/DropTableRequest" }
responses:
'200': { description: OK }
"200": { description: OK }
/v1/db/query:
post:
summary: Execute a single SQL query
@ -197,15 +237,31 @@ paths:
required: true
content:
application/json:
schema: { $ref: '#/components/schemas/QueryRequest' }
schema: { $ref: "#/components/schemas/QueryRequest" }
responses:
'200':
"200":
description: OK
content:
application/json:
schema: { $ref: '#/components/schemas/QueryResponse' }
'400': { description: Bad Request, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } }
'500': { description: Error, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } }
schema: { $ref: "#/components/schemas/QueryResponse" }
"400":
{
description: Bad Request,
content:
{
application/json:
{ schema: { $ref: "#/components/schemas/Error" } },
},
}
"500":
{
description: Error,
content:
{
application/json:
{ schema: { $ref: "#/components/schemas/Error" } },
},
}
/v1/db/transaction:
post:
summary: Execute multiple SQL statements atomically
@ -213,16 +269,32 @@ paths:
required: true
content:
application/json:
schema: { $ref: '#/components/schemas/TransactionRequest' }
schema: { $ref: "#/components/schemas/TransactionRequest" }
responses:
'200': { description: OK }
'400': { description: Bad Request, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } }
'500': { description: Error, content: { application/json: { schema: { $ref: '#/components/schemas/Error' } } } }
"200": { description: OK }
"400":
{
description: Bad Request,
content:
{
application/json:
{ schema: { $ref: "#/components/schemas/Error" } },
},
}
"500":
{
description: Error,
content:
{
application/json:
{ schema: { $ref: "#/components/schemas/Error" } },
},
}
/v1/db/schema:
get:
summary: Get current database schema
responses:
'200': { description: OK }
"200": { description: OK }
/v1/pubsub/publish:
post:
summary: Publish to a topic
@ -237,13 +309,13 @@ paths:
topic: { type: string }
data_base64: { type: string }
responses:
'200': { description: OK }
"200": { description: OK }
/v1/pubsub/topics:
get:
summary: List topics in caller namespace
responses:
'200':
"200":
description: OK
content:
application/json:
schema: { $ref: '#/components/schemas/TopicsResponse' }
schema: { $ref: "#/components/schemas/TopicsResponse" }

View File

@ -241,7 +241,6 @@ func (c *Client) Connect() error {
c.logger.Debug("Client configured as lightweight P2P participant (no discovery)")
// Start minimal connection monitoring
c.startConnectionMonitoring()
c.logger.Info("Connection monitoring started")
c.logger.Info("Setting connected state...")

View File

@ -1,66 +0,0 @@
package client
import (
"time"
"go.uber.org/zap"
)
// startConnectionMonitoring starts minimal connection monitoring for the lightweight client.
// Unlike nodes which need extensive monitoring, clients only need basic health checks.
func (c *Client) startConnectionMonitoring() {
go func() {
ticker := time.NewTicker(60 * time.Second) // Less frequent than nodes (60s vs 30s)
defer ticker.Stop()
var lastPeerCount int
firstCheck := true
for range ticker.C {
if !c.isConnected() {
c.logger.Debug("Connection monitoring stopped: client disconnected")
return
}
if c.host == nil {
return
}
// Get current peer count
peers := c.host.Network().Peers()
currentPeerCount := len(peers)
// Only log if peer count changed or on first check
if firstCheck || currentPeerCount != lastPeerCount {
if currentPeerCount == 0 {
c.logger.Warn("Client has no connected peers",
zap.String("client_id", c.host.ID().String()))
} else if currentPeerCount < lastPeerCount {
c.logger.Info("Client lost peers",
zap.Int("current_peers", currentPeerCount),
zap.Int("previous_peers", lastPeerCount))
} else if currentPeerCount > lastPeerCount && !firstCheck {
c.logger.Debug("Client gained peers",
zap.Int("current_peers", currentPeerCount),
zap.Int("previous_peers", lastPeerCount))
}
lastPeerCount = currentPeerCount
firstCheck = false
}
// Log detailed peer info at debug level occasionally (every 5 minutes)
if time.Now().Unix()%300 == 0 && currentPeerCount > 0 {
peerIDs := make([]string, 0, currentPeerCount)
for _, p := range peers {
peerIDs = append(peerIDs, p.String())
}
c.logger.Debug("Client peer status",
zap.Int("peer_count", currentPeerCount),
zap.Strings("peer_ids", peerIDs))
}
}
}()
c.logger.Debug("Lightweight connection monitoring started")
}

136
pkg/node/monitoring.go Normal file
View File

@ -0,0 +1,136 @@
package node
import (
"context"
"encoding/json"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/mackerelio/go-osstat/cpu"
"github.com/mackerelio/go-osstat/memory"
"go.uber.org/zap"
)
func logPeerStatus(n *Node, currentPeerCount int, lastPeerCount int, firstCheck bool) (int, bool) {
if firstCheck || currentPeerCount != lastPeerCount {
if currentPeerCount == 0 {
n.logger.Warn("Node has no connected peers",
zap.String("node_id", n.host.ID().String()))
} else if currentPeerCount < lastPeerCount {
n.logger.Info("Node lost peers",
zap.Int("current_peers", currentPeerCount),
zap.Int("previous_peers", lastPeerCount))
} else if currentPeerCount > lastPeerCount && !firstCheck {
n.logger.Debug("Node gained peers",
zap.Int("current_peers", currentPeerCount),
zap.Int("previous_peers", lastPeerCount))
}
lastPeerCount = currentPeerCount
firstCheck = false
}
return lastPeerCount, firstCheck
}
func logDetailedPeerInfo(n *Node, currentPeerCount int, peers []peer.ID) {
if time.Now().Unix()%300 == 0 && currentPeerCount > 0 {
peerIDs := make([]string, 0, currentPeerCount)
for _, p := range peers {
peerIDs = append(peerIDs, p.String())
}
n.logger.Debug("Node peer status",
zap.Int("peer_count", currentPeerCount),
zap.Strings("peer_ids", peerIDs))
}
}
func logSystemUsage(n *Node) (*memory.Stats, uint64) {
mem, _ := memory.Get()
cpuBefore, _ := cpu.Get()
time.Sleep(3 * time.Second)
cpuAfter, _ := cpu.Get()
totalCpu := cpuAfter.Total - cpuBefore.Total
n.logger.Debug("Node CPU usage",
zap.Float64("cpu_usage", float64(totalCpu)),
zap.Float64("memory_usage", float64(mem.Used)))
return mem, totalCpu
}
func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory.Stats) error {
if n.pubsub == nil {
return nil
}
peerIDs := make([]string, 0, len(peers))
for _, p := range peers {
peerIDs = append(peerIDs, p.String())
}
msg := struct {
PeerID string `json:"peer_id"`
PeerCount int `json:"peer_count"`
PeerIDs []string `json:"peer_ids,omitempty"`
CPU uint64 `json:"cpu_usage"`
Memory uint64 `json:"memory_usage"`
Timestamp int64 `json:"timestamp"`
}{
PeerID: n.host.ID().String(),
PeerCount: len(peers),
PeerIDs: peerIDs,
CPU: cpuUsage,
Memory: memUsage.Used,
Timestamp: time.Now().Unix(),
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
ctx := context.Background()
if err := n.pubsub.Publish(ctx, "monitoring", data); err != nil {
return err
}
return nil
}
// startConnectionMonitoring starts minimal connection monitoring for the lightweight client.
// Unlike nodes which need extensive monitoring, clients only need basic health checks.
func (n *Node) startConnectionMonitoring() {
go func() {
ticker := time.NewTicker(60 * time.Second) // Less frequent than nodes (60s vs 30s)
defer ticker.Stop()
var lastPeerCount int
firstCheck := true
for range ticker.C {
if n.host == nil {
return
}
// Get current peer count
peers := n.host.Network().Peers()
currentPeerCount := len(peers)
// Only log if peer count changed or on first check
lastPeerCount, firstCheck = logPeerStatus(n, currentPeerCount, lastPeerCount, firstCheck)
// Log detailed peer info at debug level occasionally (every 5 minutes)
logDetailedPeerInfo(n, currentPeerCount, peers)
// Log system usage
mem, cpuUsage := logSystemUsage(n)
// Announce metrics
if err := announceMetrics(n, peers, cpuUsage, mem); err != nil {
n.logger.Error("Failed to announce metrics", zap.Error(err))
}
}
}()
n.logger.Debug("Lightweight connection monitoring started")
}

View File

@ -25,6 +25,7 @@ import (
"git.debros.io/DeBros/network/pkg/config"
"git.debros.io/DeBros/network/pkg/database"
"git.debros.io/DeBros/network/pkg/logging"
"git.debros.io/DeBros/network/pkg/pubsub"
)
// Node represents a network node with RQLite database
@ -39,6 +40,9 @@ type Node struct {
// Peer discovery
discoveryCancel context.CancelFunc
bootstrapCancel context.CancelFunc
// PubSub
pubsub *pubsub.ClientAdapter
}
// NewNode creates a new network node
@ -367,7 +371,6 @@ func (n *Node) startLibP2P() error {
// Start peer discovery and monitoring
n.startPeerDiscovery()
n.startConnectionMonitoring()
n.logger.ComponentInfo(logging.ComponentLibP2P, "LibP2P host started",
zap.String("peer_id", h.ID().String()))
@ -566,29 +569,6 @@ func (n *Node) discoverViaPeerExchange(ctx context.Context) int {
return connected
}
// startConnectionMonitoring monitors connection health and logs status
func (n *Node) startConnectionMonitoring() {
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if n.host == nil {
return
}
connectedPeers := n.host.Network().Peers()
if len(connectedPeers) == 0 {
n.logger.Debug("Node has no connected peers - seeking connections",
zap.String("node_id", n.host.ID().String()))
}
}
}
}()
}
// stopPeerDiscovery stops peer discovery
func (n *Node) stopPeerDiscovery() {
if n.discoveryCancel != nil {
@ -658,5 +638,7 @@ func (n *Node) Start(ctx context.Context) error {
zap.Strings("listen_addrs", listenAddrs),
)
n.startConnectionMonitoring()
return nil
}