From 93b25c42e45626995a1ec468c91a14237c4707c6 Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Sat, 8 Nov 2025 11:59:38 +0200 Subject: [PATCH] feat: enhance IPFS configuration and logging in CLI - Added IPFS cluster API and HTTP API configuration options to node and bootstrap configurations. - Improved the generation of IPFS-related URLs and parameters for better integration. - Enhanced error logging in cache handlers to provide more context on failures during cache operations. --- CHANGELOG.md | 15 ++++++++++ Makefile | 2 +- pkg/gateway/storage_handlers.go | 49 ++++++++++++++++++++++++++++----- pkg/ipfs/client.go | 32 ++++++++++++++++++--- 4 files changed, 86 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c68f26..55eedd5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,21 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant ### Deprecated ### Fixed +## [0.59.0] - 2025-11-08 + +### Added +- Added support for asynchronous pinning of uploaded files, improving upload speed. +- Added an optional `pin` flag to the storage upload endpoint to control whether content is pinned (defaults to true). + +### Changed +- Improved handling of IPFS Cluster responses during the Add operation to correctly process streaming NDJSON output. + +### Deprecated + +### Removed + +### Fixed +\n ## [0.58.0] - 2025-11-07 ### Added diff --git a/Makefile b/Makefile index 4262b76..215f1c7 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill -VERSION := 0.58.0 +VERSION := 0.59.0 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/gateway/storage_handlers.go b/pkg/gateway/storage_handlers.go index 16706b3..03e3f91 100644 --- a/pkg/gateway/storage_handlers.go +++ b/pkg/gateway/storage_handlers.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "strings" + "time" "github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/logging" @@ -81,6 +82,7 @@ func (g *Gateway) storageUploadHandler(w http.ResponseWriter, r *http.Request) { contentType := r.Header.Get("Content-Type") var reader io.Reader var name string + var shouldPin bool = true // Default to true if strings.HasPrefix(contentType, "multipart/form-data") { // Handle multipart upload @@ -98,6 +100,11 @@ func (g *Gateway) storageUploadHandler(w http.ResponseWriter, r *http.Request) { reader = file name = header.Filename + + // Parse pin flag from form (default: true) + if pinValue := r.FormValue("pin"); pinValue != "" { + shouldPin = strings.ToLower(pinValue) == "true" + } } else { // Handle JSON request with base64 data var req StorageUploadRequest @@ -120,6 +127,7 @@ func (g *Gateway) storageUploadHandler(w http.ResponseWriter, r *http.Request) { reader = bytes.NewReader(data) name = req.Name + // For JSON requests, pin defaults to true (can be extended if needed) } // Add to IPFS @@ -131,19 +139,18 @@ func (g *Gateway) storageUploadHandler(w http.ResponseWriter, r *http.Request) { return } - // Pin with replication factor - _, err = g.ipfsClient.Pin(ctx, addResp.Cid, name, replicationFactor) - if err != nil { - g.logger.ComponentWarn(logging.ComponentGeneral, "failed to pin content", zap.Error(err), zap.String("cid", addResp.Cid)) - // Still return success, but log the pin failure - } - + // Return response immediately - don't block on pinning response := StorageUploadResponse{ Cid: addResp.Cid, Name: addResp.Name, Size: addResp.Size, } + // Pin asynchronously in background if requested + if shouldPin { + go g.pinAsync(addResp.Cid, name, replicationFactor) + } + writeJSON(w, http.StatusOK, response) } @@ -322,6 +329,34 @@ func (g *Gateway) storageUnpinHandler(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "cid": path}) } +// pinAsync pins a CID asynchronously in the background with retry logic +// Retries once if the first attempt fails, then gives up +func (g *Gateway) pinAsync(cid, name string, replicationFactor int) { + ctx := context.Background() + + // First attempt + _, err := g.ipfsClient.Pin(ctx, cid, name, replicationFactor) + if err == nil { + g.logger.ComponentWarn(logging.ComponentGeneral, "async pin succeeded", zap.String("cid", cid)) + return + } + + // Log first failure + g.logger.ComponentWarn(logging.ComponentGeneral, "async pin failed, retrying once", + zap.Error(err), zap.String("cid", cid)) + + // Retry once after a short delay + time.Sleep(2 * time.Second) + _, err = g.ipfsClient.Pin(ctx, cid, name, replicationFactor) + if err != nil { + // Final failure - log and give up + g.logger.ComponentWarn(logging.ComponentGeneral, "async pin retry failed, giving up", + zap.Error(err), zap.String("cid", cid)) + } else { + g.logger.ComponentWarn(logging.ComponentGeneral, "async pin succeeded on retry", zap.String("cid", cid)) + } +} + // base64Decode decodes base64 string to bytes func base64Decode(s string) ([]byte, error) { return base64.StdEncoding.DecodeString(s) diff --git a/pkg/ipfs/client.go b/pkg/ipfs/client.go index 83dbb5d..ff41c42 100644 --- a/pkg/ipfs/client.go +++ b/pkg/ipfs/client.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "mime/multipart" @@ -177,12 +178,35 @@ func (c *Client) Add(ctx context.Context, reader io.Reader, name string) (*AddRe return nil, fmt.Errorf("add failed with status %d: %s", resp.StatusCode, string(body)) } - var result AddResponse - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - return nil, fmt.Errorf("failed to decode add response: %w", err) + // IPFS Cluster streams NDJSON responses. We need to drain the entire stream + // to prevent the connection from closing prematurely, which would cancel + // the cluster's pinning operation. Read all JSON objects and keep the last one. + dec := json.NewDecoder(resp.Body) + var last AddResponse + var hasResult bool + + for { + var chunk AddResponse + if err := dec.Decode(&chunk); err != nil { + if errors.Is(err, io.EOF) { + break + } + return nil, fmt.Errorf("failed to decode add response: %w", err) + } + last = chunk + hasResult = true } - return &result, nil + if !hasResult { + return nil, fmt.Errorf("add response missing CID") + } + + // Ensure name is set if provided + if last.Name == "" && name != "" { + last.Name = name + } + + return &last, nil } // Pin pins a CID with specified replication factor