network/pkg/pubsub/manager_test.go
anonpenguin23 a9844a1451 feat: add unit tests for gateway authentication and RQLite utilities
- Introduced comprehensive unit tests for the authentication service in the gateway, covering JWT generation, Base58 decoding, and signature verification for Ethereum and Solana.
- Added tests for RQLite cluster discovery functions, including host replacement logic and public IP validation.
- Implemented tests for RQLite utility functions, focusing on exponential backoff and data directory path resolution.
- Enhanced serverless engine tests to validate timeout handling and memory limits for WASM functions.
2025-12-31 12:26:31 +02:00

218 lines
5.1 KiB
Go

package pubsub
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)
// createTestManager builds a Manager for namespace ns on top of a fresh
// libp2p host listening on an ephemeral loopback TCP port and a GossipSub
// router bound to a cancellable context.
//
// It returns the manager together with a cleanup function that closes the
// manager, closes the host, and cancels the context, in that order. Any setup
// failure aborts the calling test via t.Fatalf after releasing whatever was
// already created.
func createTestManager(t *testing.T, ns string) (*Manager, func()) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	ctx, cancel := context.WithCancel(context.Background())

	h, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
	if err != nil {
		// t.Fatalf never returns, so the context must be released here.
		cancel()
		t.Fatalf("failed to create libp2p host: %v", err)
	}

	ps, err := pubsub.NewGossipSub(ctx, h)
	if err != nil {
		h.Close()
		cancel()
		t.Fatalf("failed to create gossipsub: %v", err)
	}

	mgr := NewManager(ps, ns)

	cleanup := func() {
		mgr.Close()
		h.Close()
		cancel()
	}

	return mgr, cleanup
}
// TestManager_Namespacing checks that Subscribe stores subscriptions under a
// "<namespace>.<topic>" key, that a namespace supplied through
// CtxKeyNamespaceOverride on the context replaces the manager's own namespace,
// and that ListTopics reports the bare topic name for whichever namespace the
// context selects.
func TestManager_Namespacing(t *testing.T) {
	mgr, cleanup := createTestManager(t, "test-ns")
	defer cleanup()

	baseCtx := context.Background()
	topic := "my-topic"

	// Handler that discards every message; only the bookkeeping is under test.
	discard := func(_ string, _ []byte) error { return nil }

	// subscribed reports whether the manager tracks the given namespaced key.
	subscribed := func(key string) bool {
		mgr.mu.RLock()
		_, ok := mgr.subscriptions[key]
		mgr.mu.RUnlock()
		return ok
	}

	// Subscribe using the manager's default namespace.
	defaultKey := "test-ns.my-topic"
	if err := mgr.Subscribe(baseCtx, topic, discard); err != nil {
		t.Fatalf("Subscribe failed: %v", err)
	}
	if !subscribed(defaultKey) {
		t.Errorf("expected subscription for %s to exist", defaultKey)
	}

	// Subscribe again with the namespace overridden through the context.
	overrideNS := "other-ns"
	nsCtx := context.WithValue(baseCtx, CtxKeyNamespaceOverride, overrideNS)
	overrideKey := "other-ns.my-topic"
	if err := mgr.Subscribe(nsCtx, topic, discard); err != nil {
		t.Fatalf("Subscribe with override failed: %v", err)
	}
	if !subscribed(overrideKey) {
		t.Errorf("expected subscription for %s to exist", overrideKey)
	}

	// ListTopics under the default namespace must return exactly the bare topic.
	listed, err := mgr.ListTopics(baseCtx)
	if err != nil {
		t.Fatalf("ListTopics failed: %v", err)
	}
	if !(len(listed) == 1 && listed[0] == "my-topic") {
		t.Errorf("expected 1 topic [my-topic], got %v", listed)
	}

	// ListTopics under the overridden namespace must do the same.
	listedOverride, err := mgr.ListTopics(nsCtx)
	if err != nil {
		t.Fatalf("ListTopics with override failed: %v", err)
	}
	if !(len(listedOverride) == 1 && listedOverride[0] == "my-topic") {
		t.Errorf("expected 1 topic [my-topic] with override, got %v", listedOverride)
	}
}
// TestManager_RefCount checks the reference counting of a topic subscription:
// two Subscribe calls on the same topic raise refCount to 2, the first
// Unsubscribe lowers it to 1 while keeping the entry, and the second
// Unsubscribe removes the entry from the manager's subscription map.
func TestManager_RefCount(t *testing.T) {
	mgr, cleanup := createTestManager(t, "test-ns")
	defer cleanup()

	ctx := context.Background()
	topic := "ref-topic"
	namespacedTopic := "test-ns.ref-topic"

	// Two distinct no-op handlers; parameters are blanked so they do not
	// shadow the test's own t.
	h1 := func(_ string, _ []byte) error { return nil }
	h2 := func(_ string, _ []byte) error { return nil }

	// First subscription
	if err := mgr.Subscribe(ctx, topic, h1); err != nil {
		t.Fatalf("first subscribe failed: %v", err)
	}

	mgr.mu.RLock()
	ts, ok := mgr.subscriptions[namespacedTopic]
	mgr.mu.RUnlock()
	// Stop here if the entry is missing: every check below dereferences ts,
	// and a missing entry would otherwise surface as a panic instead of a
	// readable test failure.
	if !ok {
		t.Fatalf("expected subscription for %s to exist after first subscribe", namespacedTopic)
	}

	// refCount is read under mgr.mu, matching how this test reads the
	// subscriptions map.
	mgr.mu.RLock()
	got := ts.refCount
	mgr.mu.RUnlock()
	if got != 1 {
		t.Errorf("expected refCount 1, got %d", got)
	}

	// Second subscription
	if err := mgr.Subscribe(ctx, topic, h2); err != nil {
		t.Fatalf("second subscribe failed: %v", err)
	}

	mgr.mu.RLock()
	got = ts.refCount
	mgr.mu.RUnlock()
	if got != 2 {
		t.Errorf("expected refCount 2, got %d", got)
	}

	// Unsubscribe one
	if err := mgr.Unsubscribe(ctx, topic); err != nil {
		t.Fatalf("unsubscribe 1 failed: %v", err)
	}

	mgr.mu.RLock()
	got = ts.refCount
	_, exists := mgr.subscriptions[namespacedTopic]
	mgr.mu.RUnlock()
	if got != 1 {
		t.Errorf("expected refCount 1 after one unsubscribe, got %d", got)
	}
	if !exists {
		t.Error("expected subscription to still exist")
	}

	// Unsubscribe second
	if err := mgr.Unsubscribe(ctx, topic); err != nil {
		t.Fatalf("unsubscribe 2 failed: %v", err)
	}

	mgr.mu.RLock()
	_, exists = mgr.subscriptions[namespacedTopic]
	mgr.mu.RUnlock()
	if exists {
		t.Error("expected subscription to be removed")
	}
}
// TestManager_PubSub is an end-to-end check that a message published by one
// Manager reaches a handler subscribed on another Manager sharing the same
// namespace. Two libp2p hosts are created on loopback, connected directly,
// and the publisher retries every 100ms until the subscriber receives the
// payload or a 5s deadline expires.
func TestManager_PubSub(t *testing.T) {
	// The context is cancelled last (deferred first) so the GossipSub routers
	// are released after the managers and hosts have been closed.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	h1, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
	if err != nil {
		t.Fatalf("failed to create libp2p host 1: %v", err)
	}
	defer h1.Close()

	ps1, err := pubsub.NewGossipSub(ctx, h1)
	if err != nil {
		t.Fatalf("failed to create gossipsub 1: %v", err)
	}
	mgr1 := NewManager(ps1, "test")
	defer mgr1.Close()

	h2, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
	if err != nil {
		t.Fatalf("failed to create libp2p host 2: %v", err)
	}
	defer h2.Close()

	ps2, err := pubsub.NewGossipSub(ctx, h2)
	if err != nil {
		t.Fatalf("failed to create gossipsub 2: %v", err)
	}
	mgr2 := NewManager(ps2, "test")
	defer mgr2.Close()

	// Connect hosts
	h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), time.Hour)
	if err := h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
		t.Fatalf("failed to connect hosts: %v", err)
	}

	topic := "chat"
	msgData := []byte("hello world")
	received := make(chan []byte, 1)

	err = mgr2.Subscribe(ctx, topic, func(_ string, d []byte) error {
		// Publishing is retried below, so more than one copy may arrive.
		// Only the first is needed; drop the rest instead of blocking the
		// handler on a full channel once the test has stopped reading.
		select {
		case received <- d:
		default:
		}
		return nil
	})
	if err != nil {
		t.Fatalf("mgr2 subscribe failed: %v", err)
	}

	// mgr1 has to learn about mgr2's subscription before a publish is
	// delivered. Rather than waiting on that explicitly, keep publishing on a
	// ticker until the message shows up or the deadline passes.
	timeout := time.After(5 * time.Second)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// Publish failures are tolerated while retrying; the most recent one is
	// kept so a timeout can explain why nothing was delivered.
	var lastPublishErr error

Loop:
	for {
		select {
		case <-timeout:
			t.Fatalf("timed out waiting for message (last publish error: %v)", lastPublishErr)
		case <-ticker.C:
			lastPublishErr = mgr1.Publish(ctx, topic, msgData)
		case data := <-received:
			if string(data) != string(msgData) {
				t.Errorf("expected %s, got %s", string(msgData), string(data))
			}
			break Loop
		}
	}
}