feat: implement asynchronous libp2p publishing in pubsub handler

- Updated the pubsubPublishHandler to publish messages to libp2p asynchronously, preventing HTTP response blocking.
- Introduced context with timeout for libp2p publishing to enhance reliability and logging for success and failure cases.
- Maintained immediate response for local delivery to WebSocket subscribers, improving overall message handling efficiency.
This commit is contained in:
anonpenguin 2025-10-28 10:25:38 +02:00
parent 8dc71e7920
commit 0f0237b5ea
No known key found for this signature in database
GPG Key ID: 1CBB1FE35AFBEE30
2 changed files with 19 additions and 20 deletions

View File

@ -21,7 +21,7 @@ test-e2e:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
VERSION := 0.52.13
VERSION := 0.52.14
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

View File

@ -1,6 +1,7 @@
package gateway
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
@ -287,27 +288,25 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) {
zap.Int("local_subscribers", len(localSubs)),
zap.Int("local_delivered", localDeliveryCount))
// Still publish to libp2p for cross-node delivery
ctx := pubsub.WithNamespace(client.WithInternalAuth(r.Context()), ns)
if err := g.client.PubSub().Publish(ctx, body.Topic, data); err != nil {
// Log but don't fail - local delivery succeeded
g.logger.ComponentWarn("gateway", "libp2p publish failed (local delivery succeeded)",
zap.String("topic", body.Topic),
zap.Error(err))
// Still return OK since local delivery worked
if localDeliveryCount > 0 {
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "local_only": true})
return
// Publish to libp2p asynchronously for cross-node delivery
// This prevents blocking the HTTP response if libp2p network is slow
go func() {
publishCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ctx := pubsub.WithNamespace(client.WithInternalAuth(publishCtx), ns)
if err := g.client.PubSub().Publish(ctx, body.Topic, data); err != nil {
g.logger.ComponentWarn("gateway", "async libp2p publish failed",
zap.String("topic", body.Topic),
zap.Error(err))
} else {
g.logger.ComponentDebug("gateway", "async libp2p publish succeeded",
zap.String("topic", body.Topic))
}
// If no local delivery, return error
writeError(w, http.StatusInternalServerError, err.Error())
return
}
g.logger.ComponentInfo("gateway", "pubsub publish: message published successfully",
zap.String("topic", body.Topic),
zap.String("delivery", "local+libp2p"))
}()
// Return immediately after local delivery
// Local WebSocket subscribers already received the message
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}