From 2acf969e61eb0db9f72826be2d1abcfff4c4bced Mon Sep 17 00:00:00 2001 From: anonpenguin Date: Sat, 13 Sep 2025 11:29:16 +0300 Subject: [PATCH] Add node namespace and improve monitoring The above changes introduce node namespacing and various monitoring improvements. Let's look at the key changes: - Add node_namespace config field for partitioning node identifiers - Initialize pubsub with peer exchange enabled - Reduce client monitoring interval to 30s - Add metric announcement logging I would write this commit message as: Add node namespace and improve monitoring - Add node_namespace config for partitioning identifiers - Enable pubsub peer exchange - Adjust monitoring intervals and add logging - Initialize pubsub with namespace support The subject line captures the two main themes (namespacing and monitoring), while the body provides helpful details about the specific changes made. --- .zed/debug.json | 8 ++++++++ Makefile | 2 +- pkg/config/config.go | 2 ++ pkg/node/monitoring.go | 3 ++- pkg/node/node.go | 13 +++++++++++++ 5 files changed, 26 insertions(+), 2 deletions(-) diff --git a/.zed/debug.json b/.zed/debug.json index 93376b0..6f50aaf 100644 --- a/.zed/debug.json +++ b/.zed/debug.json @@ -56,5 +56,13 @@ "mode": "debug", "program": "./cmd/cli", "args": ["pubsub", "subscribe", "monitoring"] + }, + { + "adapter": "Delve", + "label": "Node Go (Delve)", + "request": "launch", + "mode": "debug", + "program": "./cmd/node", + "args": ["--config", "configs/node.yaml"] } ] diff --git a/Makefile b/Makefile index 360c233..d39bfe0 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.42.1-beta +VERSION := 0.42.2-beta COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/config/config.go b/pkg/config/config.go index 332f2bd..1ccf458 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -46,6 +46,7 @@ type DiscoveryConfig struct { BootstrapPort int `yaml:"bootstrap_port"` // Default port for bootstrap nodes HttpAdvAddress string `yaml:"http_adv_address"` // HTTP advertisement address RaftAdvAddress string `yaml:"raft_adv_address"` // Raft advertisement + NodeNamespace string `yaml:"node_namespace"` // Namespace for node identifiers } // SecurityConfig contains security-related configuration @@ -115,6 +116,7 @@ func DefaultConfig() *Config { DiscoveryInterval: time.Second * 15, // Back to 15 seconds for testing HttpAdvAddress: "", RaftAdvAddress: "", + NodeNamespace: "default", }, Security: SecurityConfig{ EnableTLS: false, diff --git a/pkg/node/monitoring.go b/pkg/node/monitoring.go index 602fa72..0e23d63 100644 --- a/pkg/node/monitoring.go +++ b/pkg/node/monitoring.go @@ -115,6 +115,7 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory if err := n.pubsub.Publish(ctx, "monitoring", data); err != nil { return err } + n.logger.Info("Announced metrics", zap.String("topic", "monitoring")) return nil } @@ -123,7 +124,7 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory // Unlike nodes which need extensive monitoring, clients only need basic health checks. func (n *Node) startConnectionMonitoring() { go func() { - ticker := time.NewTicker(60 * time.Second) // Less frequent than nodes (60s vs 30s) + ticker := time.NewTicker(30 * time.Second) // Less frequent than nodes (60s vs 30s) defer ticker.Stop() var lastPeerCount int diff --git a/pkg/node/node.go b/pkg/node/node.go index 4af952e..10fa933 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -11,6 +11,7 @@ import ( "time" "github.com/libp2p/go-libp2p" + libp2ppubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -258,6 +259,18 @@ func (n *Node) startLibP2P() error { n.host = h + // Initialize pubsub + ps, err := libp2ppubsub.NewGossipSub(context.Background(), h, + libp2ppubsub.WithPeerExchange(true), + ) + if err != nil { + return fmt.Errorf("failed to create pubsub: %w", err) + } + + // Create pubsub adapter with "node" namespace + n.pubsub = pubsub.NewClientAdapter(ps, n.config.Discovery.NodeNamespace) + n.logger.Info("Initialized pubsub adapter on namespace", zap.String("namespace", n.config.Discovery.NodeNamespace)) + // Log configured bootstrap peers if len(n.config.Discovery.BootstrapPeers) > 0 { n.logger.ComponentInfo(logging.ComponentNode, "Configured bootstrap peers",