diff --git a/.zed/debug.json b/.zed/debug.json index 93376b0..6f50aaf 100644 --- a/.zed/debug.json +++ b/.zed/debug.json @@ -56,5 +56,13 @@ "mode": "debug", "program": "./cmd/cli", "args": ["pubsub", "subscribe", "monitoring"] + }, + { + "adapter": "Delve", + "label": "Node Go (Delve)", + "request": "launch", + "mode": "debug", + "program": "./cmd/node", + "args": ["--config", "configs/node.yaml"] } ] diff --git a/Makefile b/Makefile index 360c233..d39bfe0 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.42.1-beta +VERSION := 0.42.2-beta COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/config/config.go b/pkg/config/config.go index 332f2bd..1ccf458 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -46,6 +46,7 @@ type DiscoveryConfig struct { BootstrapPort int `yaml:"bootstrap_port"` // Default port for bootstrap nodes HttpAdvAddress string `yaml:"http_adv_address"` // HTTP advertisement address RaftAdvAddress string `yaml:"raft_adv_address"` // Raft advertisement + NodeNamespace string `yaml:"node_namespace"` // Namespace for node identifiers } // SecurityConfig contains security-related configuration @@ -115,6 +116,7 @@ func DefaultConfig() *Config { DiscoveryInterval: time.Second * 15, // Back to 15 seconds for testing HttpAdvAddress: "", RaftAdvAddress: "", + NodeNamespace: "default", }, Security: SecurityConfig{ EnableTLS: false, diff --git a/pkg/node/monitoring.go b/pkg/node/monitoring.go index 602fa72..0e23d63 100644 --- a/pkg/node/monitoring.go +++ b/pkg/node/monitoring.go @@ -115,6 +115,7 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory if err := n.pubsub.Publish(ctx, "monitoring", data); err != nil { return err } + n.logger.Info("Announced metrics", zap.String("topic", "monitoring")) return nil } @@ -123,7 +124,7 @@ func announceMetrics(n *Node, peers []peer.ID, cpuUsage uint64, memUsage *memory // Unlike nodes which need extensive monitoring, clients only need basic health checks. func (n *Node) startConnectionMonitoring() { go func() { - ticker := time.NewTicker(60 * time.Second) // Less frequent than nodes (60s vs 30s) + ticker := time.NewTicker(30 * time.Second) // Less frequent than nodes (60s vs 30s) defer ticker.Stop() var lastPeerCount int diff --git a/pkg/node/node.go b/pkg/node/node.go index 4af952e..10fa933 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -11,6 +11,7 @@ import ( "time" "github.com/libp2p/go-libp2p" + libp2ppubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -258,6 +259,18 @@ func (n *Node) startLibP2P() error { n.host = h + // Initialize pubsub + ps, err := libp2ppubsub.NewGossipSub(context.Background(), h, + libp2ppubsub.WithPeerExchange(true), + ) + if err != nil { + return fmt.Errorf("failed to create pubsub: %w", err) + } + + // Create pubsub adapter with "node" namespace + n.pubsub = pubsub.NewClientAdapter(ps, n.config.Discovery.NodeNamespace) + n.logger.Info("Initialized pubsub adapter on namespace", zap.String("namespace", n.config.Discovery.NodeNamespace)) + // Log configured bootstrap peers if len(n.config.Discovery.BootstrapPeers) > 0 { n.logger.ComponentInfo(logging.ComponentNode, "Configured bootstrap peers",