mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-12-11 08:38:48 +00:00
feat: integrate Olric distributed cache support
- Added Olric cache server integration, including configuration options for Olric servers and timeout settings. - Implemented HTTP handlers for cache operations: health check, get, put, delete, and scan. - Enhanced Makefile with commands to run the Olric server and manage its configuration. - Updated README and setup scripts to include Olric installation and configuration instructions. - Introduced tests for cache handlers to ensure proper functionality and error handling.
This commit is contained in:
parent
3196e91e85
commit
cf26c1af2c
36
CHANGELOG.md
36
CHANGELOG.md
@ -13,6 +13,42 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant
|
||||
### Deprecated
|
||||
|
||||
### Fixed
|
||||
## [0.56.0] - 2025-11-05
|
||||
|
||||
### Added
|
||||
- Added IPFS storage endpoints to the Gateway for content upload, pinning, status, retrieval, and unpinning.
|
||||
- Introduced `StorageClient` interface and implementation in the Go client library for interacting with the new IPFS storage endpoints.
|
||||
- Added support for automatically starting IPFS daemon, IPFS Cluster daemon, and Olric cache server in the `dev` environment setup.
|
||||
|
||||
### Changed
|
||||
- Updated Gateway configuration to include settings for IPFS Cluster API URL, IPFS API URL, timeout, and replication factor.
|
||||
- Refactored Olric configuration generation to use a simpler, local-environment focused setup.
|
||||
- Improved IPFS content retrieval (`Get`) to fall back to the IPFS Gateway (port 8080) if the IPFS API (port 5001) returns a 404.
|
||||
|
||||
### Deprecated
|
||||
|
||||
### Removed
|
||||
|
||||
### Fixed
|
||||
\n
|
||||
## [0.55.0] - 2025-11-05
|
||||
|
||||
### Added
|
||||
- Added IPFS storage endpoints to the Gateway for content upload, pinning, status, retrieval, and unpinning.
|
||||
- Introduced `StorageClient` interface and implementation in the Go client library for interacting with the new IPFS storage endpoints.
|
||||
- Added support for automatically starting IPFS daemon, IPFS Cluster daemon, and Olric cache server in the `dev` environment setup.
|
||||
|
||||
### Changed
|
||||
- Updated Gateway configuration to include settings for IPFS Cluster API URL, IPFS API URL, timeout, and replication factor.
|
||||
- Refactored Olric configuration generation to use a simpler, local-environment focused setup.
|
||||
- Improved `dev` environment logging to include logs from IPFS and Olric services when running.
|
||||
|
||||
### Deprecated
|
||||
|
||||
### Removed
|
||||
|
||||
### Fixed
|
||||
\n
|
||||
## [0.54.0] - 2025-11-03
|
||||
|
||||
### Added
|
||||
|
||||
89
Makefile
89
Makefile
@ -21,7 +21,7 @@ test-e2e:
|
||||
|
||||
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks
|
||||
|
||||
VERSION := 0.54.0
|
||||
VERSION := 0.56.0
|
||||
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
|
||||
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
|
||||
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'
|
||||
@ -119,6 +119,58 @@ dev: build
|
||||
@echo "Starting node3..."
|
||||
@nohup ./bin/node --config node3.yaml > $$HOME/.debros/logs/node3.log 2>&1 & echo $$! > .dev/pids/node3.pid
|
||||
@sleep 1
|
||||
@echo "Starting IPFS daemon..."
|
||||
@if command -v ipfs >/dev/null 2>&1; then \
|
||||
if [ ! -d $$HOME/.debros/ipfs ]; then \
|
||||
echo " Initializing IPFS repository..."; \
|
||||
IPFS_PATH=$$HOME/.debros/ipfs ipfs init 2>&1 | grep -v "generating" | grep -v "peer identity" || true; \
|
||||
fi; \
|
||||
if ! pgrep -f "ipfs daemon" >/dev/null 2>&1; then \
|
||||
IPFS_PATH=$$HOME/.debros/ipfs nohup ipfs daemon > $$HOME/.debros/logs/ipfs.log 2>&1 & echo $$! > .dev/pids/ipfs.pid; \
|
||||
echo " IPFS daemon started (PID: $$(cat .dev/pids/ipfs.pid))"; \
|
||||
sleep 5; \
|
||||
else \
|
||||
echo " ✓ IPFS daemon already running"; \
|
||||
fi; \
|
||||
else \
|
||||
echo " ⚠️ ipfs command not found - skipping IPFS (storage endpoints will be disabled)"; \
|
||||
echo " Install with: https://docs.ipfs.tech/install/"; \
|
||||
fi
|
||||
@echo "Starting IPFS Cluster daemon..."
|
||||
@if command -v ipfs-cluster-service >/dev/null 2>&1; then \
|
||||
if [ ! -d $$HOME/.debros/ipfs-cluster ]; then \
|
||||
echo " Initializing IPFS Cluster..."; \
|
||||
CLUSTER_PATH=$$HOME/.debros/ipfs-cluster ipfs-cluster-service init --force 2>&1 | grep -v "peer identity" || true; \
|
||||
fi; \
|
||||
if ! pgrep -f "ipfs-cluster-service" >/dev/null 2>&1; then \
|
||||
CLUSTER_PATH=$$HOME/.debros/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster.pid; \
|
||||
echo " IPFS Cluster daemon started (PID: $$(cat .dev/pids/ipfs-cluster.pid))"; \
|
||||
sleep 5; \
|
||||
else \
|
||||
echo " ✓ IPFS Cluster daemon already running"; \
|
||||
fi; \
|
||||
else \
|
||||
echo " ⚠️ ipfs-cluster-service command not found - skipping IPFS Cluster (storage endpoints will be disabled)"; \
|
||||
echo " Install with: https://ipfscluster.io/documentation/guides/install/"; \
|
||||
fi
|
||||
@echo "Starting Olric cache server..."
|
||||
@if command -v olric-server >/dev/null 2>&1; then \
|
||||
if [ ! -f $$HOME/.debros/olric-config.yaml ]; then \
|
||||
echo " Creating Olric config..."; \
|
||||
mkdir -p $$HOME/.debros; \
|
||||
fi; \
|
||||
if ! pgrep -f "olric-server" >/dev/null 2>&1; then \
|
||||
OLRIC_SERVER_CONFIG=$$HOME/.debros/olric-config.yaml nohup olric-server > $$HOME/.debros/logs/olric.log 2>&1 & echo $$! > .dev/pids/olric.pid; \
|
||||
echo " Olric cache server started (PID: $$(cat .dev/pids/olric.pid))"; \
|
||||
sleep 3; \
|
||||
else \
|
||||
echo " ✓ Olric cache server already running"; \
|
||||
fi; \
|
||||
else \
|
||||
echo " ⚠️ olric-server command not found - skipping Olric (cache endpoints will be disabled)"; \
|
||||
echo " Install with: go install github.com/olric-data/olric/cmd/olric-server@v0.7.0"; \
|
||||
fi
|
||||
@sleep 1
|
||||
@echo "Starting gateway..."
|
||||
@nohup ./bin/gateway --config gateway.yaml > $$HOME/.debros/logs/gateway.log 2>&1 & echo $$! > .dev/pids/gateway.pid
|
||||
@echo ""
|
||||
@ -130,6 +182,15 @@ dev: build
|
||||
@if [ -f .dev/pids/anon.pid ]; then \
|
||||
echo " Anon: PID=$$(cat .dev/pids/anon.pid) (SOCKS: 9050)"; \
|
||||
fi
|
||||
@if [ -f .dev/pids/ipfs.pid ]; then \
|
||||
echo " IPFS: PID=$$(cat .dev/pids/ipfs.pid) (API: 5001)"; \
|
||||
fi
|
||||
@if [ -f .dev/pids/ipfs-cluster.pid ]; then \
|
||||
echo " IPFS Cluster: PID=$$(cat .dev/pids/ipfs-cluster.pid) (API: 9094)"; \
|
||||
fi
|
||||
@if [ -f .dev/pids/olric.pid ]; then \
|
||||
echo " Olric: PID=$$(cat .dev/pids/olric.pid) (API: 3320)"; \
|
||||
fi
|
||||
@echo " Bootstrap: PID=$$(cat .dev/pids/bootstrap.pid)"
|
||||
@echo " Node2: PID=$$(cat .dev/pids/node2.pid)"
|
||||
@echo " Node3: PID=$$(cat .dev/pids/node3.pid)"
|
||||
@ -137,6 +198,13 @@ dev: build
|
||||
@echo ""
|
||||
@echo "Ports:"
|
||||
@echo " Anon SOCKS: 9050 (proxy endpoint: POST /v1/proxy/anon)"
|
||||
@if [ -f .dev/pids/ipfs.pid ]; then \
|
||||
echo " IPFS API: 5001 (content retrieval)"; \
|
||||
echo " IPFS Cluster: 9094 (pin management)"; \
|
||||
fi
|
||||
@if [ -f .dev/pids/olric.pid ]; then \
|
||||
echo " Olric: 3320 (cache API)"; \
|
||||
fi
|
||||
@echo " Bootstrap P2P: 4001, HTTP: 5001, Raft: 7001"
|
||||
@echo " Node2 P2P: 4002, HTTP: 5002, Raft: 7002"
|
||||
@echo " Node3 P2P: 4003, HTTP: 5003, Raft: 7003"
|
||||
@ -145,13 +213,18 @@ dev: build
|
||||
@echo "Press Ctrl+C to stop all processes"
|
||||
@echo "============================================================"
|
||||
@echo ""
|
||||
@if [ -f .dev/pids/anon.pid ]; then \
|
||||
trap 'echo "Stopping all processes..."; kill $$(cat .dev/pids/*.pid) 2>/dev/null; rm -f .dev/pids/*.pid; exit 0' INT; \
|
||||
tail -f $$HOME/.debros/logs/anon.log $$HOME/.debros/logs/bootstrap.log $$HOME/.debros/logs/node2.log $$HOME/.debros/logs/node3.log $$HOME/.debros/logs/gateway.log; \
|
||||
else \
|
||||
trap 'echo "Stopping all processes..."; kill $$(cat .dev/pids/*.pid) 2>/dev/null; rm -f .dev/pids/*.pid; exit 0' INT; \
|
||||
tail -f $$HOME/.debros/logs/bootstrap.log $$HOME/.debros/logs/node2.log $$HOME/.debros/logs/node3.log $$HOME/.debros/logs/gateway.log; \
|
||||
fi
|
||||
@LOGS="$$HOME/.debros/logs/bootstrap.log $$HOME/.debros/logs/node2.log $$HOME/.debros/logs/node3.log $$HOME/.debros/logs/gateway.log"; \
|
||||
if [ -f .dev/pids/anon.pid ]; then \
|
||||
LOGS="$$LOGS $$HOME/.debros/logs/anon.log"; \
|
||||
fi; \
|
||||
if [ -f .dev/pids/ipfs.pid ]; then \
|
||||
LOGS="$$LOGS $$HOME/.debros/logs/ipfs.log"; \
|
||||
fi; \
|
||||
if [ -f .dev/pids/ipfs-cluster.pid ]; then \
|
||||
LOGS="$$LOGS $$HOME/.debros/logs/ipfs-cluster.log"; \
|
||||
fi; \
|
||||
trap 'echo "Stopping all processes..."; kill $$(cat .dev/pids/*.pid) 2>/dev/null; rm -f .dev/pids/*.pid; exit 0' INT; \
|
||||
tail -f $$LOGS
|
||||
|
||||
# Help
|
||||
help:
|
||||
|
||||
@ -139,6 +139,7 @@ Common endpoints (see `openapi/gateway.yaml` for the full spec):
|
||||
- `POST /v1/rqlite/exec`, `POST /v1/rqlite/find`, `POST /v1/rqlite/select`, `POST /v1/rqlite/transaction`
|
||||
- `GET /v1/rqlite/schema`
|
||||
- `POST /v1/pubsub/publish`, `GET /v1/pubsub/topics`, `GET /v1/pubsub/ws?topic=<topic>`
|
||||
- `POST /v1/storage/upload`, `POST /v1/storage/pin`, `GET /v1/storage/status/:cid`, `GET /v1/storage/get/:cid`, `DELETE /v1/storage/unpin/:cid`
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
|
||||
@ -51,15 +51,19 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
|
||||
|
||||
// Load YAML
|
||||
type yamlCfg struct {
|
||||
ListenAddr string `yaml:"listen_addr"`
|
||||
ClientNamespace string `yaml:"client_namespace"`
|
||||
RQLiteDSN string `yaml:"rqlite_dsn"`
|
||||
BootstrapPeers []string `yaml:"bootstrap_peers"`
|
||||
EnableHTTPS bool `yaml:"enable_https"`
|
||||
DomainName string `yaml:"domain_name"`
|
||||
TLSCacheDir string `yaml:"tls_cache_dir"`
|
||||
OlricServers []string `yaml:"olric_servers"`
|
||||
OlricTimeout string `yaml:"olric_timeout"`
|
||||
ListenAddr string `yaml:"listen_addr"`
|
||||
ClientNamespace string `yaml:"client_namespace"`
|
||||
RQLiteDSN string `yaml:"rqlite_dsn"`
|
||||
BootstrapPeers []string `yaml:"bootstrap_peers"`
|
||||
EnableHTTPS bool `yaml:"enable_https"`
|
||||
DomainName string `yaml:"domain_name"`
|
||||
TLSCacheDir string `yaml:"tls_cache_dir"`
|
||||
OlricServers []string `yaml:"olric_servers"`
|
||||
OlricTimeout string `yaml:"olric_timeout"`
|
||||
IPFSClusterAPIURL string `yaml:"ipfs_cluster_api_url"`
|
||||
IPFSAPIURL string `yaml:"ipfs_api_url"`
|
||||
IPFSTimeout string `yaml:"ipfs_timeout"`
|
||||
IPFSReplicationFactor int `yaml:"ipfs_replication_factor"`
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(configPath)
|
||||
@ -82,15 +86,19 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
|
||||
|
||||
// Build config from YAML
|
||||
cfg := &gateway.Config{
|
||||
ListenAddr: ":6001",
|
||||
ClientNamespace: "default",
|
||||
BootstrapPeers: nil,
|
||||
RQLiteDSN: "",
|
||||
EnableHTTPS: false,
|
||||
DomainName: "",
|
||||
TLSCacheDir: "",
|
||||
OlricServers: nil,
|
||||
OlricTimeout: 0,
|
||||
ListenAddr: ":6001",
|
||||
ClientNamespace: "default",
|
||||
BootstrapPeers: nil,
|
||||
RQLiteDSN: "",
|
||||
EnableHTTPS: false,
|
||||
DomainName: "",
|
||||
TLSCacheDir: "",
|
||||
OlricServers: nil,
|
||||
OlricTimeout: 0,
|
||||
IPFSClusterAPIURL: "",
|
||||
IPFSAPIURL: "",
|
||||
IPFSTimeout: 0,
|
||||
IPFSReplicationFactor: 0,
|
||||
}
|
||||
|
||||
if v := strings.TrimSpace(y.ListenAddr); v != "" {
|
||||
@ -142,6 +150,24 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
|
||||
}
|
||||
}
|
||||
|
||||
// IPFS configuration
|
||||
if v := strings.TrimSpace(y.IPFSClusterAPIURL); v != "" {
|
||||
cfg.IPFSClusterAPIURL = v
|
||||
}
|
||||
if v := strings.TrimSpace(y.IPFSAPIURL); v != "" {
|
||||
cfg.IPFSAPIURL = v
|
||||
}
|
||||
if v := strings.TrimSpace(y.IPFSTimeout); v != "" {
|
||||
if parsed, err := time.ParseDuration(v); err == nil {
|
||||
cfg.IPFSTimeout = parsed
|
||||
} else {
|
||||
logger.ComponentWarn(logging.ComponentGeneral, "invalid ipfs_timeout, using default", zap.String("value", v), zap.Error(err))
|
||||
}
|
||||
}
|
||||
if y.IPFSReplicationFactor > 0 {
|
||||
cfg.IPFSReplicationFactor = y.IPFSReplicationFactor
|
||||
}
|
||||
|
||||
// Validate configuration
|
||||
if errs := cfg.ValidateConfig(); len(errs) > 0 {
|
||||
fmt.Fprintf(os.Stderr, "\nGateway configuration errors (%d):\n", len(errs))
|
||||
|
||||
@ -3,10 +3,13 @@
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
@ -407,6 +410,201 @@ func TestGateway_Database_RecreateWithFK(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGateway_Storage_UploadMultipart(t *testing.T) {
|
||||
key := requireAPIKey(t)
|
||||
base := gatewayBaseURL()
|
||||
|
||||
// Create multipart form data using proper multipart writer
|
||||
content := []byte("test file content for IPFS upload")
|
||||
var buf bytes.Buffer
|
||||
writer := multipart.NewWriter(&buf)
|
||||
part, err := writer.CreateFormFile("file", "test.txt")
|
||||
if err != nil {
|
||||
t.Fatalf("create form file: %v", err)
|
||||
}
|
||||
if _, err := part.Write(content); err != nil {
|
||||
t.Fatalf("write content: %v", err)
|
||||
}
|
||||
if err := writer.Close(); err != nil {
|
||||
t.Fatalf("close writer: %v", err)
|
||||
}
|
||||
|
||||
req, _ := http.NewRequest(http.MethodPost, base+"/v1/storage/upload", &buf)
|
||||
req.Header = authHeader(key)
|
||||
req.Header.Set("Content-Type", writer.FormDataContentType())
|
||||
resp, err := httpClient().Do(req)
|
||||
if err != nil {
|
||||
t.Fatalf("upload do: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusServiceUnavailable {
|
||||
t.Skip("IPFS storage not available; skipping storage tests")
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("upload status: %d, body: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var uploadResp struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&uploadResp); err != nil {
|
||||
t.Fatalf("upload decode: %v", err)
|
||||
}
|
||||
if uploadResp.Cid == "" {
|
||||
t.Fatalf("upload returned empty CID")
|
||||
}
|
||||
if uploadResp.Name != "test.txt" {
|
||||
t.Fatalf("upload name mismatch: got %s", uploadResp.Name)
|
||||
}
|
||||
if uploadResp.Size == 0 {
|
||||
t.Fatalf("upload size is zero")
|
||||
}
|
||||
|
||||
// Test pinning the uploaded content
|
||||
pinBody := fmt.Sprintf(`{"cid":"%s","name":"test-pinned"}`, uploadResp.Cid)
|
||||
req2, _ := http.NewRequest(http.MethodPost, base+"/v1/storage/pin", strings.NewReader(pinBody))
|
||||
req2.Header = authHeader(key)
|
||||
resp2, err := httpClient().Do(req2)
|
||||
if err != nil {
|
||||
t.Fatalf("pin do: %v", err)
|
||||
}
|
||||
defer resp2.Body.Close()
|
||||
if resp2.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp2.Body)
|
||||
t.Fatalf("pin status: %d, body: %s", resp2.StatusCode, string(body))
|
||||
}
|
||||
|
||||
// Test getting pin status
|
||||
req3, _ := http.NewRequest(http.MethodGet, base+"/v1/storage/status/"+uploadResp.Cid, nil)
|
||||
req3.Header = authHeader(key)
|
||||
resp3, err := httpClient().Do(req3)
|
||||
if err != nil {
|
||||
t.Fatalf("status do: %v", err)
|
||||
}
|
||||
defer resp3.Body.Close()
|
||||
if resp3.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp3.Body)
|
||||
t.Fatalf("status status: %d, body: %s", resp3.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var statusResp struct {
|
||||
Cid string `json:"cid"`
|
||||
Status string `json:"status"`
|
||||
ReplicationFactor int `json:"replication_factor"`
|
||||
Peers []string `json:"peers"`
|
||||
}
|
||||
if err := json.NewDecoder(resp3.Body).Decode(&statusResp); err != nil {
|
||||
t.Fatalf("status decode: %v", err)
|
||||
}
|
||||
if statusResp.Cid != uploadResp.Cid {
|
||||
t.Fatalf("status CID mismatch: got %s", statusResp.Cid)
|
||||
}
|
||||
|
||||
// Test retrieving content
|
||||
req4, _ := http.NewRequest(http.MethodGet, base+"/v1/storage/get/"+uploadResp.Cid, nil)
|
||||
req4.Header = authHeader(key)
|
||||
resp4, err := httpClient().Do(req4)
|
||||
if err != nil {
|
||||
t.Fatalf("get do: %v", err)
|
||||
}
|
||||
defer resp4.Body.Close()
|
||||
if resp4.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp4.Body)
|
||||
t.Fatalf("get status: %d, body: %s", resp4.StatusCode, string(body))
|
||||
}
|
||||
|
||||
retrieved, err := io.ReadAll(resp4.Body)
|
||||
if err != nil {
|
||||
t.Fatalf("get read: %v", err)
|
||||
}
|
||||
if string(retrieved) != string(content) {
|
||||
t.Fatalf("retrieved content mismatch: got %q", string(retrieved))
|
||||
}
|
||||
|
||||
// Test unpinning
|
||||
req5, _ := http.NewRequest(http.MethodDelete, base+"/v1/storage/unpin/"+uploadResp.Cid, nil)
|
||||
req5.Header = authHeader(key)
|
||||
resp5, err := httpClient().Do(req5)
|
||||
if err != nil {
|
||||
t.Fatalf("unpin do: %v", err)
|
||||
}
|
||||
defer resp5.Body.Close()
|
||||
if resp5.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp5.Body)
|
||||
t.Fatalf("unpin status: %d, body: %s", resp5.StatusCode, string(body))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGateway_Storage_UploadJSON(t *testing.T) {
|
||||
key := requireAPIKey(t)
|
||||
base := gatewayBaseURL()
|
||||
|
||||
// Test JSON upload with base64 data
|
||||
content := []byte("test json upload content")
|
||||
b64 := base64.StdEncoding.EncodeToString(content)
|
||||
body := fmt.Sprintf(`{"name":"test.json","data":"%s"}`, b64)
|
||||
|
||||
req, _ := http.NewRequest(http.MethodPost, base+"/v1/storage/upload", strings.NewReader(body))
|
||||
req.Header = authHeader(key)
|
||||
resp, err := httpClient().Do(req)
|
||||
if err != nil {
|
||||
t.Fatalf("upload json do: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusServiceUnavailable {
|
||||
t.Skip("IPFS storage not available; skipping storage tests")
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
t.Fatalf("upload json status: %d, body: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var uploadResp struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&uploadResp); err != nil {
|
||||
t.Fatalf("upload json decode: %v", err)
|
||||
}
|
||||
if uploadResp.Cid == "" {
|
||||
t.Fatalf("upload json returned empty CID")
|
||||
}
|
||||
if uploadResp.Name != "test.json" {
|
||||
t.Fatalf("upload json name mismatch: got %s", uploadResp.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGateway_Storage_InvalidCID(t *testing.T) {
|
||||
key := requireAPIKey(t)
|
||||
base := gatewayBaseURL()
|
||||
|
||||
// Test status with invalid CID
|
||||
req, _ := http.NewRequest(http.MethodGet, base+"/v1/storage/status/QmInvalidCID123", nil)
|
||||
req.Header = authHeader(key)
|
||||
resp, err := httpClient().Do(req)
|
||||
if err != nil {
|
||||
t.Fatalf("status invalid do: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusServiceUnavailable {
|
||||
t.Skip("IPFS storage not available; skipping storage tests")
|
||||
}
|
||||
|
||||
// Should return error but not crash
|
||||
if resp.StatusCode != http.StatusNotFound && resp.StatusCode != http.StatusInternalServerError {
|
||||
t.Fatalf("expected error status for invalid CID, got %d", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func toWSURL(httpURL string) string {
|
||||
u, err := url.Parse(httpURL)
|
||||
if err != nil {
|
||||
|
||||
@ -1086,26 +1086,15 @@ func installOlric() {
|
||||
if err := os.MkdirAll(olricConfigDir, 0755); err == nil {
|
||||
configPath := olricConfigDir + "/config.yaml"
|
||||
if _, err := os.Stat(configPath); os.IsNotExist(err) {
|
||||
configContent := `memberlist:
|
||||
bind-addr: "0.0.0.0"
|
||||
bind-port: 3322
|
||||
client:
|
||||
bind-addr: "0.0.0.0"
|
||||
bind-port: 3320
|
||||
configContent := `server:
|
||||
bindAddr: "127.0.0.1"
|
||||
bindPort: 3320
|
||||
|
||||
# Durability and replication configuration
|
||||
# Replicates data across entire network for fault tolerance
|
||||
dmaps:
|
||||
default:
|
||||
replication:
|
||||
mode: sync # Synchronous replication for durability
|
||||
replica_count: 2 # Replicate to 2 backup nodes (3 total copies: 1 primary + 2 backups)
|
||||
write_quorum: 2 # Require 2 nodes to acknowledge writes
|
||||
read_quorum: 1 # Read from 1 node (faster reads)
|
||||
read_repair: true # Enable read-repair for consistency
|
||||
memberlist:
|
||||
environment: local
|
||||
bindAddr: "127.0.0.1"
|
||||
bindPort: 3322
|
||||
|
||||
# Split-brain protection
|
||||
member_count_quorum: 2 # Require at least 2 nodes to operate (prevents split-brain)
|
||||
`
|
||||
if err := os.WriteFile(configPath, []byte(configContent), 0644); err == nil {
|
||||
exec.Command("chown", "debros:debros", configPath).Run()
|
||||
@ -1532,6 +1521,17 @@ database:
|
||||
cluster_sync_interval: "30s"
|
||||
peer_inactivity_limit: "24h"
|
||||
min_cluster_size: 1
|
||||
ipfs:
|
||||
# IPFS Cluster API endpoint for pin management (leave empty to disable)
|
||||
cluster_api_url: "http://localhost:9094"
|
||||
# IPFS HTTP API endpoint for content retrieval
|
||||
api_url: "http://localhost:5001"
|
||||
# Timeout for IPFS operations
|
||||
timeout: "60s"
|
||||
# Replication factor for pinned content
|
||||
replication_factor: 3
|
||||
# Enable client-side encryption before upload
|
||||
enable_encryption: true
|
||||
|
||||
discovery:
|
||||
bootstrap_peers: []
|
||||
@ -1607,6 +1607,17 @@ database:
|
||||
cluster_sync_interval: "30s"
|
||||
peer_inactivity_limit: "24h"
|
||||
min_cluster_size: 1
|
||||
ipfs:
|
||||
# IPFS Cluster API endpoint for pin management (leave empty to disable)
|
||||
cluster_api_url: "http://localhost:9094"
|
||||
# IPFS HTTP API endpoint for content retrieval
|
||||
api_url: "http://localhost:5001"
|
||||
# Timeout for IPFS operations
|
||||
timeout: "60s"
|
||||
# Replication factor for pinned content
|
||||
replication_factor: 3
|
||||
# Enable client-side encryption before upload
|
||||
enable_encryption: true
|
||||
|
||||
discovery:
|
||||
%s
|
||||
@ -1670,13 +1681,23 @@ func generateGatewayConfigDirect(bootstrapPeers string, enableHTTPS bool, domain
|
||||
olricYAML.WriteString(" - \"localhost:3320\"\n")
|
||||
}
|
||||
|
||||
// IPFS Cluster configuration (defaults - can be customized later)
|
||||
ipfsYAML := `# IPFS Cluster configuration (optional)
|
||||
# Uncomment and configure if you have IPFS Cluster running:
|
||||
# ipfs_cluster_api_url: "http://localhost:9094"
|
||||
# ipfs_api_url: "http://localhost:5001"
|
||||
# ipfs_timeout: "60s"
|
||||
# ipfs_replication_factor: 3
|
||||
`
|
||||
|
||||
return fmt.Sprintf(`listen_addr: ":6001"
|
||||
client_namespace: "default"
|
||||
rqlite_dsn: ""
|
||||
%s
|
||||
%s
|
||||
%s
|
||||
`, peersYAML.String(), httpsYAML.String(), olricYAML.String())
|
||||
%s
|
||||
`, peersYAML.String(), httpsYAML.String(), olricYAML.String(), ipfsYAML)
|
||||
}
|
||||
|
||||
// generateOlricConfig generates an Olric configuration file
|
||||
@ -1689,30 +1710,15 @@ func generateOlricConfig(configPath, bindIP string, httpPort, memberlistPort int
|
||||
}
|
||||
|
||||
var config strings.Builder
|
||||
config.WriteString("server:\n")
|
||||
config.WriteString(fmt.Sprintf(" bindAddr: \"%s\"\n", bindIP))
|
||||
config.WriteString(fmt.Sprintf(" bindPort: %d\n", httpPort))
|
||||
config.WriteString("\n")
|
||||
config.WriteString("memberlist:\n")
|
||||
config.WriteString(fmt.Sprintf(" bind-addr: \"%s\"\n", bindIP))
|
||||
config.WriteString(fmt.Sprintf(" bind-port: %d\n", memberlistPort))
|
||||
config.WriteString(" # Multicast discovery enabled - peers discovered dynamically via LibP2P network\n")
|
||||
|
||||
config.WriteString("client:\n")
|
||||
config.WriteString(fmt.Sprintf(" bind-addr: \"%s\"\n", bindIP))
|
||||
config.WriteString(fmt.Sprintf(" bind-port: %d\n", httpPort))
|
||||
|
||||
// Durability and replication settings
|
||||
config.WriteString("\n# Durability and replication configuration\n")
|
||||
config.WriteString("# Replicates data across entire network for fault tolerance\n")
|
||||
config.WriteString("dmaps:\n")
|
||||
config.WriteString(" default:\n")
|
||||
config.WriteString(" replication:\n")
|
||||
config.WriteString(" mode: sync # Synchronous replication for durability\n")
|
||||
config.WriteString(" replica_count: 2 # Replicate to 2 backup nodes (3 total copies: 1 primary + 2 backups)\n")
|
||||
config.WriteString(" write_quorum: 2 # Require 2 nodes to acknowledge writes\n")
|
||||
config.WriteString(" read_quorum: 1 # Read from 1 node (faster reads)\n")
|
||||
config.WriteString(" read_repair: true # Enable read-repair for consistency\n")
|
||||
|
||||
// Split-brain protection
|
||||
config.WriteString("\n# Split-brain protection\n")
|
||||
config.WriteString("member_count_quorum: 2 # Require at least 2 nodes to operate (prevents split-brain)\n")
|
||||
config.WriteString(" environment: local\n")
|
||||
config.WriteString(fmt.Sprintf(" bindAddr: \"%s\"\n", bindIP))
|
||||
config.WriteString(fmt.Sprintf(" bindPort: %d\n", memberlistPort))
|
||||
config.WriteString("\n")
|
||||
|
||||
// Write config file
|
||||
if err := os.WriteFile(configPath, []byte(config.String()), 0644); err != nil {
|
||||
|
||||
@ -35,6 +35,7 @@ type Client struct {
|
||||
database *DatabaseClientImpl
|
||||
network *NetworkInfoImpl
|
||||
pubsub *pubSubBridge
|
||||
storage *StorageClientImpl
|
||||
|
||||
// State
|
||||
connected bool
|
||||
@ -70,6 +71,7 @@ func NewClient(config *ClientConfig) (NetworkClient, error) {
|
||||
// Initialize components (will be configured when connected)
|
||||
client.database = &DatabaseClientImpl{client: client}
|
||||
client.network = &NetworkInfoImpl{client: client}
|
||||
client.storage = &StorageClientImpl{client: client}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
@ -89,6 +91,11 @@ func (c *Client) Network() NetworkInfo {
|
||||
return c.network
|
||||
}
|
||||
|
||||
// Storage returns the storage client
|
||||
func (c *Client) Storage() StorageClient {
|
||||
return c.storage
|
||||
}
|
||||
|
||||
// Config returns a snapshot copy of the client's configuration
|
||||
func (c *Client) Config() *ClientConfig {
|
||||
c.mu.RLock()
|
||||
|
||||
@ -3,6 +3,7 @@ package client
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -17,6 +18,9 @@ type NetworkClient interface {
|
||||
// Network information
|
||||
Network() NetworkInfo
|
||||
|
||||
// Storage operations (IPFS)
|
||||
Storage() StorageClient
|
||||
|
||||
// Lifecycle
|
||||
Connect() error
|
||||
Disconnect() error
|
||||
@ -51,6 +55,24 @@ type NetworkInfo interface {
|
||||
DisconnectFromPeer(ctx context.Context, peerID string) error
|
||||
}
|
||||
|
||||
// StorageClient provides IPFS storage operations
|
||||
type StorageClient interface {
|
||||
// Upload uploads content to IPFS and pins it
|
||||
Upload(ctx context.Context, reader io.Reader, name string) (*StorageUploadResult, error)
|
||||
|
||||
// Pin pins an existing CID
|
||||
Pin(ctx context.Context, cid string, name string) (*StoragePinResult, error)
|
||||
|
||||
// Status gets the pin status for a CID
|
||||
Status(ctx context.Context, cid string) (*StorageStatus, error)
|
||||
|
||||
// Get retrieves content from IPFS by CID
|
||||
Get(ctx context.Context, cid string) (io.ReadCloser, error)
|
||||
|
||||
// Unpin removes a pin from a CID
|
||||
Unpin(ctx context.Context, cid string) error
|
||||
}
|
||||
|
||||
// MessageHandler is called when a pub/sub message is received
|
||||
type MessageHandler func(topic string, data []byte) error
|
||||
|
||||
@ -107,12 +129,38 @@ type HealthStatus struct {
|
||||
ResponseTime time.Duration `json:"response_time"`
|
||||
}
|
||||
|
||||
// StorageUploadResult represents the result of uploading content to IPFS
|
||||
type StorageUploadResult struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
}
|
||||
|
||||
// StoragePinResult represents the result of pinning a CID
|
||||
type StoragePinResult struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// StorageStatus represents the status of a pinned CID
|
||||
type StorageStatus struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name"`
|
||||
Status string `json:"status"` // "pinned", "pinning", "queued", "unpinned", "error"
|
||||
ReplicationMin int `json:"replication_min"`
|
||||
ReplicationMax int `json:"replication_max"`
|
||||
ReplicationFactor int `json:"replication_factor"`
|
||||
Peers []string `json:"peers"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// ClientConfig represents configuration for network clients
|
||||
type ClientConfig struct {
|
||||
AppName string `json:"app_name"`
|
||||
DatabaseName string `json:"database_name"`
|
||||
BootstrapPeers []string `json:"bootstrap_peers"`
|
||||
DatabaseEndpoints []string `json:"database_endpoints"`
|
||||
GatewayURL string `json:"gateway_url"` // Gateway URL for HTTP API access (e.g., "http://localhost:6001")
|
||||
ConnectTimeout time.Duration `json:"connect_timeout"`
|
||||
RetryAttempts int `json:"retry_attempts"`
|
||||
RetryDelay time.Duration `json:"retry_delay"`
|
||||
@ -132,6 +180,7 @@ func DefaultClientConfig(appName string) *ClientConfig {
|
||||
DatabaseName: fmt.Sprintf("%s_db", appName),
|
||||
BootstrapPeers: peers,
|
||||
DatabaseEndpoints: endpoints,
|
||||
GatewayURL: "http://localhost:6001",
|
||||
ConnectTimeout: time.Second * 30,
|
||||
RetryAttempts: 3,
|
||||
RetryDelay: time.Second * 5,
|
||||
|
||||
245
pkg/client/storage_client.go
Normal file
245
pkg/client/storage_client.go
Normal file
@ -0,0 +1,245 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// StorageClientImpl implements StorageClient using HTTP requests to the gateway
|
||||
type StorageClientImpl struct {
|
||||
client *Client
|
||||
}
|
||||
|
||||
// Upload uploads content to IPFS and pins it
|
||||
func (s *StorageClientImpl) Upload(ctx context.Context, reader io.Reader, name string) (*StorageUploadResult, error) {
|
||||
if err := s.client.requireAccess(ctx); err != nil {
|
||||
return nil, fmt.Errorf("authentication required: %w", err)
|
||||
}
|
||||
|
||||
gatewayURL := s.getGatewayURL()
|
||||
|
||||
// Create multipart form
|
||||
var buf bytes.Buffer
|
||||
writer := multipart.NewWriter(&buf)
|
||||
|
||||
// Add file field
|
||||
part, err := writer.CreateFormFile("file", name)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create form file: %w", err)
|
||||
}
|
||||
|
||||
if _, err := io.Copy(part, reader); err != nil {
|
||||
return nil, fmt.Errorf("failed to copy data: %w", err)
|
||||
}
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
return nil, fmt.Errorf("failed to close writer: %w", err)
|
||||
}
|
||||
|
||||
// Create request
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", gatewayURL+"/v1/storage/upload", &buf)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", writer.FormDataContentType())
|
||||
s.addAuthHeaders(req)
|
||||
|
||||
// Execute request
|
||||
client := &http.Client{Timeout: 5 * time.Minute} // Large timeout for file uploads
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result StorageUploadResult
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// Pin pins an existing CID
|
||||
func (s *StorageClientImpl) Pin(ctx context.Context, cid string, name string) (*StoragePinResult, error) {
|
||||
if err := s.client.requireAccess(ctx); err != nil {
|
||||
return nil, fmt.Errorf("authentication required: %w", err)
|
||||
}
|
||||
|
||||
gatewayURL := s.getGatewayURL()
|
||||
|
||||
reqBody := map[string]interface{}{
|
||||
"cid": cid,
|
||||
}
|
||||
if name != "" {
|
||||
reqBody["name"] = name
|
||||
}
|
||||
|
||||
jsonBody, err := json.Marshal(reqBody)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal request: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", gatewayURL+"/v1/storage/pin", bytes.NewReader(jsonBody))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
s.addAuthHeaders(req)
|
||||
|
||||
client := &http.Client{Timeout: 60 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("pin failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result StoragePinResult
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// Status gets the pin status for a CID
|
||||
func (s *StorageClientImpl) Status(ctx context.Context, cid string) (*StorageStatus, error) {
|
||||
if err := s.client.requireAccess(ctx); err != nil {
|
||||
return nil, fmt.Errorf("authentication required: %w", err)
|
||||
}
|
||||
|
||||
gatewayURL := s.getGatewayURL()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", gatewayURL+"/v1/storage/status/"+cid, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
s.addAuthHeaders(req)
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("status failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result StorageStatus
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// Get retrieves content from IPFS by CID
|
||||
func (s *StorageClientImpl) Get(ctx context.Context, cid string) (io.ReadCloser, error) {
|
||||
if err := s.client.requireAccess(ctx); err != nil {
|
||||
return nil, fmt.Errorf("authentication required: %w", err)
|
||||
}
|
||||
|
||||
gatewayURL := s.getGatewayURL()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", gatewayURL+"/v1/storage/get/"+cid, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
s.addAuthHeaders(req)
|
||||
|
||||
client := &http.Client{Timeout: 5 * time.Minute} // Large timeout for file downloads
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
resp.Body.Close()
|
||||
return nil, fmt.Errorf("get failed with status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
// Unpin removes a pin from a CID
|
||||
func (s *StorageClientImpl) Unpin(ctx context.Context, cid string) error {
|
||||
if err := s.client.requireAccess(ctx); err != nil {
|
||||
return fmt.Errorf("authentication required: %w", err)
|
||||
}
|
||||
|
||||
gatewayURL := s.getGatewayURL()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "DELETE", gatewayURL+"/v1/storage/unpin/"+cid, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
s.addAuthHeaders(req)
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("unpin failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getGatewayURL returns the gateway URL from config, defaulting to localhost:6001
|
||||
func (s *StorageClientImpl) getGatewayURL() string {
|
||||
cfg := s.client.Config()
|
||||
if cfg != nil && cfg.GatewayURL != "" {
|
||||
return strings.TrimSuffix(cfg.GatewayURL, "/")
|
||||
}
|
||||
return "http://localhost:6001"
|
||||
}
|
||||
|
||||
// addAuthHeaders adds authentication headers to the request
|
||||
func (s *StorageClientImpl) addAuthHeaders(req *http.Request) {
|
||||
cfg := s.client.Config()
|
||||
if cfg == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Prefer JWT if available
|
||||
if cfg.JWT != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+cfg.JWT)
|
||||
return
|
||||
}
|
||||
|
||||
// Fallback to API key
|
||||
if cfg.APIKey != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+cfg.APIKey)
|
||||
req.Header.Set("X-API-Key", cfg.APIKey)
|
||||
}
|
||||
}
|
||||
378
pkg/client/storage_client_test.go
Normal file
378
pkg/client/storage_client_test.go
Normal file
@ -0,0 +1,378 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestStorageClientImpl_Upload(t *testing.T) {
|
||||
t.Run("success", func(t *testing.T) {
|
||||
expectedCID := "QmUpload123"
|
||||
expectedName := "test.txt"
|
||||
expectedSize := int64(100)
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/storage/upload" {
|
||||
t.Errorf("Expected path '/v1/storage/upload', got %s", r.URL.Path)
|
||||
}
|
||||
|
||||
// Verify multipart form
|
||||
if err := r.ParseMultipartForm(32 << 20); err != nil {
|
||||
t.Errorf("Failed to parse multipart form: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
file, header, err := r.FormFile("file")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get file: %v", err)
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
if header.Filename != expectedName {
|
||||
t.Errorf("Expected filename %s, got %s", expectedName, header.Filename)
|
||||
}
|
||||
|
||||
response := StorageUploadResult{
|
||||
Cid: expectedCID,
|
||||
Name: expectedName,
|
||||
Size: expectedSize,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &ClientConfig{
|
||||
GatewayURL: server.URL,
|
||||
AppName: "test-app",
|
||||
APIKey: "ak_test:test-app", // Required for requireAccess check
|
||||
}
|
||||
client := &Client{config: cfg}
|
||||
storage := &StorageClientImpl{client: client}
|
||||
|
||||
reader := strings.NewReader("test content")
|
||||
result, err := storage.Upload(context.Background(), reader, expectedName)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to upload: %v", err)
|
||||
}
|
||||
|
||||
if result.Cid != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %s", expectedCID, result.Cid)
|
||||
}
|
||||
if result.Name != expectedName {
|
||||
t.Errorf("Expected name %s, got %s", expectedName, result.Name)
|
||||
}
|
||||
if result.Size != expectedSize {
|
||||
t.Errorf("Expected size %d, got %d", expectedSize, result.Size)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("server_error", func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte("internal error"))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &ClientConfig{
|
||||
GatewayURL: server.URL,
|
||||
AppName: "test-app",
|
||||
}
|
||||
client := &Client{config: cfg}
|
||||
storage := &StorageClientImpl{client: client}
|
||||
|
||||
reader := strings.NewReader("test")
|
||||
_, err := storage.Upload(context.Background(), reader, "test.txt")
|
||||
if err == nil {
|
||||
t.Error("Expected error for server error")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("missing_credentials", func(t *testing.T) {
|
||||
cfg := &ClientConfig{
|
||||
GatewayURL: "http://localhost:6001",
|
||||
// No AppName, JWT, or APIKey
|
||||
}
|
||||
client := &Client{config: cfg}
|
||||
storage := &StorageClientImpl{client: client}
|
||||
|
||||
reader := strings.NewReader("test")
|
||||
_, err := storage.Upload(context.Background(), reader, "test.txt")
|
||||
if err == nil {
|
||||
t.Error("Expected error for missing credentials")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStorageClientImpl_Pin(t *testing.T) {
|
||||
t.Run("success", func(t *testing.T) {
|
||||
expectedCID := "QmPin123"
|
||||
expectedName := "pinned-file"
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/storage/pin" {
|
||||
t.Errorf("Expected path '/v1/storage/pin', got %s", r.URL.Path)
|
||||
}
|
||||
|
||||
var reqBody map[string]interface{}
|
||||
if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil {
|
||||
t.Errorf("Failed to decode request: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if reqBody["cid"] != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %v", expectedCID, reqBody["cid"])
|
||||
}
|
||||
|
||||
response := StoragePinResult{
|
||||
Cid: expectedCID,
|
||||
Name: expectedName,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &ClientConfig{
|
||||
GatewayURL: server.URL,
|
||||
AppName: "test-app",
|
||||
APIKey: "ak_test:test-app", // Required for requireAccess check
|
||||
}
|
||||
client := &Client{config: cfg}
|
||||
storage := &StorageClientImpl{client: client}
|
||||
|
||||
result, err := storage.Pin(context.Background(), expectedCID, expectedName)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to pin: %v", err)
|
||||
}
|
||||
|
||||
if result.Cid != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %s", expectedCID, result.Cid)
|
||||
}
|
||||
if result.Name != expectedName {
|
||||
t.Errorf("Expected name %s, got %s", expectedName, result.Name)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStorageClientImpl_Status(t *testing.T) {
|
||||
t.Run("success", func(t *testing.T) {
|
||||
expectedCID := "QmStatus123"
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if !strings.HasPrefix(r.URL.Path, "/v1/storage/status/") {
|
||||
t.Errorf("Expected path '/v1/storage/status/', got %s", r.URL.Path)
|
||||
}
|
||||
|
||||
response := StorageStatus{
|
||||
Cid: expectedCID,
|
||||
Name: "test-file",
|
||||
Status: "pinned",
|
||||
ReplicationMin: 3,
|
||||
ReplicationMax: 3,
|
||||
ReplicationFactor: 3,
|
||||
Peers: []string{"peer1", "peer2", "peer3"},
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &ClientConfig{
|
||||
GatewayURL: server.URL,
|
||||
AppName: "test-app",
|
||||
APIKey: "ak_test:test-app", // Required for requireAccess check
|
||||
}
|
||||
client := &Client{config: cfg}
|
||||
storage := &StorageClientImpl{client: client}
|
||||
|
||||
status, err := storage.Status(context.Background(), expectedCID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get status: %v", err)
|
||||
}
|
||||
|
||||
if status.Cid != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %s", expectedCID, status.Cid)
|
||||
}
|
||||
if status.Status != "pinned" {
|
||||
t.Errorf("Expected status 'pinned', got %s", status.Status)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStorageClientImpl_Get(t *testing.T) {
|
||||
t.Run("success", func(t *testing.T) {
|
||||
expectedCID := "QmGet123"
|
||||
expectedContent := "test content"
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if !strings.HasPrefix(r.URL.Path, "/v1/storage/get/") {
|
||||
t.Errorf("Expected path '/v1/storage/get/', got %s", r.URL.Path)
|
||||
}
|
||||
w.Write([]byte(expectedContent))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &ClientConfig{
|
||||
GatewayURL: server.URL,
|
||||
AppName: "test-app",
|
||||
APIKey: "ak_test:test-app", // Required for requireAccess check
|
||||
}
|
||||
client := &Client{config: cfg}
|
||||
storage := &StorageClientImpl{client: client}
|
||||
|
||||
reader, err := storage.Get(context.Background(), expectedCID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get content: %v", err)
|
||||
}
|
||||
defer reader.Close()
|
||||
|
||||
data, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read content: %v", err)
|
||||
}
|
||||
|
||||
if string(data) != expectedContent {
|
||||
t.Errorf("Expected content %s, got %s", expectedContent, string(data))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStorageClientImpl_Unpin(t *testing.T) {
|
||||
t.Run("success", func(t *testing.T) {
|
||||
expectedCID := "QmUnpin123"
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if !strings.HasPrefix(r.URL.Path, "/v1/storage/unpin/") {
|
||||
t.Errorf("Expected path '/v1/storage/unpin/', got %s", r.URL.Path)
|
||||
}
|
||||
if r.Method != "DELETE" {
|
||||
t.Errorf("Expected method DELETE, got %s", r.Method)
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := &ClientConfig{
|
||||
GatewayURL: server.URL,
|
||||
AppName: "test-app",
|
||||
APIKey: "ak_test:test-app", // Required for requireAccess check
|
||||
}
|
||||
client := &Client{config: cfg}
|
||||
storage := &StorageClientImpl{client: client}
|
||||
|
||||
err := storage.Unpin(context.Background(), expectedCID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to unpin: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStorageClientImpl_getGatewayURL(t *testing.T) {
|
||||
storage := &StorageClientImpl{}
|
||||
|
||||
t.Run("from_config", func(t *testing.T) {
|
||||
cfg := &ClientConfig{GatewayURL: "http://custom:6001"}
|
||||
client := &Client{config: cfg}
|
||||
storage.client = client
|
||||
|
||||
url := storage.getGatewayURL()
|
||||
if url != "http://custom:6001" {
|
||||
t.Errorf("Expected 'http://custom:6001', got %s", url)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("default", func(t *testing.T) {
|
||||
cfg := &ClientConfig{}
|
||||
client := &Client{config: cfg}
|
||||
storage.client = client
|
||||
|
||||
url := storage.getGatewayURL()
|
||||
if url != "http://localhost:6001" {
|
||||
t.Errorf("Expected 'http://localhost:6001', got %s", url)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("nil_config", func(t *testing.T) {
|
||||
client := &Client{config: nil}
|
||||
storage.client = client
|
||||
|
||||
url := storage.getGatewayURL()
|
||||
if url != "http://localhost:6001" {
|
||||
t.Errorf("Expected 'http://localhost:6001', got %s", url)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStorageClientImpl_addAuthHeaders(t *testing.T) {
|
||||
t.Run("jwt_preferred", func(t *testing.T) {
|
||||
cfg := &ClientConfig{
|
||||
JWT: "test-jwt-token",
|
||||
APIKey: "test-api-key",
|
||||
}
|
||||
client := &Client{config: cfg}
|
||||
storage := &StorageClientImpl{client: client}
|
||||
|
||||
req := httptest.NewRequest("POST", "/test", nil)
|
||||
storage.addAuthHeaders(req)
|
||||
|
||||
auth := req.Header.Get("Authorization")
|
||||
if auth != "Bearer test-jwt-token" {
|
||||
t.Errorf("Expected JWT in Authorization header, got %s", auth)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("apikey_fallback", func(t *testing.T) {
|
||||
cfg := &ClientConfig{
|
||||
APIKey: "test-api-key",
|
||||
}
|
||||
client := &Client{config: cfg}
|
||||
storage := &StorageClientImpl{client: client}
|
||||
|
||||
req := httptest.NewRequest("POST", "/test", nil)
|
||||
storage.addAuthHeaders(req)
|
||||
|
||||
auth := req.Header.Get("Authorization")
|
||||
if auth != "Bearer test-api-key" {
|
||||
t.Errorf("Expected API key in Authorization header, got %s", auth)
|
||||
}
|
||||
|
||||
apiKey := req.Header.Get("X-API-Key")
|
||||
if apiKey != "test-api-key" {
|
||||
t.Errorf("Expected API key in X-API-Key header, got %s", apiKey)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("no_auth", func(t *testing.T) {
|
||||
cfg := &ClientConfig{}
|
||||
client := &Client{config: cfg}
|
||||
storage := &StorageClientImpl{client: client}
|
||||
|
||||
req := httptest.NewRequest("POST", "/test", nil)
|
||||
storage.addAuthHeaders(req)
|
||||
|
||||
auth := req.Header.Get("Authorization")
|
||||
if auth != "" {
|
||||
t.Errorf("Expected no Authorization header, got %s", auth)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("nil_config", func(t *testing.T) {
|
||||
client := &Client{config: nil}
|
||||
storage := &StorageClientImpl{client: client}
|
||||
|
||||
req := httptest.NewRequest("POST", "/test", nil)
|
||||
storage.addAuthHeaders(req)
|
||||
|
||||
auth := req.Header.Get("Authorization")
|
||||
if auth != "" {
|
||||
t.Errorf("Expected no Authorization header, got %s", auth)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -36,7 +36,7 @@ type DatabaseConfig struct {
|
||||
RQLitePort int `yaml:"rqlite_port"` // RQLite HTTP API port
|
||||
RQLiteRaftPort int `yaml:"rqlite_raft_port"` // RQLite Raft consensus port
|
||||
RQLiteJoinAddress string `yaml:"rqlite_join_address"` // Address to join RQLite cluster
|
||||
|
||||
|
||||
// Dynamic discovery configuration (always enabled)
|
||||
ClusterSyncInterval time.Duration `yaml:"cluster_sync_interval"` // default: 30s
|
||||
PeerInactivityLimit time.Duration `yaml:"peer_inactivity_limit"` // default: 24h
|
||||
@ -45,6 +45,32 @@ type DatabaseConfig struct {
|
||||
// Olric cache configuration
|
||||
OlricHTTPPort int `yaml:"olric_http_port"` // Olric HTTP API port (default: 3320)
|
||||
OlricMemberlistPort int `yaml:"olric_memberlist_port"` // Olric memberlist port (default: 3322)
|
||||
|
||||
// IPFS storage configuration
|
||||
IPFS IPFSConfig `yaml:"ipfs"`
|
||||
}
|
||||
|
||||
// IPFSConfig contains IPFS storage configuration
|
||||
type IPFSConfig struct {
|
||||
// ClusterAPIURL is the IPFS Cluster HTTP API URL (e.g., "http://localhost:9094")
|
||||
// If empty, IPFS storage is disabled for this node
|
||||
ClusterAPIURL string `yaml:"cluster_api_url"`
|
||||
|
||||
// APIURL is the IPFS HTTP API URL for content retrieval (e.g., "http://localhost:5001")
|
||||
// If empty, defaults to "http://localhost:5001"
|
||||
APIURL string `yaml:"api_url"`
|
||||
|
||||
// Timeout for IPFS operations
|
||||
// If zero, defaults to 60 seconds
|
||||
Timeout time.Duration `yaml:"timeout"`
|
||||
|
||||
// ReplicationFactor is the replication factor for pinned content
|
||||
// If zero, defaults to 3
|
||||
ReplicationFactor int `yaml:"replication_factor"`
|
||||
|
||||
// EnableEncryption enables client-side encryption before upload
|
||||
// Defaults to true
|
||||
EnableEncryption bool `yaml:"enable_encryption"`
|
||||
}
|
||||
|
||||
// DiscoveryConfig contains peer discovery configuration
|
||||
@ -115,7 +141,7 @@ func DefaultConfig() *Config {
|
||||
RQLitePort: 5001,
|
||||
RQLiteRaftPort: 7001,
|
||||
RQLiteJoinAddress: "", // Empty for bootstrap node
|
||||
|
||||
|
||||
// Dynamic discovery (always enabled)
|
||||
ClusterSyncInterval: 30 * time.Second,
|
||||
PeerInactivityLimit: 24 * time.Hour,
|
||||
@ -124,6 +150,15 @@ func DefaultConfig() *Config {
|
||||
// Olric cache configuration
|
||||
OlricHTTPPort: 3320,
|
||||
OlricMemberlistPort: 3322,
|
||||
|
||||
// IPFS storage configuration
|
||||
IPFS: IPFSConfig{
|
||||
ClusterAPIURL: "", // Empty = disabled
|
||||
APIURL: "http://localhost:5001",
|
||||
Timeout: 60 * time.Second,
|
||||
ReplicationFactor: 3,
|
||||
EnableEncryption: true,
|
||||
},
|
||||
},
|
||||
Discovery: DiscoveryConfig{
|
||||
BootstrapPeers: []string{},
|
||||
|
||||
@ -6,11 +6,16 @@ import (
|
||||
"crypto/rsa"
|
||||
"database/sql"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
"github.com/DeBrosOfficial/network/pkg/config"
|
||||
"github.com/DeBrosOfficial/network/pkg/ipfs"
|
||||
"github.com/DeBrosOfficial/network/pkg/logging"
|
||||
"github.com/DeBrosOfficial/network/pkg/olric"
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
@ -38,6 +43,13 @@ type Config struct {
|
||||
// Olric cache configuration
|
||||
OlricServers []string // List of Olric server addresses (e.g., ["localhost:3320"]). If empty, defaults to ["localhost:3320"]
|
||||
OlricTimeout time.Duration // Timeout for Olric operations (default: 10s)
|
||||
|
||||
// IPFS Cluster configuration
|
||||
IPFSClusterAPIURL string // IPFS Cluster HTTP API URL (e.g., "http://localhost:9094"). If empty, gateway will discover from node configs
|
||||
IPFSAPIURL string // IPFS HTTP API URL for content retrieval (e.g., "http://localhost:5001"). If empty, gateway will discover from node configs
|
||||
IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s)
|
||||
IPFSReplicationFactor int // Replication factor for pins (default: 3)
|
||||
IPFSEnableEncryption bool // Enable client-side encryption before upload (default: true, discovered from node configs)
|
||||
}
|
||||
|
||||
type Gateway struct {
|
||||
@ -56,6 +68,9 @@ type Gateway struct {
|
||||
// Olric cache client
|
||||
olricClient *olric.Client
|
||||
|
||||
// IPFS storage client
|
||||
ipfsClient ipfs.IPFSClient
|
||||
|
||||
// Local pub/sub bypass for same-gateway subscribers
|
||||
localSubscribers map[string][]*localSubscriber // topic+namespace -> subscribers
|
||||
mu sync.RWMutex
|
||||
@ -178,6 +193,80 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
|
||||
)
|
||||
}
|
||||
|
||||
logger.ComponentInfo(logging.ComponentGeneral, "Initializing IPFS Cluster client...")
|
||||
|
||||
// Discover IPFS endpoints from node configs if not explicitly configured
|
||||
ipfsClusterURL := cfg.IPFSClusterAPIURL
|
||||
ipfsAPIURL := cfg.IPFSAPIURL
|
||||
ipfsTimeout := cfg.IPFSTimeout
|
||||
ipfsReplicationFactor := cfg.IPFSReplicationFactor
|
||||
ipfsEnableEncryption := cfg.IPFSEnableEncryption
|
||||
|
||||
if ipfsClusterURL == "" {
|
||||
logger.ComponentInfo(logging.ComponentGeneral, "IPFS Cluster URL not configured, discovering from node configs...")
|
||||
discovered := discoverIPFSFromNodeConfigs(logger.Logger)
|
||||
if discovered.clusterURL != "" {
|
||||
ipfsClusterURL = discovered.clusterURL
|
||||
ipfsAPIURL = discovered.apiURL
|
||||
if discovered.timeout > 0 {
|
||||
ipfsTimeout = discovered.timeout
|
||||
}
|
||||
if discovered.replicationFactor > 0 {
|
||||
ipfsReplicationFactor = discovered.replicationFactor
|
||||
}
|
||||
ipfsEnableEncryption = discovered.enableEncryption
|
||||
logger.ComponentInfo(logging.ComponentGeneral, "Discovered IPFS endpoints from node configs",
|
||||
zap.String("cluster_url", ipfsClusterURL),
|
||||
zap.String("api_url", ipfsAPIURL),
|
||||
zap.Bool("encryption_enabled", ipfsEnableEncryption))
|
||||
} else {
|
||||
// Fallback to localhost defaults
|
||||
ipfsClusterURL = "http://localhost:9094"
|
||||
ipfsAPIURL = "http://localhost:5001"
|
||||
ipfsEnableEncryption = true // Default to true
|
||||
logger.ComponentInfo(logging.ComponentGeneral, "No IPFS config found in node configs, using localhost defaults")
|
||||
}
|
||||
}
|
||||
|
||||
if ipfsAPIURL == "" {
|
||||
ipfsAPIURL = "http://localhost:5001"
|
||||
}
|
||||
if ipfsTimeout == 0 {
|
||||
ipfsTimeout = 60 * time.Second
|
||||
}
|
||||
if ipfsReplicationFactor == 0 {
|
||||
ipfsReplicationFactor = 3
|
||||
}
|
||||
if !cfg.IPFSEnableEncryption && !ipfsEnableEncryption {
|
||||
// Only disable if explicitly set to false in both places
|
||||
ipfsEnableEncryption = false
|
||||
} else {
|
||||
// Default to true if not explicitly disabled
|
||||
ipfsEnableEncryption = true
|
||||
}
|
||||
|
||||
ipfsCfg := ipfs.Config{
|
||||
ClusterAPIURL: ipfsClusterURL,
|
||||
Timeout: ipfsTimeout,
|
||||
}
|
||||
ipfsClient, ipfsErr := ipfs.NewClient(ipfsCfg, logger.Logger)
|
||||
if ipfsErr != nil {
|
||||
logger.ComponentWarn(logging.ComponentGeneral, "failed to initialize IPFS Cluster client; storage endpoints disabled", zap.Error(ipfsErr))
|
||||
} else {
|
||||
gw.ipfsClient = ipfsClient
|
||||
logger.ComponentInfo(logging.ComponentGeneral, "IPFS Cluster client ready",
|
||||
zap.String("cluster_api_url", ipfsCfg.ClusterAPIURL),
|
||||
zap.String("ipfs_api_url", ipfsAPIURL),
|
||||
zap.Duration("timeout", ipfsCfg.Timeout),
|
||||
zap.Int("replication_factor", ipfsReplicationFactor),
|
||||
zap.Bool("encryption_enabled", ipfsEnableEncryption),
|
||||
)
|
||||
}
|
||||
// Store IPFS settings in gateway for use by handlers
|
||||
gw.cfg.IPFSAPIURL = ipfsAPIURL
|
||||
gw.cfg.IPFSReplicationFactor = ipfsReplicationFactor
|
||||
gw.cfg.IPFSEnableEncryption = ipfsEnableEncryption
|
||||
|
||||
logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed, returning...")
|
||||
return gw, nil
|
||||
}
|
||||
@ -204,6 +293,13 @@ func (g *Gateway) Close() {
|
||||
g.logger.ComponentWarn(logging.ComponentGeneral, "error during Olric client close", zap.Error(err))
|
||||
}
|
||||
}
|
||||
if g.ipfsClient != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := g.ipfsClient.Close(ctx); err != nil {
|
||||
g.logger.ComponentWarn(logging.ComponentGeneral, "error during IPFS client close", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getLocalSubscribers returns all local subscribers for a given topic and namespace
|
||||
@ -307,3 +403,77 @@ func discoverOlricServers(networkClient client.NetworkClient, logger *zap.Logger
|
||||
|
||||
return olricServers
|
||||
}
|
||||
|
||||
// ipfsDiscoveryResult holds discovered IPFS configuration
|
||||
type ipfsDiscoveryResult struct {
|
||||
clusterURL string
|
||||
apiURL string
|
||||
timeout time.Duration
|
||||
replicationFactor int
|
||||
enableEncryption bool
|
||||
}
|
||||
|
||||
// discoverIPFSFromNodeConfigs discovers IPFS configuration from node.yaml files
|
||||
// Checks bootstrap.yaml first, then node.yaml, node2.yaml, etc.
|
||||
func discoverIPFSFromNodeConfigs(logger *zap.Logger) ipfsDiscoveryResult {
|
||||
homeDir, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
logger.Debug("Failed to get home directory for IPFS discovery", zap.Error(err))
|
||||
return ipfsDiscoveryResult{}
|
||||
}
|
||||
|
||||
configDir := filepath.Join(homeDir, ".debros")
|
||||
|
||||
// Try bootstrap.yaml first, then node.yaml, node2.yaml, etc.
|
||||
configFiles := []string{"bootstrap.yaml", "node.yaml", "node2.yaml", "node3.yaml"}
|
||||
|
||||
for _, filename := range configFiles {
|
||||
configPath := filepath.Join(configDir, filename)
|
||||
data, err := os.ReadFile(configPath)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var nodeCfg config.Config
|
||||
if err := config.DecodeStrict(strings.NewReader(string(data)), &nodeCfg); err != nil {
|
||||
logger.Debug("Failed to parse node config for IPFS discovery",
|
||||
zap.String("file", filename), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if IPFS is configured
|
||||
if nodeCfg.Database.IPFS.ClusterAPIURL != "" {
|
||||
result := ipfsDiscoveryResult{
|
||||
clusterURL: nodeCfg.Database.IPFS.ClusterAPIURL,
|
||||
apiURL: nodeCfg.Database.IPFS.APIURL,
|
||||
timeout: nodeCfg.Database.IPFS.Timeout,
|
||||
replicationFactor: nodeCfg.Database.IPFS.ReplicationFactor,
|
||||
enableEncryption: nodeCfg.Database.IPFS.EnableEncryption,
|
||||
}
|
||||
|
||||
if result.apiURL == "" {
|
||||
result.apiURL = "http://localhost:5001"
|
||||
}
|
||||
if result.timeout == 0 {
|
||||
result.timeout = 60 * time.Second
|
||||
}
|
||||
if result.replicationFactor == 0 {
|
||||
result.replicationFactor = 3
|
||||
}
|
||||
// Default encryption to true if not set
|
||||
if !result.enableEncryption {
|
||||
result.enableEncryption = true
|
||||
}
|
||||
|
||||
logger.Info("Discovered IPFS config from node config",
|
||||
zap.String("file", filename),
|
||||
zap.String("cluster_url", result.clusterURL),
|
||||
zap.String("api_url", result.apiURL),
|
||||
zap.Bool("encryption_enabled", result.enableEncryption))
|
||||
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
return ipfsDiscoveryResult{}
|
||||
}
|
||||
|
||||
@ -26,12 +26,3 @@ func TestExtractAPIKey(t *testing.T) {
|
||||
t.Fatalf("got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateNamespaceParam(t *testing.T) {
|
||||
g := &Gateway{}
|
||||
r := httptest.NewRequest(http.MethodGet, "/v1/storage/get?namespace=ns1&key=k", nil)
|
||||
// no context namespace: should be false
|
||||
if g.validateNamespaceParam(r) {
|
||||
t.Fatalf("expected false without context ns")
|
||||
}
|
||||
}
|
||||
|
||||
@ -54,5 +54,12 @@ func (g *Gateway) Routes() http.Handler {
|
||||
mux.HandleFunc("/v1/cache/delete", g.cacheDeleteHandler)
|
||||
mux.HandleFunc("/v1/cache/scan", g.cacheScanHandler)
|
||||
|
||||
// storage endpoints (IPFS)
|
||||
mux.HandleFunc("/v1/storage/upload", g.storageUploadHandler)
|
||||
mux.HandleFunc("/v1/storage/pin", g.storagePinHandler)
|
||||
mux.HandleFunc("/v1/storage/status/", g.storageStatusHandler)
|
||||
mux.HandleFunc("/v1/storage/get/", g.storageGetHandler)
|
||||
mux.HandleFunc("/v1/storage/unpin/", g.storageUnpinHandler)
|
||||
|
||||
return g.withMiddleware(mux)
|
||||
}
|
||||
|
||||
@ -1,13 +1,338 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
"github.com/DeBrosOfficial/network/pkg/logging"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Database HTTP handlers
|
||||
// StorageUploadRequest represents a request to upload content to IPFS
|
||||
type StorageUploadRequest struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
Data string `json:"data,omitempty"` // Base64 encoded data (alternative to multipart)
|
||||
}
|
||||
|
||||
// StorageUploadResponse represents the response from uploading content
|
||||
type StorageUploadResponse struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
}
|
||||
|
||||
// StoragePinRequest represents a request to pin a CID
|
||||
type StoragePinRequest struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name,omitempty"`
|
||||
}
|
||||
|
||||
// StoragePinResponse represents the response from pinning a CID
|
||||
type StoragePinResponse struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// StorageStatusResponse represents the status of a pinned CID
|
||||
type StorageStatusResponse struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name"`
|
||||
Status string `json:"status"`
|
||||
ReplicationMin int `json:"replication_min"`
|
||||
ReplicationMax int `json:"replication_max"`
|
||||
ReplicationFactor int `json:"replication_factor"`
|
||||
Peers []string `json:"peers"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// storageUploadHandler handles POST /v1/storage/upload
|
||||
func (g *Gateway) storageUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.ipfsClient == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "IPFS storage not available")
|
||||
return
|
||||
}
|
||||
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
|
||||
// Get namespace from context
|
||||
namespace := g.getNamespaceFromContext(r.Context())
|
||||
if namespace == "" {
|
||||
writeError(w, http.StatusUnauthorized, "namespace required")
|
||||
return
|
||||
}
|
||||
|
||||
// Get replication factor from config (default: 3)
|
||||
replicationFactor := g.cfg.IPFSReplicationFactor
|
||||
if replicationFactor == 0 {
|
||||
replicationFactor = 3
|
||||
}
|
||||
|
||||
// Check if it's multipart/form-data or JSON
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
var reader io.Reader
|
||||
var name string
|
||||
|
||||
if strings.HasPrefix(contentType, "multipart/form-data") {
|
||||
// Handle multipart upload
|
||||
if err := r.ParseMultipartForm(32 << 20); err != nil { // 32MB max
|
||||
writeError(w, http.StatusBadRequest, fmt.Sprintf("failed to parse multipart form: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
file, header, err := r.FormFile("file")
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, fmt.Sprintf("failed to get file: %v", err))
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
reader = file
|
||||
name = header.Filename
|
||||
} else {
|
||||
// Handle JSON request with base64 data
|
||||
var req StorageUploadRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, fmt.Sprintf("failed to decode request: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
if req.Data == "" {
|
||||
writeError(w, http.StatusBadRequest, "data field required")
|
||||
return
|
||||
}
|
||||
|
||||
// Decode base64 data
|
||||
data, err := base64Decode(req.Data)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, fmt.Sprintf("failed to decode base64 data: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
reader = bytes.NewReader(data)
|
||||
name = req.Name
|
||||
}
|
||||
|
||||
// Add to IPFS
|
||||
ctx := r.Context()
|
||||
addResp, err := g.ipfsClient.Add(ctx, reader, name)
|
||||
if err != nil {
|
||||
g.logger.ComponentError(logging.ComponentGeneral, "failed to add content to IPFS", zap.Error(err))
|
||||
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to add content: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Pin with replication factor
|
||||
_, err = g.ipfsClient.Pin(ctx, addResp.Cid, name, replicationFactor)
|
||||
if err != nil {
|
||||
g.logger.ComponentWarn(logging.ComponentGeneral, "failed to pin content", zap.Error(err), zap.String("cid", addResp.Cid))
|
||||
// Still return success, but log the pin failure
|
||||
}
|
||||
|
||||
response := StorageUploadResponse{
|
||||
Cid: addResp.Cid,
|
||||
Name: addResp.Name,
|
||||
Size: addResp.Size,
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, response)
|
||||
}
|
||||
|
||||
// storagePinHandler handles POST /v1/storage/pin
|
||||
func (g *Gateway) storagePinHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.ipfsClient == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "IPFS storage not available")
|
||||
return
|
||||
}
|
||||
|
||||
if r.Method != http.MethodPost {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
|
||||
var req StoragePinRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, fmt.Sprintf("failed to decode request: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
if req.Cid == "" {
|
||||
writeError(w, http.StatusBadRequest, "cid required")
|
||||
return
|
||||
}
|
||||
|
||||
// Get replication factor from config (default: 3)
|
||||
replicationFactor := g.cfg.IPFSReplicationFactor
|
||||
if replicationFactor == 0 {
|
||||
replicationFactor = 3
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
pinResp, err := g.ipfsClient.Pin(ctx, req.Cid, req.Name, replicationFactor)
|
||||
if err != nil {
|
||||
g.logger.ComponentError(logging.ComponentGeneral, "failed to pin CID", zap.Error(err), zap.String("cid", req.Cid))
|
||||
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to pin: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Use name from request if response doesn't have it
|
||||
name := pinResp.Name
|
||||
if name == "" {
|
||||
name = req.Name
|
||||
}
|
||||
|
||||
response := StoragePinResponse{
|
||||
Cid: pinResp.Cid,
|
||||
Name: name,
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, response)
|
||||
}
|
||||
|
||||
// storageStatusHandler handles GET /v1/storage/status/:cid
|
||||
func (g *Gateway) storageStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.ipfsClient == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "IPFS storage not available")
|
||||
return
|
||||
}
|
||||
|
||||
if r.Method != http.MethodGet {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
|
||||
// Extract CID from path
|
||||
path := strings.TrimPrefix(r.URL.Path, "/v1/storage/status/")
|
||||
if path == "" {
|
||||
writeError(w, http.StatusBadRequest, "cid required")
|
||||
return
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
status, err := g.ipfsClient.PinStatus(ctx, path)
|
||||
if err != nil {
|
||||
g.logger.ComponentError(logging.ComponentGeneral, "failed to get pin status", zap.Error(err), zap.String("cid", path))
|
||||
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to get status: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
response := StorageStatusResponse{
|
||||
Cid: status.Cid,
|
||||
Name: status.Name,
|
||||
Status: status.Status,
|
||||
ReplicationMin: status.ReplicationMin,
|
||||
ReplicationMax: status.ReplicationMax,
|
||||
ReplicationFactor: status.ReplicationFactor,
|
||||
Peers: status.Peers,
|
||||
Error: status.Error,
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, response)
|
||||
}
|
||||
|
||||
// storageGetHandler handles GET /v1/storage/get/:cid
|
||||
func (g *Gateway) storageGetHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.ipfsClient == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "IPFS storage not available")
|
||||
return
|
||||
}
|
||||
|
||||
if r.Method != http.MethodGet {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
|
||||
// Extract CID from path
|
||||
path := strings.TrimPrefix(r.URL.Path, "/v1/storage/get/")
|
||||
if path == "" {
|
||||
writeError(w, http.StatusBadRequest, "cid required")
|
||||
return
|
||||
}
|
||||
|
||||
// Get namespace from context
|
||||
namespace := g.getNamespaceFromContext(r.Context())
|
||||
if namespace == "" {
|
||||
writeError(w, http.StatusUnauthorized, "namespace required")
|
||||
return
|
||||
}
|
||||
|
||||
// Get IPFS API URL from config
|
||||
ipfsAPIURL := g.cfg.IPFSAPIURL
|
||||
if ipfsAPIURL == "" {
|
||||
ipfsAPIURL = "http://localhost:5001"
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
reader, err := g.ipfsClient.Get(ctx, path, ipfsAPIURL)
|
||||
if err != nil {
|
||||
g.logger.ComponentError(logging.ComponentGeneral, "failed to get content from IPFS", zap.Error(err), zap.String("cid", path))
|
||||
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to get content: %v", err))
|
||||
return
|
||||
}
|
||||
defer reader.Close()
|
||||
|
||||
w.Header().Set("Content-Type", "application/octet-stream")
|
||||
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", path))
|
||||
|
||||
if _, err := io.Copy(w, reader); err != nil {
|
||||
g.logger.ComponentError(logging.ComponentGeneral, "failed to write content", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// storageUnpinHandler handles DELETE /v1/storage/unpin/:cid
|
||||
func (g *Gateway) storageUnpinHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.ipfsClient == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "IPFS storage not available")
|
||||
return
|
||||
}
|
||||
|
||||
if r.Method != http.MethodDelete {
|
||||
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
|
||||
return
|
||||
}
|
||||
|
||||
// Extract CID from path
|
||||
path := strings.TrimPrefix(r.URL.Path, "/v1/storage/unpin/")
|
||||
if path == "" {
|
||||
writeError(w, http.StatusBadRequest, "cid required")
|
||||
return
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
if err := g.ipfsClient.Unpin(ctx, path); err != nil {
|
||||
g.logger.ComponentError(logging.ComponentGeneral, "failed to unpin CID", zap.Error(err), zap.String("cid", path))
|
||||
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to unpin: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "cid": path})
|
||||
}
|
||||
|
||||
// base64Decode decodes base64 string to bytes
|
||||
func base64Decode(s string) ([]byte, error) {
|
||||
return base64.StdEncoding.DecodeString(s)
|
||||
}
|
||||
|
||||
// getNamespaceFromContext extracts namespace from request context
|
||||
func (g *Gateway) getNamespaceFromContext(ctx context.Context) string {
|
||||
if v := ctx.Value(ctxKeyNamespaceOverride); v != nil {
|
||||
if s, ok := v.(string); ok && s != "" {
|
||||
return s
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Network HTTP handlers
|
||||
|
||||
func (g *Gateway) networkStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if g.client == nil {
|
||||
@ -84,17 +409,3 @@ func (g *Gateway) networkDisconnectHandler(w http.ResponseWriter, r *http.Reques
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
|
||||
}
|
||||
|
||||
func (g *Gateway) validateNamespaceParam(r *http.Request) bool {
|
||||
qns := r.URL.Query().Get("namespace")
|
||||
if qns == "" {
|
||||
return true
|
||||
}
|
||||
if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil {
|
||||
if s, ok := v.(string); ok && s != "" {
|
||||
return s == qns
|
||||
}
|
||||
}
|
||||
// If no namespace in context, disallow explicit namespace param
|
||||
return false
|
||||
}
|
||||
|
||||
554
pkg/gateway/storage_handlers_test.go
Normal file
554
pkg/gateway/storage_handlers_test.go
Normal file
@ -0,0 +1,554 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/ipfs"
|
||||
"github.com/DeBrosOfficial/network/pkg/logging"
|
||||
)
|
||||
|
||||
// mockIPFSClient is a mock implementation of ipfs.IPFSClient for testing
|
||||
type mockIPFSClient struct {
|
||||
addFunc func(ctx context.Context, reader io.Reader, name string) (*ipfs.AddResponse, error)
|
||||
pinFunc func(ctx context.Context, cid string, name string, replicationFactor int) (*ipfs.PinResponse, error)
|
||||
pinStatusFunc func(ctx context.Context, cid string) (*ipfs.PinStatus, error)
|
||||
getFunc func(ctx context.Context, cid string, ipfsAPIURL string) (io.ReadCloser, error)
|
||||
unpinFunc func(ctx context.Context, cid string) error
|
||||
}
|
||||
|
||||
func (m *mockIPFSClient) Add(ctx context.Context, reader io.Reader, name string) (*ipfs.AddResponse, error) {
|
||||
if m.addFunc != nil {
|
||||
return m.addFunc(ctx, reader, name)
|
||||
}
|
||||
return &ipfs.AddResponse{Cid: "QmTest123", Name: name, Size: 100}, nil
|
||||
}
|
||||
|
||||
func (m *mockIPFSClient) Pin(ctx context.Context, cid string, name string, replicationFactor int) (*ipfs.PinResponse, error) {
|
||||
if m.pinFunc != nil {
|
||||
return m.pinFunc(ctx, cid, name, replicationFactor)
|
||||
}
|
||||
return &ipfs.PinResponse{Cid: cid, Name: name}, nil
|
||||
}
|
||||
|
||||
func (m *mockIPFSClient) PinStatus(ctx context.Context, cid string) (*ipfs.PinStatus, error) {
|
||||
if m.pinStatusFunc != nil {
|
||||
return m.pinStatusFunc(ctx, cid)
|
||||
}
|
||||
return &ipfs.PinStatus{
|
||||
Cid: cid,
|
||||
Name: "test",
|
||||
Status: "pinned",
|
||||
ReplicationMin: 3,
|
||||
ReplicationMax: 3,
|
||||
ReplicationFactor: 3,
|
||||
Peers: []string{"peer1", "peer2", "peer3"},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *mockIPFSClient) Get(ctx context.Context, cid string, ipfsAPIURL string) (io.ReadCloser, error) {
|
||||
if m.getFunc != nil {
|
||||
return m.getFunc(ctx, cid, ipfsAPIURL)
|
||||
}
|
||||
return io.NopCloser(strings.NewReader("test content")), nil
|
||||
}
|
||||
|
||||
func (m *mockIPFSClient) Unpin(ctx context.Context, cid string) error {
|
||||
if m.unpinFunc != nil {
|
||||
return m.unpinFunc(ctx, cid)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockIPFSClient) Health(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockIPFSClient) Close(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newTestGatewayWithIPFS(t *testing.T, ipfsClient ipfs.IPFSClient) *Gateway {
|
||||
logger, err := logging.NewColoredLogger(logging.ComponentGeneral, true)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create logger: %v", err)
|
||||
}
|
||||
|
||||
cfg := &Config{
|
||||
ListenAddr: ":6001",
|
||||
ClientNamespace: "test",
|
||||
IPFSReplicationFactor: 3,
|
||||
IPFSEnableEncryption: true,
|
||||
IPFSAPIURL: "http://localhost:5001",
|
||||
}
|
||||
|
||||
gw := &Gateway{
|
||||
logger: logger,
|
||||
cfg: cfg,
|
||||
}
|
||||
|
||||
if ipfsClient != nil {
|
||||
gw.ipfsClient = ipfsClient
|
||||
}
|
||||
|
||||
return gw
|
||||
}
|
||||
|
||||
func TestStorageUploadHandler_MissingIPFSClient(t *testing.T) {
|
||||
gw := newTestGatewayWithIPFS(t, nil)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", nil)
|
||||
ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns")
|
||||
req = req.WithContext(ctx)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageUploadHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusServiceUnavailable {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusServiceUnavailable, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageUploadHandler_MethodNotAllowed(t *testing.T) {
|
||||
gw := newTestGatewayWithIPFS(t, &mockIPFSClient{})
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/v1/storage/upload", nil)
|
||||
ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns")
|
||||
req = req.WithContext(ctx)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageUploadHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusMethodNotAllowed {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusMethodNotAllowed, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageUploadHandler_MissingNamespace(t *testing.T) {
|
||||
gw := newTestGatewayWithIPFS(t, &mockIPFSClient{})
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageUploadHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusUnauthorized {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusUnauthorized, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageUploadHandler_MultipartUpload(t *testing.T) {
|
||||
expectedCID := "QmTest456"
|
||||
expectedName := "test.txt"
|
||||
expectedSize := int64(200)
|
||||
|
||||
mockClient := &mockIPFSClient{
|
||||
addFunc: func(ctx context.Context, reader io.Reader, name string) (*ipfs.AddResponse, error) {
|
||||
// Read and verify content
|
||||
data, _ := io.ReadAll(reader)
|
||||
if len(data) == 0 {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
return &ipfs.AddResponse{
|
||||
Cid: expectedCID,
|
||||
Name: name,
|
||||
Size: expectedSize,
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
|
||||
gw := newTestGatewayWithIPFS(t, mockClient)
|
||||
|
||||
var buf bytes.Buffer
|
||||
writer := multipart.NewWriter(&buf)
|
||||
part, _ := writer.CreateFormFile("file", expectedName)
|
||||
part.Write([]byte("test file content"))
|
||||
writer.Close()
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", &buf)
|
||||
req.Header.Set("Content-Type", writer.FormDataContentType())
|
||||
ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns")
|
||||
req = req.WithContext(ctx)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageUploadHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code)
|
||||
}
|
||||
|
||||
var resp StorageUploadResponse
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("Failed to decode response: %v", err)
|
||||
}
|
||||
|
||||
if resp.Cid != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid)
|
||||
}
|
||||
if resp.Name != expectedName {
|
||||
t.Errorf("Expected name %s, got %s", expectedName, resp.Name)
|
||||
}
|
||||
if resp.Size != expectedSize {
|
||||
t.Errorf("Expected size %d, got %d", expectedSize, resp.Size)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageUploadHandler_JSONUpload(t *testing.T) {
|
||||
expectedCID := "QmTest789"
|
||||
expectedName := "test.json"
|
||||
testData := []byte("test json data")
|
||||
base64Data := base64.StdEncoding.EncodeToString(testData)
|
||||
|
||||
mockClient := &mockIPFSClient{
|
||||
addFunc: func(ctx context.Context, reader io.Reader, name string) (*ipfs.AddResponse, error) {
|
||||
data, _ := io.ReadAll(reader)
|
||||
if string(data) != string(testData) {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
return &ipfs.AddResponse{
|
||||
Cid: expectedCID,
|
||||
Name: name,
|
||||
Size: int64(len(testData)),
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
|
||||
gw := newTestGatewayWithIPFS(t, mockClient)
|
||||
|
||||
reqBody := StorageUploadRequest{
|
||||
Name: expectedName,
|
||||
Data: base64Data,
|
||||
}
|
||||
bodyBytes, _ := json.Marshal(reqBody)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", bytes.NewReader(bodyBytes))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns")
|
||||
req = req.WithContext(ctx)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageUploadHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code)
|
||||
}
|
||||
|
||||
var resp StorageUploadResponse
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("Failed to decode response: %v", err)
|
||||
}
|
||||
|
||||
if resp.Cid != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageUploadHandler_InvalidBase64(t *testing.T) {
|
||||
gw := newTestGatewayWithIPFS(t, &mockIPFSClient{})
|
||||
|
||||
reqBody := StorageUploadRequest{
|
||||
Name: "test.txt",
|
||||
Data: "invalid base64!!!",
|
||||
}
|
||||
bodyBytes, _ := json.Marshal(reqBody)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", bytes.NewReader(bodyBytes))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns")
|
||||
req = req.WithContext(ctx)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageUploadHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusBadRequest, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageUploadHandler_IPFSError(t *testing.T) {
|
||||
mockClient := &mockIPFSClient{
|
||||
addFunc: func(ctx context.Context, reader io.Reader, name string) (*ipfs.AddResponse, error) {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
},
|
||||
}
|
||||
|
||||
gw := newTestGatewayWithIPFS(t, mockClient)
|
||||
|
||||
var buf bytes.Buffer
|
||||
writer := multipart.NewWriter(&buf)
|
||||
part, _ := writer.CreateFormFile("file", "test.txt")
|
||||
part.Write([]byte("test"))
|
||||
writer.Close()
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", &buf)
|
||||
req.Header.Set("Content-Type", writer.FormDataContentType())
|
||||
ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns")
|
||||
req = req.WithContext(ctx)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageUploadHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusInternalServerError, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoragePinHandler_Success(t *testing.T) {
|
||||
expectedCID := "QmPin123"
|
||||
expectedName := "pinned-file"
|
||||
|
||||
mockClient := &mockIPFSClient{
|
||||
pinFunc: func(ctx context.Context, cid string, name string, replicationFactor int) (*ipfs.PinResponse, error) {
|
||||
if cid != expectedCID {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
if replicationFactor != 3 {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
return &ipfs.PinResponse{Cid: cid, Name: name}, nil
|
||||
},
|
||||
}
|
||||
|
||||
gw := newTestGatewayWithIPFS(t, mockClient)
|
||||
|
||||
reqBody := StoragePinRequest{
|
||||
Cid: expectedCID,
|
||||
Name: expectedName,
|
||||
}
|
||||
bodyBytes, _ := json.Marshal(reqBody)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/storage/pin", bytes.NewReader(bodyBytes))
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storagePinHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code)
|
||||
}
|
||||
|
||||
var resp StoragePinResponse
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("Failed to decode response: %v", err)
|
||||
}
|
||||
|
||||
if resp.Cid != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid)
|
||||
}
|
||||
if resp.Name != expectedName {
|
||||
t.Errorf("Expected name %s, got %s", expectedName, resp.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoragePinHandler_MissingCID(t *testing.T) {
|
||||
gw := newTestGatewayWithIPFS(t, &mockIPFSClient{})
|
||||
|
||||
reqBody := StoragePinRequest{}
|
||||
bodyBytes, _ := json.Marshal(reqBody)
|
||||
|
||||
req := httptest.NewRequest(http.MethodPost, "/v1/storage/pin", bytes.NewReader(bodyBytes))
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storagePinHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusBadRequest, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageStatusHandler_Success(t *testing.T) {
|
||||
expectedCID := "QmStatus123"
|
||||
mockClient := &mockIPFSClient{
|
||||
pinStatusFunc: func(ctx context.Context, cid string) (*ipfs.PinStatus, error) {
|
||||
return &ipfs.PinStatus{
|
||||
Cid: cid,
|
||||
Name: "test-file",
|
||||
Status: "pinned",
|
||||
ReplicationMin: 3,
|
||||
ReplicationMax: 3,
|
||||
ReplicationFactor: 3,
|
||||
Peers: []string{"peer1", "peer2", "peer3"},
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
|
||||
gw := newTestGatewayWithIPFS(t, mockClient)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/v1/storage/status/"+expectedCID, nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageStatusHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code)
|
||||
}
|
||||
|
||||
var resp StorageStatusResponse
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("Failed to decode response: %v", err)
|
||||
}
|
||||
|
||||
if resp.Cid != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid)
|
||||
}
|
||||
if resp.Status != "pinned" {
|
||||
t.Errorf("Expected status 'pinned', got %s", resp.Status)
|
||||
}
|
||||
if resp.ReplicationFactor != 3 {
|
||||
t.Errorf("Expected replication factor 3, got %d", resp.ReplicationFactor)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageStatusHandler_MissingCID(t *testing.T) {
|
||||
gw := newTestGatewayWithIPFS(t, &mockIPFSClient{})
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/v1/storage/status/", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageStatusHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusBadRequest, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageGetHandler_Success(t *testing.T) {
|
||||
expectedCID := "QmGet123"
|
||||
expectedContent := "test content from IPFS"
|
||||
|
||||
mockClient := &mockIPFSClient{
|
||||
getFunc: func(ctx context.Context, cid string, ipfsAPIURL string) (io.ReadCloser, error) {
|
||||
if cid != expectedCID {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
return io.NopCloser(strings.NewReader(expectedContent)), nil
|
||||
},
|
||||
}
|
||||
|
||||
gw := newTestGatewayWithIPFS(t, mockClient)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/v1/storage/get/"+expectedCID, nil)
|
||||
ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns")
|
||||
req = req.WithContext(ctx)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageGetHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code)
|
||||
}
|
||||
|
||||
if w.Body.String() != expectedContent {
|
||||
t.Errorf("Expected content %s, got %s", expectedContent, w.Body.String())
|
||||
}
|
||||
|
||||
if w.Header().Get("Content-Type") != "application/octet-stream" {
|
||||
t.Errorf("Expected Content-Type 'application/octet-stream', got %s", w.Header().Get("Content-Type"))
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageGetHandler_MissingNamespace(t *testing.T) {
|
||||
gw := newTestGatewayWithIPFS(t, &mockIPFSClient{})
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/v1/storage/get/QmTest123", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageGetHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusUnauthorized {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusUnauthorized, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageUnpinHandler_Success(t *testing.T) {
|
||||
expectedCID := "QmUnpin123"
|
||||
|
||||
mockClient := &mockIPFSClient{
|
||||
unpinFunc: func(ctx context.Context, cid string) error {
|
||||
if cid != expectedCID {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
gw := newTestGatewayWithIPFS(t, mockClient)
|
||||
|
||||
req := httptest.NewRequest(http.MethodDelete, "/v1/storage/unpin/"+expectedCID, nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageUnpinHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code)
|
||||
}
|
||||
|
||||
var resp map[string]any
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("Failed to decode response: %v", err)
|
||||
}
|
||||
|
||||
if resp["cid"] != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %v", expectedCID, resp["cid"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageUnpinHandler_MissingCID(t *testing.T) {
|
||||
gw := newTestGatewayWithIPFS(t, &mockIPFSClient{})
|
||||
|
||||
req := httptest.NewRequest(http.MethodDelete, "/v1/storage/unpin/", nil)
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
gw.storageUnpinHandler(w, req)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("Expected status %d, got %d", http.StatusBadRequest, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// Test helper functions
|
||||
|
||||
func TestBase64Decode(t *testing.T) {
|
||||
testData := []byte("test data")
|
||||
encoded := base64.StdEncoding.EncodeToString(testData)
|
||||
|
||||
decoded, err := base64Decode(encoded)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to decode: %v", err)
|
||||
}
|
||||
|
||||
if string(decoded) != string(testData) {
|
||||
t.Errorf("Expected %s, got %s", string(testData), string(decoded))
|
||||
}
|
||||
|
||||
// Test invalid base64
|
||||
_, err = base64Decode("invalid!!!")
|
||||
if err == nil {
|
||||
t.Error("Expected error for invalid base64")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetNamespaceFromContext(t *testing.T) {
|
||||
gw := newTestGatewayWithIPFS(t, nil)
|
||||
|
||||
// Test with namespace in context
|
||||
ctx := context.WithValue(context.Background(), ctxKeyNamespaceOverride, "test-ns")
|
||||
ns := gw.getNamespaceFromContext(ctx)
|
||||
if ns != "test-ns" {
|
||||
t.Errorf("Expected 'test-ns', got %s", ns)
|
||||
}
|
||||
|
||||
// Test without namespace
|
||||
ctx2 := context.Background()
|
||||
ns2 := gw.getNamespaceFromContext(ctx2)
|
||||
if ns2 != "" {
|
||||
t.Errorf("Expected empty namespace, got %s", ns2)
|
||||
}
|
||||
}
|
||||
345
pkg/ipfs/client.go
Normal file
345
pkg/ipfs/client.go
Normal file
@ -0,0 +1,345 @@
|
||||
package ipfs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// IPFSClient defines the interface for IPFS operations
|
||||
type IPFSClient interface {
|
||||
Add(ctx context.Context, reader io.Reader, name string) (*AddResponse, error)
|
||||
Pin(ctx context.Context, cid string, name string, replicationFactor int) (*PinResponse, error)
|
||||
PinStatus(ctx context.Context, cid string) (*PinStatus, error)
|
||||
Get(ctx context.Context, cid string, ipfsAPIURL string) (io.ReadCloser, error)
|
||||
Unpin(ctx context.Context, cid string) error
|
||||
Health(ctx context.Context) error
|
||||
Close(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Client wraps an IPFS Cluster HTTP API client for storage operations
|
||||
type Client struct {
|
||||
apiURL string
|
||||
httpClient *http.Client
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// Config holds configuration for the IPFS client
|
||||
type Config struct {
|
||||
// ClusterAPIURL is the base URL for IPFS Cluster HTTP API (e.g., "http://localhost:9094")
|
||||
// If empty, defaults to "http://localhost:9094"
|
||||
ClusterAPIURL string
|
||||
|
||||
// Timeout is the timeout for client operations
|
||||
// If zero, defaults to 60 seconds
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
// PinStatus represents the status of a pinned CID
|
||||
type PinStatus struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name"`
|
||||
Status string `json:"status"` // "pinned", "pinning", "queued", "unpinned", "error"
|
||||
ReplicationMin int `json:"replication_min"`
|
||||
ReplicationMax int `json:"replication_max"`
|
||||
ReplicationFactor int `json:"replication_factor"`
|
||||
Peers []string `json:"peers"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// AddResponse represents the response from adding content to IPFS
|
||||
type AddResponse struct {
|
||||
Name string `json:"name"`
|
||||
Cid string `json:"cid"`
|
||||
Size int64 `json:"size"`
|
||||
}
|
||||
|
||||
// PinResponse represents the response from pinning a CID
|
||||
type PinResponse struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// NewClient creates a new IPFS Cluster client wrapper
|
||||
func NewClient(cfg Config, logger *zap.Logger) (*Client, error) {
|
||||
apiURL := cfg.ClusterAPIURL
|
||||
if apiURL == "" {
|
||||
apiURL = "http://localhost:9094"
|
||||
}
|
||||
|
||||
timeout := cfg.Timeout
|
||||
if timeout == 0 {
|
||||
timeout = 60 * time.Second
|
||||
}
|
||||
|
||||
httpClient := &http.Client{
|
||||
Timeout: timeout,
|
||||
}
|
||||
|
||||
return &Client{
|
||||
apiURL: apiURL,
|
||||
httpClient: httpClient,
|
||||
logger: logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Health checks if the IPFS Cluster API is healthy
|
||||
func (c *Client) Health(ctx context.Context) error {
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", c.apiURL+"/id", nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create health check request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("health check request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("health check failed with status: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add adds content to IPFS and returns the CID
|
||||
func (c *Client) Add(ctx context.Context, reader io.Reader, name string) (*AddResponse, error) {
|
||||
// Create multipart form request for IPFS Cluster API
|
||||
var buf bytes.Buffer
|
||||
writer := multipart.NewWriter(&buf)
|
||||
|
||||
// Create form file field
|
||||
part, err := writer.CreateFormFile("file", name)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create form file: %w", err)
|
||||
}
|
||||
|
||||
if _, err := io.Copy(part, reader); err != nil {
|
||||
return nil, fmt.Errorf("failed to copy data: %w", err)
|
||||
}
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
return nil, fmt.Errorf("failed to close writer: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", c.apiURL+"/add", &buf)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create add request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", writer.FormDataContentType())
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("add request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("add failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result AddResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode add response: %w", err)
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// Pin pins a CID with specified replication factor
|
||||
func (c *Client) Pin(ctx context.Context, cid string, name string, replicationFactor int) (*PinResponse, error) {
|
||||
reqBody := map[string]interface{}{
|
||||
"cid": cid,
|
||||
"replication_factor_min": replicationFactor,
|
||||
"replication_factor_max": replicationFactor,
|
||||
}
|
||||
if name != "" {
|
||||
reqBody["name"] = name
|
||||
}
|
||||
|
||||
jsonBody, err := json.Marshal(reqBody)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal pin request: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", c.apiURL+"/pins/"+cid, bytes.NewReader(jsonBody))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create pin request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pin request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("pin failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result PinResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode pin response: %w", err)
|
||||
}
|
||||
|
||||
// If IPFS Cluster doesn't return the name in the response, use the one from the request
|
||||
if result.Name == "" && name != "" {
|
||||
result.Name = name
|
||||
}
|
||||
// Ensure CID is set
|
||||
if result.Cid == "" {
|
||||
result.Cid = cid
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// PinStatus retrieves the status of a pinned CID
|
||||
func (c *Client) PinStatus(ctx context.Context, cid string) (*PinStatus, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", c.apiURL+"/pins/"+cid, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create pin status request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pin status request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return nil, fmt.Errorf("pin not found: %s", cid)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("pin status failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
// IPFS Cluster returns GlobalPinInfo, we need to map it to our PinStatus
|
||||
var gpi struct {
|
||||
Cid string `json:"cid"`
|
||||
Name string `json:"name"`
|
||||
PeerMap map[string]struct {
|
||||
Status interface{} `json:"status"` // TrackerStatus can be string or int
|
||||
Error string `json:"error,omitempty"`
|
||||
} `json:"peer_map"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&gpi); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode pin status response: %w", err)
|
||||
}
|
||||
|
||||
// Extract status from peer map (use first peer's status, or aggregate)
|
||||
status := "unknown"
|
||||
peers := make([]string, 0, len(gpi.PeerMap))
|
||||
var errorMsg string
|
||||
for peerID, pinInfo := range gpi.PeerMap {
|
||||
peers = append(peers, peerID)
|
||||
if pinInfo.Status != nil {
|
||||
// Convert status to string
|
||||
if s, ok := pinInfo.Status.(string); ok {
|
||||
if status == "unknown" || s != "" {
|
||||
status = s
|
||||
}
|
||||
} else if status == "unknown" {
|
||||
// If status is not a string, try to convert it
|
||||
status = fmt.Sprintf("%v", pinInfo.Status)
|
||||
}
|
||||
}
|
||||
if pinInfo.Error != "" {
|
||||
errorMsg = pinInfo.Error
|
||||
}
|
||||
}
|
||||
|
||||
// Normalize status string (common IPFS Cluster statuses)
|
||||
if status == "" || status == "unknown" {
|
||||
status = "pinned" // Default to pinned if we have peers
|
||||
if len(peers) == 0 {
|
||||
status = "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
result := &PinStatus{
|
||||
Cid: gpi.Cid,
|
||||
Name: gpi.Name,
|
||||
Status: status,
|
||||
ReplicationMin: 0, // Not available in GlobalPinInfo
|
||||
ReplicationMax: 0, // Not available in GlobalPinInfo
|
||||
ReplicationFactor: len(peers),
|
||||
Peers: peers,
|
||||
Error: errorMsg,
|
||||
}
|
||||
|
||||
// Ensure CID is set
|
||||
if result.Cid == "" {
|
||||
result.Cid = cid
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Unpin removes a pin from a CID
|
||||
func (c *Client) Unpin(ctx context.Context, cid string) error {
|
||||
req, err := http.NewRequestWithContext(ctx, "DELETE", c.apiURL+"/pins/"+cid, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create unpin request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unpin request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("unpin failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get retrieves content from IPFS by CID
|
||||
// Note: This uses the IPFS HTTP API (typically on port 5001), not the Cluster API
|
||||
func (c *Client) Get(ctx context.Context, cid string, ipfsAPIURL string) (io.ReadCloser, error) {
|
||||
if ipfsAPIURL == "" {
|
||||
ipfsAPIURL = "http://localhost:5001"
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("%s/api/v0/cat?arg=%s", ipfsAPIURL, cid)
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create get request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get request failed: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
resp.Body.Close()
|
||||
return nil, fmt.Errorf("get failed with status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
// Close closes the IPFS client connection
|
||||
func (c *Client) Close(ctx context.Context) error {
|
||||
// HTTP client doesn't need explicit closing
|
||||
return nil
|
||||
}
|
||||
483
pkg/ipfs/client_test.go
Normal file
483
pkg/ipfs/client_test.go
Normal file
@ -0,0 +1,483 @@
|
||||
package ipfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestNewClient(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
|
||||
t.Run("default_config", func(t *testing.T) {
|
||||
cfg := Config{}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
if client.apiURL != "http://localhost:9094" {
|
||||
t.Errorf("Expected default API URL 'http://localhost:9094', got %s", client.apiURL)
|
||||
}
|
||||
|
||||
if client.httpClient.Timeout != 60*time.Second {
|
||||
t.Errorf("Expected default timeout 60s, got %v", client.httpClient.Timeout)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("custom_config", func(t *testing.T) {
|
||||
cfg := Config{
|
||||
ClusterAPIURL: "http://custom:9094",
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
if client.apiURL != "http://custom:9094" {
|
||||
t.Errorf("Expected API URL 'http://custom:9094', got %s", client.apiURL)
|
||||
}
|
||||
|
||||
if client.httpClient.Timeout != 30*time.Second {
|
||||
t.Errorf("Expected timeout 30s, got %v", client.httpClient.Timeout)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestClient_Add(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
expectedCID := "QmTest123"
|
||||
expectedName := "test.txt"
|
||||
expectedSize := int64(100)
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/add" {
|
||||
t.Errorf("Expected path '/add', got %s", r.URL.Path)
|
||||
}
|
||||
if r.Method != "POST" {
|
||||
t.Errorf("Expected method POST, got %s", r.Method)
|
||||
}
|
||||
|
||||
// Verify multipart form
|
||||
if err := r.ParseMultipartForm(32 << 20); err != nil {
|
||||
t.Errorf("Failed to parse multipart form: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
file, header, err := r.FormFile("file")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get file: %v", err)
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
if header.Filename != expectedName {
|
||||
t.Errorf("Expected filename %s, got %s", expectedName, header.Filename)
|
||||
}
|
||||
|
||||
// Read file content
|
||||
_, _ = io.ReadAll(file)
|
||||
|
||||
response := AddResponse{
|
||||
Cid: expectedCID,
|
||||
Name: expectedName,
|
||||
Size: expectedSize,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: server.URL}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
reader := strings.NewReader("test content")
|
||||
resp, err := client.Add(context.Background(), reader, expectedName)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add content: %v", err)
|
||||
}
|
||||
|
||||
if resp.Cid != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid)
|
||||
}
|
||||
if resp.Name != expectedName {
|
||||
t.Errorf("Expected name %s, got %s", expectedName, resp.Name)
|
||||
}
|
||||
if resp.Size != expectedSize {
|
||||
t.Errorf("Expected size %d, got %d", expectedSize, resp.Size)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("server_error", func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte("internal error"))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: server.URL}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
reader := strings.NewReader("test")
|
||||
_, err = client.Add(context.Background(), reader, "test.txt")
|
||||
if err == nil {
|
||||
t.Error("Expected error for server error")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestClient_Pin(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
expectedCID := "QmPin123"
|
||||
expectedName := "pinned-file"
|
||||
expectedReplicationFactor := 3
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if !strings.HasPrefix(r.URL.Path, "/pins/") {
|
||||
t.Errorf("Expected path '/pins/', got %s", r.URL.Path)
|
||||
}
|
||||
if r.Method != "POST" {
|
||||
t.Errorf("Expected method POST, got %s", r.Method)
|
||||
}
|
||||
|
||||
var reqBody map[string]interface{}
|
||||
if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil {
|
||||
t.Errorf("Failed to decode request: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if reqBody["cid"] != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %v", expectedCID, reqBody["cid"])
|
||||
}
|
||||
|
||||
response := PinResponse{
|
||||
Cid: expectedCID,
|
||||
Name: expectedName,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: server.URL}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
resp, err := client.Pin(context.Background(), expectedCID, expectedName, expectedReplicationFactor)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to pin: %v", err)
|
||||
}
|
||||
|
||||
if resp.Cid != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid)
|
||||
}
|
||||
if resp.Name != expectedName {
|
||||
t.Errorf("Expected name %s, got %s", expectedName, resp.Name)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("accepted_status", func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
response := PinResponse{Cid: "QmTest", Name: "test"}
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: server.URL}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
_, err = client.Pin(context.Background(), "QmTest", "test", 3)
|
||||
if err != nil {
|
||||
t.Errorf("Expected success for Accepted status, got error: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestClient_PinStatus(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
expectedCID := "QmStatus123"
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if !strings.HasPrefix(r.URL.Path, "/pins/") {
|
||||
t.Errorf("Expected path '/pins/', got %s", r.URL.Path)
|
||||
}
|
||||
if r.Method != "GET" {
|
||||
t.Errorf("Expected method GET, got %s", r.Method)
|
||||
}
|
||||
|
||||
response := PinStatus{
|
||||
Cid: expectedCID,
|
||||
Name: "test-file",
|
||||
Status: "pinned",
|
||||
ReplicationMin: 3,
|
||||
ReplicationMax: 3,
|
||||
ReplicationFactor: 3,
|
||||
Peers: []string{"peer1", "peer2", "peer3"},
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(response)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: server.URL}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
status, err := client.PinStatus(context.Background(), expectedCID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get pin status: %v", err)
|
||||
}
|
||||
|
||||
if status.Cid != expectedCID {
|
||||
t.Errorf("Expected CID %s, got %s", expectedCID, status.Cid)
|
||||
}
|
||||
if status.Status != "pinned" {
|
||||
t.Errorf("Expected status 'pinned', got %s", status.Status)
|
||||
}
|
||||
if len(status.Peers) != 3 {
|
||||
t.Errorf("Expected 3 peers, got %d", len(status.Peers))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("not_found", func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: server.URL}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
_, err = client.PinStatus(context.Background(), "QmNotFound")
|
||||
if err == nil {
|
||||
t.Error("Expected error for not found")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestClient_Unpin(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
expectedCID := "QmUnpin123"
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if !strings.HasPrefix(r.URL.Path, "/pins/") {
|
||||
t.Errorf("Expected path '/pins/', got %s", r.URL.Path)
|
||||
}
|
||||
if r.Method != "DELETE" {
|
||||
t.Errorf("Expected method DELETE, got %s", r.Method)
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: server.URL}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
err = client.Unpin(context.Background(), expectedCID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to unpin: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("accepted_status", func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: server.URL}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
err = client.Unpin(context.Background(), "QmTest")
|
||||
if err != nil {
|
||||
t.Errorf("Expected success for Accepted status, got error: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestClient_Get(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
expectedCID := "QmGet123"
|
||||
expectedContent := "test content from IPFS"
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if !strings.Contains(r.URL.Path, "/api/v0/cat") {
|
||||
t.Errorf("Expected path containing '/api/v0/cat', got %s", r.URL.Path)
|
||||
}
|
||||
if r.Method != "POST" {
|
||||
t.Errorf("Expected method POST, got %s", r.Method)
|
||||
}
|
||||
|
||||
// Verify CID parameter
|
||||
if !strings.Contains(r.URL.RawQuery, expectedCID) {
|
||||
t.Errorf("Expected CID %s in query, got %s", expectedCID, r.URL.RawQuery)
|
||||
}
|
||||
|
||||
w.Write([]byte(expectedContent))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: "http://localhost:9094"}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
reader, err := client.Get(context.Background(), expectedCID, server.URL)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get content: %v", err)
|
||||
}
|
||||
defer reader.Close()
|
||||
|
||||
data, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read content: %v", err)
|
||||
}
|
||||
|
||||
if string(data) != expectedContent {
|
||||
t.Errorf("Expected content %s, got %s", expectedContent, string(data))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("not_found", func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: "http://localhost:9094"}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
_, err = client.Get(context.Background(), "QmNotFound", server.URL)
|
||||
if err == nil {
|
||||
t.Error("Expected error for not found")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("default_ipfs_api_url", func(t *testing.T) {
|
||||
expectedCID := "QmDefault"
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write([]byte("content"))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: "http://localhost:9094"}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
// Test with empty IPFS API URL (should use default)
|
||||
// Note: This will fail because we're using a test server, but it tests the logic
|
||||
_, err = client.Get(context.Background(), expectedCID, "")
|
||||
// We expect an error here because default localhost:5001 won't exist
|
||||
if err == nil {
|
||||
t.Error("Expected error when using default localhost:5001")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestClient_Health(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/id" {
|
||||
t.Errorf("Expected path '/id', got %s", r.URL.Path)
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`{"id": "test"}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: server.URL}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
err = client.Health(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed health check: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("unhealthy", func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
cfg := Config{ClusterAPIURL: server.URL}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
err = client.Health(context.Background())
|
||||
if err == nil {
|
||||
t.Error("Expected error for unhealthy status")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestClient_Close(t *testing.T) {
|
||||
logger := zap.NewNop()
|
||||
|
||||
cfg := Config{ClusterAPIURL: "http://localhost:9094"}
|
||||
client, err := NewClient(cfg, logger)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client: %v", err)
|
||||
}
|
||||
|
||||
// Close should not error
|
||||
err = client.Close(context.Background())
|
||||
if err != nil {
|
||||
t.Errorf("Close should not error, got: %v", err)
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user