network/e2e/pubsub_client_test.go
anonpenguin23 0ea58354ca
feat: enhance E2E testing and dependency management
- Added new E2E tests for authentication, cache operations, and IPFS interactions to improve coverage and reliability.
- Introduced concurrency tests for cache operations to validate performance under load.
- Updated `go.mod` to include `github.com/mattn/go-sqlite3` as a dependency for database interactions.
- Refined Makefile to simplify E2E test execution and configuration discovery.
- Removed outdated client E2E tests and consolidated related functionality for better maintainability.
2025-11-10 15:36:58 +02:00

422 lines
10 KiB
Go

//go:build e2e
package e2e
import (
"context"
"fmt"
"sync"
"testing"
"time"
)
func newMessageCollector(ctx context.Context, buffer int) (chan []byte, func(string, []byte) error) {
if buffer <= 0 {
buffer = 1
}
ch := make(chan []byte, buffer)
handler := func(_ string, data []byte) error {
copied := append([]byte(nil), data...)
select {
case ch <- copied:
case <-ctx.Done():
}
return nil
}
return ch, handler
}
func waitForMessage(ctx context.Context, ch <-chan []byte) ([]byte, error) {
select {
case msg := <-ch:
return msg, nil
case <-ctx.Done():
return nil, fmt.Errorf("context finished while waiting for pubsub message: %w", ctx.Err())
}
}
func TestPubSub_SubscribePublish(t *testing.T) {
SkipIfMissingGateway(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Create two clients
client1 := NewNetworkClient(t)
client2 := NewNetworkClient(t)
if err := client1.Connect(); err != nil {
t.Fatalf("client1 connect failed: %v", err)
}
defer client1.Disconnect()
if err := client2.Connect(); err != nil {
t.Fatalf("client2 connect failed: %v", err)
}
defer client2.Disconnect()
topic := GenerateTopic()
message := "test-message-from-client1"
// Subscribe on client2
messageCh, handler := newMessageCollector(ctx, 1)
if err := client2.PubSub().Subscribe(ctx, topic, handler); err != nil {
t.Fatalf("subscribe failed: %v", err)
}
defer client2.PubSub().Unsubscribe(ctx, topic)
// Give subscription time to propagate and mesh to form
Delay(2000)
// Publish from client1
if err := client1.PubSub().Publish(ctx, topic, []byte(message)); err != nil {
t.Fatalf("publish failed: %v", err)
}
// Receive message on client2
recvCtx, recvCancel := context.WithTimeout(ctx, 10*time.Second)
defer recvCancel()
msg, err := waitForMessage(recvCtx, messageCh)
if err != nil {
t.Fatalf("receive failed: %v", err)
}
if string(msg) != message {
t.Fatalf("expected message %q, got %q", message, string(msg))
}
}
func TestPubSub_MultipleSubscribers(t *testing.T) {
SkipIfMissingGateway(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Create three clients
clientPub := NewNetworkClient(t)
clientSub1 := NewNetworkClient(t)
clientSub2 := NewNetworkClient(t)
if err := clientPub.Connect(); err != nil {
t.Fatalf("publisher connect failed: %v", err)
}
defer clientPub.Disconnect()
if err := clientSub1.Connect(); err != nil {
t.Fatalf("subscriber1 connect failed: %v", err)
}
defer clientSub1.Disconnect()
if err := clientSub2.Connect(); err != nil {
t.Fatalf("subscriber2 connect failed: %v", err)
}
defer clientSub2.Disconnect()
topic := GenerateTopic()
message1 := "message-for-sub1"
message2 := "message-for-sub2"
// Subscribe on both clients
sub1Ch, sub1Handler := newMessageCollector(ctx, 4)
if err := clientSub1.PubSub().Subscribe(ctx, topic, sub1Handler); err != nil {
t.Fatalf("subscribe1 failed: %v", err)
}
defer clientSub1.PubSub().Unsubscribe(ctx, topic)
sub2Ch, sub2Handler := newMessageCollector(ctx, 4)
if err := clientSub2.PubSub().Subscribe(ctx, topic, sub2Handler); err != nil {
t.Fatalf("subscribe2 failed: %v", err)
}
defer clientSub2.PubSub().Unsubscribe(ctx, topic)
// Give subscriptions time to propagate
Delay(500)
// Publish first message
if err := clientPub.PubSub().Publish(ctx, topic, []byte(message1)); err != nil {
t.Fatalf("publish1 failed: %v", err)
}
// Both subscribers should receive first message
recvCtx, recvCancel := context.WithTimeout(ctx, 10*time.Second)
defer recvCancel()
msg1a, err := waitForMessage(recvCtx, sub1Ch)
if err != nil {
t.Fatalf("sub1 receive1 failed: %v", err)
}
if string(msg1a) != message1 {
t.Fatalf("sub1: expected %q, got %q", message1, string(msg1a))
}
msg1b, err := waitForMessage(recvCtx, sub2Ch)
if err != nil {
t.Fatalf("sub2 receive1 failed: %v", err)
}
if string(msg1b) != message1 {
t.Fatalf("sub2: expected %q, got %q", message1, string(msg1b))
}
// Publish second message
if err := clientPub.PubSub().Publish(ctx, topic, []byte(message2)); err != nil {
t.Fatalf("publish2 failed: %v", err)
}
// Both subscribers should receive second message
recvCtx2, recvCancel2 := context.WithTimeout(ctx, 10*time.Second)
defer recvCancel2()
msg2a, err := waitForMessage(recvCtx2, sub1Ch)
if err != nil {
t.Fatalf("sub1 receive2 failed: %v", err)
}
if string(msg2a) != message2 {
t.Fatalf("sub1: expected %q, got %q", message2, string(msg2a))
}
msg2b, err := waitForMessage(recvCtx2, sub2Ch)
if err != nil {
t.Fatalf("sub2 receive2 failed: %v", err)
}
if string(msg2b) != message2 {
t.Fatalf("sub2: expected %q, got %q", message2, string(msg2b))
}
}
func TestPubSub_Deduplication(t *testing.T) {
SkipIfMissingGateway(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Create two clients
clientPub := NewNetworkClient(t)
clientSub := NewNetworkClient(t)
if err := clientPub.Connect(); err != nil {
t.Fatalf("publisher connect failed: %v", err)
}
defer clientPub.Disconnect()
if err := clientSub.Connect(); err != nil {
t.Fatalf("subscriber connect failed: %v", err)
}
defer clientSub.Disconnect()
topic := GenerateTopic()
message := "duplicate-test-message"
// Subscribe on client
messageCh, handler := newMessageCollector(ctx, 3)
if err := clientSub.PubSub().Subscribe(ctx, topic, handler); err != nil {
t.Fatalf("subscribe failed: %v", err)
}
defer clientSub.PubSub().Unsubscribe(ctx, topic)
// Give subscription time to propagate and mesh to form
Delay(2000)
// Publish the same message multiple times
for i := 0; i < 3; i++ {
if err := clientPub.PubSub().Publish(ctx, topic, []byte(message)); err != nil {
t.Fatalf("publish %d failed: %v", i, err)
}
}
// Receive messages - should get all (no dedup filter on subscribe)
recvCtx, recvCancel := context.WithTimeout(ctx, 5*time.Second)
defer recvCancel()
receivedCount := 0
for receivedCount < 3 {
if _, err := waitForMessage(recvCtx, messageCh); err != nil {
break
}
receivedCount++
}
if receivedCount < 1 {
t.Fatalf("expected to receive at least 1 message, got %d", receivedCount)
}
}
func TestPubSub_ConcurrentPublish(t *testing.T) {
SkipIfMissingGateway(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Create clients
clientPub := NewNetworkClient(t)
clientSub := NewNetworkClient(t)
if err := clientPub.Connect(); err != nil {
t.Fatalf("publisher connect failed: %v", err)
}
defer clientPub.Disconnect()
if err := clientSub.Connect(); err != nil {
t.Fatalf("subscriber connect failed: %v", err)
}
defer clientSub.Disconnect()
topic := GenerateTopic()
numMessages := 10
// Subscribe
messageCh, handler := newMessageCollector(ctx, numMessages)
if err := clientSub.PubSub().Subscribe(ctx, topic, handler); err != nil {
t.Fatalf("subscribe failed: %v", err)
}
defer clientSub.PubSub().Unsubscribe(ctx, topic)
// Give subscription time to propagate and mesh to form
Delay(2000)
// Publish multiple messages concurrently
var wg sync.WaitGroup
for i := 0; i < numMessages; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
msg := fmt.Sprintf("concurrent-msg-%d", idx)
if err := clientPub.PubSub().Publish(ctx, topic, []byte(msg)); err != nil {
t.Logf("publish %d failed: %v", idx, err)
}
}(i)
}
wg.Wait()
// Receive messages
recvCtx, recvCancel := context.WithTimeout(ctx, 10*time.Second)
defer recvCancel()
receivedCount := 0
for receivedCount < numMessages {
if _, err := waitForMessage(recvCtx, messageCh); err != nil {
break
}
receivedCount++
}
if receivedCount < numMessages {
t.Logf("expected %d messages, got %d (some may have been dropped)", numMessages, receivedCount)
}
}
func TestPubSub_TopicIsolation(t *testing.T) {
SkipIfMissingGateway(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Create clients
clientPub := NewNetworkClient(t)
clientSub := NewNetworkClient(t)
if err := clientPub.Connect(); err != nil {
t.Fatalf("publisher connect failed: %v", err)
}
defer clientPub.Disconnect()
if err := clientSub.Connect(); err != nil {
t.Fatalf("subscriber connect failed: %v", err)
}
defer clientSub.Disconnect()
topic1 := GenerateTopic()
topic2 := GenerateTopic()
// Subscribe to topic1
messageCh, handler := newMessageCollector(ctx, 2)
if err := clientSub.PubSub().Subscribe(ctx, topic1, handler); err != nil {
t.Fatalf("subscribe1 failed: %v", err)
}
defer clientSub.PubSub().Unsubscribe(ctx, topic1)
// Give subscription time to propagate and mesh to form
Delay(2000)
// Publish to topic2
msg2 := "message-on-topic2"
if err := clientPub.PubSub().Publish(ctx, topic2, []byte(msg2)); err != nil {
t.Fatalf("publish2 failed: %v", err)
}
// Publish to topic1
msg1 := "message-on-topic1"
if err := clientPub.PubSub().Publish(ctx, topic1, []byte(msg1)); err != nil {
t.Fatalf("publish1 failed: %v", err)
}
// Receive on sub1 - should get msg1 only
recvCtx, recvCancel := context.WithTimeout(ctx, 10*time.Second)
defer recvCancel()
msg, err := waitForMessage(recvCtx, messageCh)
if err != nil {
t.Fatalf("receive failed: %v", err)
}
if string(msg) != msg1 {
t.Fatalf("expected %q, got %q", msg1, string(msg))
}
}
func TestPubSub_EmptyMessage(t *testing.T) {
SkipIfMissingGateway(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Create clients
clientPub := NewNetworkClient(t)
clientSub := NewNetworkClient(t)
if err := clientPub.Connect(); err != nil {
t.Fatalf("publisher connect failed: %v", err)
}
defer clientPub.Disconnect()
if err := clientSub.Connect(); err != nil {
t.Fatalf("subscriber connect failed: %v", err)
}
defer clientSub.Disconnect()
topic := GenerateTopic()
// Subscribe
messageCh, handler := newMessageCollector(ctx, 1)
if err := clientSub.PubSub().Subscribe(ctx, topic, handler); err != nil {
t.Fatalf("subscribe failed: %v", err)
}
defer clientSub.PubSub().Unsubscribe(ctx, topic)
// Give subscription time to propagate and mesh to form
Delay(2000)
// Publish empty message
if err := clientPub.PubSub().Publish(ctx, topic, []byte("")); err != nil {
t.Fatalf("publish empty failed: %v", err)
}
// Receive on sub - should get empty message
recvCtx, recvCancel := context.WithTimeout(ctx, 10*time.Second)
defer recvCancel()
msg, err := waitForMessage(recvCtx, messageCh)
if err != nil {
t.Fatalf("receive failed: %v", err)
}
if len(msg) != 0 {
t.Fatalf("expected empty message, got %q", string(msg))
}
}