mirror of
https://github.com/DeBrosOfficial/network.git
synced 2026-01-30 03:43:04 +00:00
- Introduced comprehensive unit tests for the authentication service in the gateway, covering JWT generation, Base58 decoding, and signature verification for Ethereum and Solana.
- Added tests for RQLite cluster discovery functions, including host replacement logic and public IP validation.
- Implemented tests for RQLite utility functions, focusing on exponential backoff and data directory path resolution.
- Enhanced serverless engine tests to validate timeout handling and memory limits for WASM functions.
218 lines
5.1 KiB
Go
218 lines
5.1 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
)
|
|
|
|
func createTestManager(t *testing.T, ns string) (*Manager, func()) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
h, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
|
|
if err != nil {
|
|
t.Fatalf("failed to create libp2p host: %v", err)
|
|
}
|
|
|
|
ps, err := pubsub.NewGossipSub(ctx, h)
|
|
if err != nil {
|
|
h.Close()
|
|
t.Fatalf("failed to create gossipsub: %v", err)
|
|
}
|
|
|
|
mgr := NewManager(ps, ns)
|
|
|
|
cleanup := func() {
|
|
mgr.Close()
|
|
h.Close()
|
|
cancel()
|
|
}
|
|
|
|
return mgr, cleanup
|
|
}
|
|
|
|
func TestManager_Namespacing(t *testing.T) {
|
|
mgr, cleanup := createTestManager(t, "test-ns")
|
|
defer cleanup()
|
|
|
|
ctx := context.Background()
|
|
topic := "my-topic"
|
|
expectedNamespacedTopic := "test-ns.my-topic"
|
|
|
|
// Subscribe
|
|
err := mgr.Subscribe(ctx, topic, func(t string, d []byte) error { return nil })
|
|
if err != nil {
|
|
t.Fatalf("Subscribe failed: %v", err)
|
|
}
|
|
|
|
mgr.mu.RLock()
|
|
_, exists := mgr.subscriptions[expectedNamespacedTopic]
|
|
mgr.mu.RUnlock()
|
|
|
|
if !exists {
|
|
t.Errorf("expected subscription for %s to exist", expectedNamespacedTopic)
|
|
}
|
|
|
|
// Test override
|
|
overrideNS := "other-ns"
|
|
overrideCtx := context.WithValue(ctx, CtxKeyNamespaceOverride, overrideNS)
|
|
expectedOverrideTopic := "other-ns.my-topic"
|
|
|
|
err = mgr.Subscribe(overrideCtx, topic, func(t string, d []byte) error { return nil })
|
|
if err != nil {
|
|
t.Fatalf("Subscribe with override failed: %v", err)
|
|
}
|
|
|
|
mgr.mu.RLock()
|
|
_, exists = mgr.subscriptions[expectedOverrideTopic]
|
|
mgr.mu.RUnlock()
|
|
|
|
if !exists {
|
|
t.Errorf("expected subscription for %s to exist", expectedOverrideTopic)
|
|
}
|
|
|
|
// Test ListTopics
|
|
topics, err := mgr.ListTopics(ctx)
|
|
if err != nil {
|
|
t.Fatalf("ListTopics failed: %v", err)
|
|
}
|
|
if len(topics) != 1 || topics[0] != "my-topic" {
|
|
t.Errorf("expected 1 topic [my-topic], got %v", topics)
|
|
}
|
|
|
|
topicsOverride, err := mgr.ListTopics(overrideCtx)
|
|
if err != nil {
|
|
t.Fatalf("ListTopics with override failed: %v", err)
|
|
}
|
|
if len(topicsOverride) != 1 || topicsOverride[0] != "my-topic" {
|
|
t.Errorf("expected 1 topic [my-topic] with override, got %v", topicsOverride)
|
|
}
|
|
}
|
|
|
|
func TestManager_RefCount(t *testing.T) {
|
|
mgr, cleanup := createTestManager(t, "test-ns")
|
|
defer cleanup()
|
|
|
|
ctx := context.Background()
|
|
topic := "ref-topic"
|
|
namespacedTopic := "test-ns.ref-topic"
|
|
|
|
h1 := func(t string, d []byte) error { return nil }
|
|
h2 := func(t string, d []byte) error { return nil }
|
|
|
|
// First subscription
|
|
err := mgr.Subscribe(ctx, topic, h1)
|
|
if err != nil {
|
|
t.Fatalf("first subscribe failed: %v", err)
|
|
}
|
|
|
|
mgr.mu.RLock()
|
|
ts := mgr.subscriptions[namespacedTopic]
|
|
mgr.mu.RUnlock()
|
|
|
|
if ts.refCount != 1 {
|
|
t.Errorf("expected refCount 1, got %d", ts.refCount)
|
|
}
|
|
|
|
// Second subscription
|
|
err = mgr.Subscribe(ctx, topic, h2)
|
|
if err != nil {
|
|
t.Fatalf("second subscribe failed: %v", err)
|
|
}
|
|
|
|
if ts.refCount != 2 {
|
|
t.Errorf("expected refCount 2, got %d", ts.refCount)
|
|
}
|
|
|
|
// Unsubscribe one
|
|
err = mgr.Unsubscribe(ctx, topic)
|
|
if err != nil {
|
|
t.Fatalf("unsubscribe 1 failed: %v", err)
|
|
}
|
|
|
|
if ts.refCount != 1 {
|
|
t.Errorf("expected refCount 1 after one unsubscribe, got %d", ts.refCount)
|
|
}
|
|
|
|
mgr.mu.RLock()
|
|
_, exists := mgr.subscriptions[namespacedTopic]
|
|
mgr.mu.RUnlock()
|
|
if !exists {
|
|
t.Error("expected subscription to still exist")
|
|
}
|
|
|
|
// Unsubscribe second
|
|
err = mgr.Unsubscribe(ctx, topic)
|
|
if err != nil {
|
|
t.Fatalf("unsubscribe 2 failed: %v", err)
|
|
}
|
|
|
|
mgr.mu.RLock()
|
|
_, exists = mgr.subscriptions[namespacedTopic]
|
|
mgr.mu.RUnlock()
|
|
if exists {
|
|
t.Error("expected subscription to be removed")
|
|
}
|
|
}
|
|
|
|
func TestManager_PubSub(t *testing.T) {
|
|
// For a real pubsub test between two managers, we need them to be connected
|
|
ctx := context.Background()
|
|
|
|
h1, _ := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
|
|
ps1, _ := pubsub.NewGossipSub(ctx, h1)
|
|
mgr1 := NewManager(ps1, "test")
|
|
defer h1.Close()
|
|
defer mgr1.Close()
|
|
|
|
h2, _ := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
|
|
ps2, _ := pubsub.NewGossipSub(ctx, h2)
|
|
mgr2 := NewManager(ps2, "test")
|
|
defer h2.Close()
|
|
defer mgr2.Close()
|
|
|
|
// Connect hosts
|
|
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), time.Hour)
|
|
err := h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
|
|
if err != nil {
|
|
t.Fatalf("failed to connect hosts: %v", err)
|
|
}
|
|
|
|
topic := "chat"
|
|
msgData := []byte("hello world")
|
|
received := make(chan []byte, 1)
|
|
|
|
err = mgr2.Subscribe(ctx, topic, func(t string, d []byte) error {
|
|
received <- d
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("mgr2 subscribe failed: %v", err)
|
|
}
|
|
|
|
// Wait for mesh to form (mgr1 needs to know about mgr2's subscription)
|
|
// In a real network this happens via gossip. We'll just retry publish.
|
|
timeout := time.After(5 * time.Second)
|
|
ticker := time.NewTicker(100 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
Loop:
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
t.Fatal("timed out waiting for message")
|
|
case <-ticker.C:
|
|
_ = mgr1.Publish(ctx, topic, msgData)
|
|
case data := <-received:
|
|
if string(data) != string(msgData) {
|
|
t.Errorf("expected %s, got %s", string(msgData), string(data))
|
|
}
|
|
break Loop
|
|
}
|
|
}
|
|
}
|