From 0f0237b5ea7ed9ca7cba3a40a829545d121f2581 Mon Sep 17 00:00:00 2001 From: anonpenguin Date: Tue, 28 Oct 2025 10:25:38 +0200 Subject: [PATCH] feat: implement asynchronous libp2p publishing in pubsub handler - Updated the pubsubPublishHandler to publish messages to libp2p asynchronously, preventing HTTP response blocking. - Introduced context with timeout for libp2p publishing to enhance reliability and logging for success and failure cases. - Maintained immediate response for local delivery to WebSocket subscribers, improving overall message handling efficiency. --- Makefile | 2 +- pkg/gateway/pubsub_handlers.go | 37 +++++++++++++++++----------------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/Makefile b/Makefile index c801706..b45505f 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.52.13 +VERSION := 0.52.14 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go index 0ac4f72..af87641 100644 --- a/pkg/gateway/pubsub_handlers.go +++ b/pkg/gateway/pubsub_handlers.go @@ -1,6 +1,7 @@ package gateway import ( + "context" "encoding/base64" "encoding/json" "fmt" @@ -287,27 +288,25 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { zap.Int("local_subscribers", len(localSubs)), zap.Int("local_delivered", localDeliveryCount)) - // Still publish to libp2p for cross-node delivery - ctx := pubsub.WithNamespace(client.WithInternalAuth(r.Context()), ns) - if err := g.client.PubSub().Publish(ctx, body.Topic, data); err != nil { - // Log but don't fail - local delivery succeeded - g.logger.ComponentWarn("gateway", "libp2p publish failed (local delivery succeeded)", - zap.String("topic", body.Topic), - zap.Error(err)) - // Still return OK since local delivery worked - if localDeliveryCount > 0 { - writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "local_only": true}) - return + // Publish to libp2p asynchronously for cross-node delivery + // This prevents blocking the HTTP response if libp2p network is slow + go func() { + publishCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + ctx := pubsub.WithNamespace(client.WithInternalAuth(publishCtx), ns) + if err := g.client.PubSub().Publish(ctx, body.Topic, data); err != nil { + g.logger.ComponentWarn("gateway", "async libp2p publish failed", + zap.String("topic", body.Topic), + zap.Error(err)) + } else { + g.logger.ComponentDebug("gateway", "async libp2p publish succeeded", + zap.String("topic", body.Topic)) } - // If no local delivery, return error - writeError(w, http.StatusInternalServerError, err.Error()) - return - } - - g.logger.ComponentInfo("gateway", "pubsub publish: message published successfully", - zap.String("topic", body.Topic), - zap.String("delivery", "local+libp2p")) + }() + // Return immediately after local delivery + // Local WebSocket subscribers already received the message writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) }