feat: enhance IPFS configuration and logging in CLI

- Added IPFS cluster API and HTTP API configuration options to node and bootstrap configurations.
- Improved the generation of IPFS-related URLs and parameters for better integration.
- Enhanced error logging in cache handlers to provide more context on failures during cache operations.
anonpenguin23 2025-11-08 11:59:38 +02:00
parent 50f7abf376
commit 93b25c42e4
4 changed files with 86 additions and 12 deletions

View File

@@ -13,6 +13,21 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
### Deprecated
### Fixed
## [0.59.0] - 2025-11-08
### Added
- Added support for asynchronous pinning of uploaded files, so uploads return immediately instead of blocking on the pin.
- Added an optional `pin` flag to the storage upload endpoint to control whether content is pinned (defaults to true).
### Changed
- Improved handling of IPFS Cluster responses during the Add operation to correctly process streaming NDJSON output.
### Deprecated
### Removed
### Fixed
## [0.58.0] - 2025-11-07
### Added

View File

@@ -21,7 +21,7 @@ test-e2e:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks kill
VERSION := 0.58.0
VERSION := 0.59.0
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

View File

@@ -9,6 +9,7 @@ import (
"io"
"net/http"
"strings"
"time"
"github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/logging"
@@ -81,6 +82,7 @@ func (g *Gateway) storageUploadHandler(w http.ResponseWriter, r *http.Request) {
contentType := r.Header.Get("Content-Type")
var reader io.Reader
var name string
var shouldPin bool = true // Default to true
if strings.HasPrefix(contentType, "multipart/form-data") {
// Handle multipart upload
@@ -98,6 +100,11 @@ func (g *Gateway) storageUploadHandler(w http.ResponseWriter, r *http.Request) {
reader = file
name = header.Filename
// Parse pin flag from form (default: true)
if pinValue := r.FormValue("pin"); pinValue != "" {
shouldPin = strings.ToLower(pinValue) == "true"
}
} else {
// Handle JSON request with base64 data
var req StorageUploadRequest
@@ -120,6 +127,7 @@ func (g *Gateway) storageUploadHandler(w http.ResponseWriter, r *http.Request) {
reader = bytes.NewReader(data)
name = req.Name
// For JSON requests, pin defaults to true (can be extended if needed)
}
// Add to IPFS
@@ -131,19 +139,18 @@ func (g *Gateway) storageUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}
// Pin with replication factor
_, err = g.ipfsClient.Pin(ctx, addResp.Cid, name, replicationFactor)
if err != nil {
g.logger.ComponentWarn(logging.ComponentGeneral, "failed to pin content", zap.Error(err), zap.String("cid", addResp.Cid))
// Still return success, but log the pin failure
}
// Return response immediately - don't block on pinning
response := StorageUploadResponse{
Cid: addResp.Cid,
Name: addResp.Name,
Size: addResp.Size,
}
// Pin asynchronously in background if requested
if shouldPin {
go g.pinAsync(addResp.Cid, name, replicationFactor)
}
writeJSON(w, http.StatusOK, response)
}
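For illustration, a minimal client-side sketch that exercises the new pin form field parsed above, assuming a multipart POST to the gateway. The base URL, route path, and the "file" field name are placeholders invented for this example; only the pin semantics (any non-empty value other than "true" disables pinning) and the CID/name/size response fields come from the handler in this diff.

package main

import (
	"bytes"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
)

func main() {
	var buf bytes.Buffer
	mw := multipart.NewWriter(&buf)

	// File part: the handler takes the upload name from the part's filename.
	fw, err := mw.CreateFormFile("file", "example.txt") // field name assumed
	if err != nil {
		panic(err)
	}
	fw.Write([]byte("hello world"))

	// Optional pin flag: a non-empty value other than "true" disables pinning.
	mw.WriteField("pin", "false")
	mw.Close()

	// Base URL and path are placeholders, not necessarily the gateway's actual route.
	req, err := http.NewRequest(http.MethodPost, "http://localhost:8080/v1/storage/upload", &buf)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", mw.FormDataContentType())

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body)) // upload returns before any pin completes
}

Because the handler now pins in a background goroutine, a 200 response only confirms the add; callers that need a durable pin should verify it separately.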
@@ -322,6 +329,34 @@ func (g *Gateway) storageUnpinHandler(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "cid": path})
}
// pinAsync pins a CID asynchronously in the background with retry logic
// Retries once if the first attempt fails, then gives up
func (g *Gateway) pinAsync(cid, name string, replicationFactor int) {
ctx := context.Background()
// First attempt
_, err := g.ipfsClient.Pin(ctx, cid, name, replicationFactor)
if err == nil {
g.logger.ComponentWarn(logging.ComponentGeneral, "async pin succeeded", zap.String("cid", cid))
return
}
// Log first failure
g.logger.ComponentWarn(logging.ComponentGeneral, "async pin failed, retrying once",
zap.Error(err), zap.String("cid", cid))
// Retry once after a short delay
time.Sleep(2 * time.Second)
_, err = g.ipfsClient.Pin(ctx, cid, name, replicationFactor)
if err != nil {
// Final failure - log and give up
g.logger.ComponentWarn(logging.ComponentGeneral, "async pin retry failed, giving up",
zap.Error(err), zap.String("cid", cid))
} else {
g.logger.ComponentWarn(logging.ComponentGeneral, "async pin succeeded on retry", zap.String("cid", cid))
}
}
// base64Decode decodes base64 string to bytes
func base64Decode(s string) ([]byte, error) {
return base64.StdEncoding.DecodeString(s)

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"mime/multipart"
@@ -177,12 +178,35 @@ func (c *Client) Add(ctx context.Context, reader io.Reader, name string) (*AddRe
return nil, fmt.Errorf("add failed with status %d: %s", resp.StatusCode, string(body))
}
var result AddResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode add response: %w", err)
// IPFS Cluster streams NDJSON responses. We need to drain the entire stream
// to prevent the connection from closing prematurely, which would cancel
// the cluster's pinning operation. Read all JSON objects and keep the last one.
dec := json.NewDecoder(resp.Body)
var last AddResponse
var hasResult bool
for {
var chunk AddResponse
if err := dec.Decode(&chunk); err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, fmt.Errorf("failed to decode add response: %w", err)
}
last = chunk
hasResult = true
}
return &result, nil
if !hasResult {
return nil, fmt.Errorf("add response missing CID")
}
// Ensure name is set if provided
if last.Name == "" && name != "" {
last.Name = name
}
return &last, nil
}
// Pin pins a CID with specified replication factor
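To make the stream-draining behaviour in Add concrete, here is a standalone sketch of the same pattern under a synthetic payload: decode every NDJSON object from the body and keep the last one. The field set and CID strings below are invented for illustration and do not reflect real IPFS Cluster output; the real AddResponse type is the one defined in this client package.

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

// addChunk carries only the fields this sketch needs.
type addChunk struct {
	Name string `json:"name"`
	Cid  string `json:"cid"`
	Size int64  `json:"size"`
}

func main() {
	// Synthetic NDJSON stream: one JSON object per line.
	stream := strings.NewReader(
		`{"name":"a.txt","cid":"bafyexampleaaaa","size":4}` + "\n" +
			`{"name":"dir","cid":"bafyexamplebbbb","size":12}` + "\n")

	dec := json.NewDecoder(stream)
	var last addChunk
	var hasResult bool
	for {
		var chunk addChunk
		if err := dec.Decode(&chunk); err != nil {
			if errors.Is(err, io.EOF) {
				break // stream fully drained, connection can close cleanly
			}
			panic(err)
		}
		last = chunk
		hasResult = true
	}
	if !hasResult {
		panic("no objects decoded from stream")
	}
	fmt.Printf("kept last object: %+v\n", last)
}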