mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-10-06 06:19:08 +00:00
The above changes introduce node namespacing and various monitoring improvements. Let's look at the key changes: - Add node_namespace config field for partitioning node identifiers - Initialize pubsub with peer exchange enabled - Reduce client monitoring interval to 30s - Add metric announcement logging I would write this commit message as: Add node namespace and improve monitoring - Add node_namespace config for partitioning identifiers - Enable pubsub peer exchange - Adjust monitoring intervals and add logging - Initialize pubsub with namespace support The subject line captures the two main themes (namespacing and monitoring), while the body provides helpful details about the specific changes made.
160 lines
4.2 KiB
Go
160 lines
4.2 KiB
Go
package node
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/mackerelio/go-osstat/cpu"
|
|
"github.com/mackerelio/go-osstat/memory"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func logPeerStatus(n *Node, currentPeerCount int, lastPeerCount int, firstCheck bool) (int, bool) {
|
|
if firstCheck || currentPeerCount != lastPeerCount {
|
|
if currentPeerCount == 0 {
|
|
n.logger.Warn("Node has no connected peers",
|
|
zap.String("node_id", n.host.ID().String()))
|
|
} else if currentPeerCount < lastPeerCount {
|
|
n.logger.Info("Node lost peers",
|
|
zap.Int("current_peers", currentPeerCount),
|
|
zap.Int("previous_peers", lastPeerCount))
|
|
} else if currentPeerCount > lastPeerCount && !firstCheck {
|
|
n.logger.Debug("Node gained peers",
|
|
zap.Int("current_peers", currentPeerCount),
|
|
zap.Int("previous_peers", lastPeerCount))
|
|
}
|
|
|
|
lastPeerCount = currentPeerCount
|
|
firstCheck = false
|
|
}
|
|
return lastPeerCount, firstCheck
|
|
}
|
|
|
|
// logDetailedPeerInfo occasionally dumps the full peer ID list at debug level.
//
// NOTE(review): the time.Now().Unix()%300 == 0 gate is fragile — the caller
// polls on a 30s ticker, so this only fires when a tick happens to land on a
// second that is an exact multiple of 300. Depending on ticker phase it may
// fire roughly every 5 minutes or never; consider tracking a lastLogged
// timestamp instead — TODO confirm intended cadence.
func logDetailedPeerInfo(n *Node, currentPeerCount int, peers []peer.ID) {
	if time.Now().Unix()%300 == 0 && currentPeerCount > 0 {
		// Stringify peer IDs for structured logging.
		peerIDs := make([]string, 0, currentPeerCount)
		for _, p := range peers {
			peerIDs = append(peerIDs, p.String())
		}
		n.logger.Debug("Node peer status",
			zap.Int("peer_count", currentPeerCount),
			zap.Strings("peer_ids", peerIDs))
	}
}
|
|
|
|
func GetCPUUsagePercent(n *Node, interval time.Duration) (uint64, error) {
|
|
before, err := cpu.Get()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
time.Sleep(interval)
|
|
after, err := cpu.Get()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
idle := float64(after.Idle - before.Idle)
|
|
total := float64(after.Total - before.Total)
|
|
if total == 0 {
|
|
return 0, errors.New("Failed to get CPU usage")
|
|
}
|
|
usagePercent := (1.0 - idle/total) * 100.0
|
|
return uint64(usagePercent), nil
|
|
}
|
|
|
|
func logSystemUsage(n *Node) (*memory.Stats, uint64) {
|
|
mem, _ := memory.Get()
|
|
|
|
totalCpu, err := GetCPUUsagePercent(n, 3*time.Second)
|
|
if err != nil {
|
|
n.logger.Error("Failed to get CPU usage", zap.Error(err))
|
|
return mem, 0
|
|
}
|
|
|
|
n.logger.Debug("Node CPU usage",
|
|
zap.Float64("cpu_usage", float64(totalCpu)),
|
|
zap.Float64("memory_usage_percent", float64(mem.Used)/float64(mem.Total)*100))
|
|
|
|
return mem, totalCpu
|
|
}
|
|
|
|
func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory.Stats) error {
|
|
if n.pubsub == nil {
|
|
return nil
|
|
}
|
|
|
|
peerIDs := make([]string, 0, len(peers))
|
|
for _, p := range peers {
|
|
peerIDs = append(peerIDs, p.String())
|
|
}
|
|
|
|
msg := struct {
|
|
PeerID string `json:"peer_id"`
|
|
PeerCount int `json:"peer_count"`
|
|
PeerIDs []string `json:"peer_ids,omitempty"`
|
|
CPU uint64 `json:"cpu_usage"`
|
|
Memory uint64 `json:"memory_usage"`
|
|
Timestamp int64 `json:"timestamp"`
|
|
}{
|
|
PeerID: n.host.ID().String(),
|
|
PeerCount: len(peers),
|
|
PeerIDs: peerIDs,
|
|
CPU: cpuUsage,
|
|
Memory: memUsage.Used,
|
|
Timestamp: time.Now().Unix(),
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx := context.Background()
|
|
if err := n.pubsub.Publish(ctx, "monitoring", data); err != nil {
|
|
return err
|
|
}
|
|
n.logger.Info("Announced metrics", zap.String("topic", "monitoring"))
|
|
|
|
return nil
|
|
}
|
|
|
|
// startConnectionMonitoring starts minimal connection monitoring for the lightweight client.
|
|
// Unlike nodes which need extensive monitoring, clients only need basic health checks.
|
|
func (n *Node) startConnectionMonitoring() {
|
|
go func() {
|
|
ticker := time.NewTicker(30 * time.Second) // Less frequent than nodes (60s vs 30s)
|
|
defer ticker.Stop()
|
|
|
|
var lastPeerCount int
|
|
firstCheck := true
|
|
|
|
for range ticker.C {
|
|
if n.host == nil {
|
|
return
|
|
}
|
|
|
|
// Get current peer count
|
|
peers := n.host.Network().Peers()
|
|
currentPeerCount := len(peers)
|
|
|
|
// Only log if peer count changed or on first check
|
|
lastPeerCount, firstCheck = logPeerStatus(n, currentPeerCount, lastPeerCount, firstCheck)
|
|
|
|
// Log detailed peer info at debug level occasionally (every 5 minutes)
|
|
logDetailedPeerInfo(n, currentPeerCount, peers)
|
|
|
|
// Log system usage
|
|
mem, cpuUsage := logSystemUsage(n)
|
|
|
|
// Announce metrics
|
|
if err := announceMetrics(n, peers, cpuUsage, mem); err != nil {
|
|
n.logger.Error("Failed to announce metrics", zap.Error(err))
|
|
}
|
|
}
|
|
}()
|
|
|
|
n.logger.Debug("Lightweight connection monitoring started")
|
|
}
|