diff --git a/CHANGELOG.md b/CHANGELOG.md index ac7a4f1..20b5347 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,42 @@ The format is based on [Keep a Changelog][keepachangelog] and adheres to [Semant ### Deprecated ### Fixed +## [0.56.0] - 2025-11-05 + +### Added +- Added IPFS storage endpoints to the Gateway for content upload, pinning, status, retrieval, and unpinning. +- Introduced `StorageClient` interface and implementation in the Go client library for interacting with the new IPFS storage endpoints. +- Added support for automatically starting IPFS daemon, IPFS Cluster daemon, and Olric cache server in the `dev` environment setup. + +### Changed +- Updated Gateway configuration to include settings for IPFS Cluster API URL, IPFS API URL, timeout, and replication factor. +- Refactored Olric configuration generation to use a simpler, local-environment focused setup. +- Improved IPFS content retrieval (`Get`) to fall back to the IPFS Gateway (port 8080) if the IPFS API (port 5001) returns a 404. + +### Deprecated + +### Removed + +### Fixed +\n +## [0.55.0] - 2025-11-05 + +### Added +- Added IPFS storage endpoints to the Gateway for content upload, pinning, status, retrieval, and unpinning. +- Introduced `StorageClient` interface and implementation in the Go client library for interacting with the new IPFS storage endpoints. +- Added support for automatically starting IPFS daemon, IPFS Cluster daemon, and Olric cache server in the `dev` environment setup. + +### Changed +- Updated Gateway configuration to include settings for IPFS Cluster API URL, IPFS API URL, timeout, and replication factor. +- Refactored Olric configuration generation to use a simpler, local-environment focused setup. +- Improved `dev` environment logging to include logs from IPFS and Olric services when running. + +### Deprecated + +### Removed + +### Fixed +\n ## [0.54.0] - 2025-11-03 ### Added diff --git a/Makefile b/Makefile index 03a1f0b..bc9bbb2 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports install-hooks -VERSION := 0.54.0 +VERSION := 0.56.0 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' @@ -119,6 +119,58 @@ dev: build @echo "Starting node3..." @nohup ./bin/node --config node3.yaml > $$HOME/.debros/logs/node3.log 2>&1 & echo $$! > .dev/pids/node3.pid @sleep 1 + @echo "Starting IPFS daemon..." + @if command -v ipfs >/dev/null 2>&1; then \ + if [ ! -d $$HOME/.debros/ipfs ]; then \ + echo " Initializing IPFS repository..."; \ + IPFS_PATH=$$HOME/.debros/ipfs ipfs init 2>&1 | grep -v "generating" | grep -v "peer identity" || true; \ + fi; \ + if ! pgrep -f "ipfs daemon" >/dev/null 2>&1; then \ + IPFS_PATH=$$HOME/.debros/ipfs nohup ipfs daemon > $$HOME/.debros/logs/ipfs.log 2>&1 & echo $$! > .dev/pids/ipfs.pid; \ + echo " IPFS daemon started (PID: $$(cat .dev/pids/ipfs.pid))"; \ + sleep 5; \ + else \ + echo " ✓ IPFS daemon already running"; \ + fi; \ + else \ + echo " ⚠️ ipfs command not found - skipping IPFS (storage endpoints will be disabled)"; \ + echo " Install with: https://docs.ipfs.tech/install/"; \ + fi + @echo "Starting IPFS Cluster daemon..." + @if command -v ipfs-cluster-service >/dev/null 2>&1; then \ + if [ ! -d $$HOME/.debros/ipfs-cluster ]; then \ + echo " Initializing IPFS Cluster..."; \ + CLUSTER_PATH=$$HOME/.debros/ipfs-cluster ipfs-cluster-service init --force 2>&1 | grep -v "peer identity" || true; \ + fi; \ + if ! pgrep -f "ipfs-cluster-service" >/dev/null 2>&1; then \ + CLUSTER_PATH=$$HOME/.debros/ipfs-cluster nohup ipfs-cluster-service daemon > $$HOME/.debros/logs/ipfs-cluster.log 2>&1 & echo $$! > .dev/pids/ipfs-cluster.pid; \ + echo " IPFS Cluster daemon started (PID: $$(cat .dev/pids/ipfs-cluster.pid))"; \ + sleep 5; \ + else \ + echo " ✓ IPFS Cluster daemon already running"; \ + fi; \ + else \ + echo " ⚠️ ipfs-cluster-service command not found - skipping IPFS Cluster (storage endpoints will be disabled)"; \ + echo " Install with: https://ipfscluster.io/documentation/guides/install/"; \ + fi + @echo "Starting Olric cache server..." + @if command -v olric-server >/dev/null 2>&1; then \ + if [ ! -f $$HOME/.debros/olric-config.yaml ]; then \ + echo " Creating Olric config..."; \ + mkdir -p $$HOME/.debros; \ + fi; \ + if ! pgrep -f "olric-server" >/dev/null 2>&1; then \ + OLRIC_SERVER_CONFIG=$$HOME/.debros/olric-config.yaml nohup olric-server > $$HOME/.debros/logs/olric.log 2>&1 & echo $$! > .dev/pids/olric.pid; \ + echo " Olric cache server started (PID: $$(cat .dev/pids/olric.pid))"; \ + sleep 3; \ + else \ + echo " ✓ Olric cache server already running"; \ + fi; \ + else \ + echo " ⚠️ olric-server command not found - skipping Olric (cache endpoints will be disabled)"; \ + echo " Install with: go install github.com/olric-data/olric/cmd/olric-server@v0.7.0"; \ + fi + @sleep 1 @echo "Starting gateway..." @nohup ./bin/gateway --config gateway.yaml > $$HOME/.debros/logs/gateway.log 2>&1 & echo $$! > .dev/pids/gateway.pid @echo "" @@ -130,6 +182,15 @@ dev: build @if [ -f .dev/pids/anon.pid ]; then \ echo " Anon: PID=$$(cat .dev/pids/anon.pid) (SOCKS: 9050)"; \ fi + @if [ -f .dev/pids/ipfs.pid ]; then \ + echo " IPFS: PID=$$(cat .dev/pids/ipfs.pid) (API: 5001)"; \ + fi + @if [ -f .dev/pids/ipfs-cluster.pid ]; then \ + echo " IPFS Cluster: PID=$$(cat .dev/pids/ipfs-cluster.pid) (API: 9094)"; \ + fi + @if [ -f .dev/pids/olric.pid ]; then \ + echo " Olric: PID=$$(cat .dev/pids/olric.pid) (API: 3320)"; \ + fi @echo " Bootstrap: PID=$$(cat .dev/pids/bootstrap.pid)" @echo " Node2: PID=$$(cat .dev/pids/node2.pid)" @echo " Node3: PID=$$(cat .dev/pids/node3.pid)" @@ -137,6 +198,13 @@ dev: build @echo "" @echo "Ports:" @echo " Anon SOCKS: 9050 (proxy endpoint: POST /v1/proxy/anon)" + @if [ -f .dev/pids/ipfs.pid ]; then \ + echo " IPFS API: 5001 (content retrieval)"; \ + echo " IPFS Cluster: 9094 (pin management)"; \ + fi + @if [ -f .dev/pids/olric.pid ]; then \ + echo " Olric: 3320 (cache API)"; \ + fi @echo " Bootstrap P2P: 4001, HTTP: 5001, Raft: 7001" @echo " Node2 P2P: 4002, HTTP: 5002, Raft: 7002" @echo " Node3 P2P: 4003, HTTP: 5003, Raft: 7003" @@ -145,13 +213,18 @@ dev: build @echo "Press Ctrl+C to stop all processes" @echo "============================================================" @echo "" - @if [ -f .dev/pids/anon.pid ]; then \ - trap 'echo "Stopping all processes..."; kill $$(cat .dev/pids/*.pid) 2>/dev/null; rm -f .dev/pids/*.pid; exit 0' INT; \ - tail -f $$HOME/.debros/logs/anon.log $$HOME/.debros/logs/bootstrap.log $$HOME/.debros/logs/node2.log $$HOME/.debros/logs/node3.log $$HOME/.debros/logs/gateway.log; \ - else \ - trap 'echo "Stopping all processes..."; kill $$(cat .dev/pids/*.pid) 2>/dev/null; rm -f .dev/pids/*.pid; exit 0' INT; \ - tail -f $$HOME/.debros/logs/bootstrap.log $$HOME/.debros/logs/node2.log $$HOME/.debros/logs/node3.log $$HOME/.debros/logs/gateway.log; \ - fi + @LOGS="$$HOME/.debros/logs/bootstrap.log $$HOME/.debros/logs/node2.log $$HOME/.debros/logs/node3.log $$HOME/.debros/logs/gateway.log"; \ + if [ -f .dev/pids/anon.pid ]; then \ + LOGS="$$LOGS $$HOME/.debros/logs/anon.log"; \ + fi; \ + if [ -f .dev/pids/ipfs.pid ]; then \ + LOGS="$$LOGS $$HOME/.debros/logs/ipfs.log"; \ + fi; \ + if [ -f .dev/pids/ipfs-cluster.pid ]; then \ + LOGS="$$LOGS $$HOME/.debros/logs/ipfs-cluster.log"; \ + fi; \ + trap 'echo "Stopping all processes..."; kill $$(cat .dev/pids/*.pid) 2>/dev/null; rm -f .dev/pids/*.pid; exit 0' INT; \ + tail -f $$LOGS # Help help: diff --git a/README.md b/README.md index 54fe138..b325374 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,7 @@ Common endpoints (see `openapi/gateway.yaml` for the full spec): - `POST /v1/rqlite/exec`, `POST /v1/rqlite/find`, `POST /v1/rqlite/select`, `POST /v1/rqlite/transaction` - `GET /v1/rqlite/schema` - `POST /v1/pubsub/publish`, `GET /v1/pubsub/topics`, `GET /v1/pubsub/ws?topic=` +- `POST /v1/storage/upload`, `POST /v1/storage/pin`, `GET /v1/storage/status/:cid`, `GET /v1/storage/get/:cid`, `DELETE /v1/storage/unpin/:cid` ## Troubleshooting diff --git a/cmd/gateway/config.go b/cmd/gateway/config.go index d8d1864..cf71959 100644 --- a/cmd/gateway/config.go +++ b/cmd/gateway/config.go @@ -51,15 +51,19 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { // Load YAML type yamlCfg struct { - ListenAddr string `yaml:"listen_addr"` - ClientNamespace string `yaml:"client_namespace"` - RQLiteDSN string `yaml:"rqlite_dsn"` - BootstrapPeers []string `yaml:"bootstrap_peers"` - EnableHTTPS bool `yaml:"enable_https"` - DomainName string `yaml:"domain_name"` - TLSCacheDir string `yaml:"tls_cache_dir"` - OlricServers []string `yaml:"olric_servers"` - OlricTimeout string `yaml:"olric_timeout"` + ListenAddr string `yaml:"listen_addr"` + ClientNamespace string `yaml:"client_namespace"` + RQLiteDSN string `yaml:"rqlite_dsn"` + BootstrapPeers []string `yaml:"bootstrap_peers"` + EnableHTTPS bool `yaml:"enable_https"` + DomainName string `yaml:"domain_name"` + TLSCacheDir string `yaml:"tls_cache_dir"` + OlricServers []string `yaml:"olric_servers"` + OlricTimeout string `yaml:"olric_timeout"` + IPFSClusterAPIURL string `yaml:"ipfs_cluster_api_url"` + IPFSAPIURL string `yaml:"ipfs_api_url"` + IPFSTimeout string `yaml:"ipfs_timeout"` + IPFSReplicationFactor int `yaml:"ipfs_replication_factor"` } data, err := os.ReadFile(configPath) @@ -82,15 +86,19 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { // Build config from YAML cfg := &gateway.Config{ - ListenAddr: ":6001", - ClientNamespace: "default", - BootstrapPeers: nil, - RQLiteDSN: "", - EnableHTTPS: false, - DomainName: "", - TLSCacheDir: "", - OlricServers: nil, - OlricTimeout: 0, + ListenAddr: ":6001", + ClientNamespace: "default", + BootstrapPeers: nil, + RQLiteDSN: "", + EnableHTTPS: false, + DomainName: "", + TLSCacheDir: "", + OlricServers: nil, + OlricTimeout: 0, + IPFSClusterAPIURL: "", + IPFSAPIURL: "", + IPFSTimeout: 0, + IPFSReplicationFactor: 0, } if v := strings.TrimSpace(y.ListenAddr); v != "" { @@ -142,6 +150,24 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config { } } + // IPFS configuration + if v := strings.TrimSpace(y.IPFSClusterAPIURL); v != "" { + cfg.IPFSClusterAPIURL = v + } + if v := strings.TrimSpace(y.IPFSAPIURL); v != "" { + cfg.IPFSAPIURL = v + } + if v := strings.TrimSpace(y.IPFSTimeout); v != "" { + if parsed, err := time.ParseDuration(v); err == nil { + cfg.IPFSTimeout = parsed + } else { + logger.ComponentWarn(logging.ComponentGeneral, "invalid ipfs_timeout, using default", zap.String("value", v), zap.Error(err)) + } + } + if y.IPFSReplicationFactor > 0 { + cfg.IPFSReplicationFactor = y.IPFSReplicationFactor + } + // Validate configuration if errs := cfg.ValidateConfig(); len(errs) > 0 { fmt.Fprintf(os.Stderr, "\nGateway configuration errors (%d):\n", len(errs)) diff --git a/e2e/gateway_e2e_test.go b/e2e/gateway_e2e_test.go index 82e7f27..8c6cb27 100644 --- a/e2e/gateway_e2e_test.go +++ b/e2e/gateway_e2e_test.go @@ -3,10 +3,13 @@ package e2e import ( + "bytes" "crypto/rand" "encoding/base64" "encoding/json" "fmt" + "io" + "mime/multipart" "net/http" "net/url" "os" @@ -407,6 +410,201 @@ func TestGateway_Database_RecreateWithFK(t *testing.T) { } } +func TestGateway_Storage_UploadMultipart(t *testing.T) { + key := requireAPIKey(t) + base := gatewayBaseURL() + + // Create multipart form data using proper multipart writer + content := []byte("test file content for IPFS upload") + var buf bytes.Buffer + writer := multipart.NewWriter(&buf) + part, err := writer.CreateFormFile("file", "test.txt") + if err != nil { + t.Fatalf("create form file: %v", err) + } + if _, err := part.Write(content); err != nil { + t.Fatalf("write content: %v", err) + } + if err := writer.Close(); err != nil { + t.Fatalf("close writer: %v", err) + } + + req, _ := http.NewRequest(http.MethodPost, base+"/v1/storage/upload", &buf) + req.Header = authHeader(key) + req.Header.Set("Content-Type", writer.FormDataContentType()) + resp, err := httpClient().Do(req) + if err != nil { + t.Fatalf("upload do: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusServiceUnavailable { + t.Skip("IPFS storage not available; skipping storage tests") + } + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("upload status: %d, body: %s", resp.StatusCode, string(body)) + } + + var uploadResp struct { + Cid string `json:"cid"` + Name string `json:"name"` + Size int64 `json:"size"` + } + if err := json.NewDecoder(resp.Body).Decode(&uploadResp); err != nil { + t.Fatalf("upload decode: %v", err) + } + if uploadResp.Cid == "" { + t.Fatalf("upload returned empty CID") + } + if uploadResp.Name != "test.txt" { + t.Fatalf("upload name mismatch: got %s", uploadResp.Name) + } + if uploadResp.Size == 0 { + t.Fatalf("upload size is zero") + } + + // Test pinning the uploaded content + pinBody := fmt.Sprintf(`{"cid":"%s","name":"test-pinned"}`, uploadResp.Cid) + req2, _ := http.NewRequest(http.MethodPost, base+"/v1/storage/pin", strings.NewReader(pinBody)) + req2.Header = authHeader(key) + resp2, err := httpClient().Do(req2) + if err != nil { + t.Fatalf("pin do: %v", err) + } + defer resp2.Body.Close() + if resp2.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp2.Body) + t.Fatalf("pin status: %d, body: %s", resp2.StatusCode, string(body)) + } + + // Test getting pin status + req3, _ := http.NewRequest(http.MethodGet, base+"/v1/storage/status/"+uploadResp.Cid, nil) + req3.Header = authHeader(key) + resp3, err := httpClient().Do(req3) + if err != nil { + t.Fatalf("status do: %v", err) + } + defer resp3.Body.Close() + if resp3.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp3.Body) + t.Fatalf("status status: %d, body: %s", resp3.StatusCode, string(body)) + } + + var statusResp struct { + Cid string `json:"cid"` + Status string `json:"status"` + ReplicationFactor int `json:"replication_factor"` + Peers []string `json:"peers"` + } + if err := json.NewDecoder(resp3.Body).Decode(&statusResp); err != nil { + t.Fatalf("status decode: %v", err) + } + if statusResp.Cid != uploadResp.Cid { + t.Fatalf("status CID mismatch: got %s", statusResp.Cid) + } + + // Test retrieving content + req4, _ := http.NewRequest(http.MethodGet, base+"/v1/storage/get/"+uploadResp.Cid, nil) + req4.Header = authHeader(key) + resp4, err := httpClient().Do(req4) + if err != nil { + t.Fatalf("get do: %v", err) + } + defer resp4.Body.Close() + if resp4.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp4.Body) + t.Fatalf("get status: %d, body: %s", resp4.StatusCode, string(body)) + } + + retrieved, err := io.ReadAll(resp4.Body) + if err != nil { + t.Fatalf("get read: %v", err) + } + if string(retrieved) != string(content) { + t.Fatalf("retrieved content mismatch: got %q", string(retrieved)) + } + + // Test unpinning + req5, _ := http.NewRequest(http.MethodDelete, base+"/v1/storage/unpin/"+uploadResp.Cid, nil) + req5.Header = authHeader(key) + resp5, err := httpClient().Do(req5) + if err != nil { + t.Fatalf("unpin do: %v", err) + } + defer resp5.Body.Close() + if resp5.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp5.Body) + t.Fatalf("unpin status: %d, body: %s", resp5.StatusCode, string(body)) + } +} + +func TestGateway_Storage_UploadJSON(t *testing.T) { + key := requireAPIKey(t) + base := gatewayBaseURL() + + // Test JSON upload with base64 data + content := []byte("test json upload content") + b64 := base64.StdEncoding.EncodeToString(content) + body := fmt.Sprintf(`{"name":"test.json","data":"%s"}`, b64) + + req, _ := http.NewRequest(http.MethodPost, base+"/v1/storage/upload", strings.NewReader(body)) + req.Header = authHeader(key) + resp, err := httpClient().Do(req) + if err != nil { + t.Fatalf("upload json do: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusServiceUnavailable { + t.Skip("IPFS storage not available; skipping storage tests") + } + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("upload json status: %d, body: %s", resp.StatusCode, string(body)) + } + + var uploadResp struct { + Cid string `json:"cid"` + Name string `json:"name"` + Size int64 `json:"size"` + } + if err := json.NewDecoder(resp.Body).Decode(&uploadResp); err != nil { + t.Fatalf("upload json decode: %v", err) + } + if uploadResp.Cid == "" { + t.Fatalf("upload json returned empty CID") + } + if uploadResp.Name != "test.json" { + t.Fatalf("upload json name mismatch: got %s", uploadResp.Name) + } +} + +func TestGateway_Storage_InvalidCID(t *testing.T) { + key := requireAPIKey(t) + base := gatewayBaseURL() + + // Test status with invalid CID + req, _ := http.NewRequest(http.MethodGet, base+"/v1/storage/status/QmInvalidCID123", nil) + req.Header = authHeader(key) + resp, err := httpClient().Do(req) + if err != nil { + t.Fatalf("status invalid do: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusServiceUnavailable { + t.Skip("IPFS storage not available; skipping storage tests") + } + + // Should return error but not crash + if resp.StatusCode != http.StatusNotFound && resp.StatusCode != http.StatusInternalServerError { + t.Fatalf("expected error status for invalid CID, got %d", resp.StatusCode) + } +} + func toWSURL(httpURL string) string { u, err := url.Parse(httpURL) if err != nil { diff --git a/pkg/cli/setup.go b/pkg/cli/setup.go index bc245d1..c681554 100644 --- a/pkg/cli/setup.go +++ b/pkg/cli/setup.go @@ -1086,26 +1086,15 @@ func installOlric() { if err := os.MkdirAll(olricConfigDir, 0755); err == nil { configPath := olricConfigDir + "/config.yaml" if _, err := os.Stat(configPath); os.IsNotExist(err) { - configContent := `memberlist: - bind-addr: "0.0.0.0" - bind-port: 3322 -client: - bind-addr: "0.0.0.0" - bind-port: 3320 + configContent := `server: + bindAddr: "127.0.0.1" + bindPort: 3320 -# Durability and replication configuration -# Replicates data across entire network for fault tolerance -dmaps: - default: - replication: - mode: sync # Synchronous replication for durability - replica_count: 2 # Replicate to 2 backup nodes (3 total copies: 1 primary + 2 backups) - write_quorum: 2 # Require 2 nodes to acknowledge writes - read_quorum: 1 # Read from 1 node (faster reads) - read_repair: true # Enable read-repair for consistency +memberlist: + environment: local + bindAddr: "127.0.0.1" + bindPort: 3322 -# Split-brain protection -member_count_quorum: 2 # Require at least 2 nodes to operate (prevents split-brain) ` if err := os.WriteFile(configPath, []byte(configContent), 0644); err == nil { exec.Command("chown", "debros:debros", configPath).Run() @@ -1532,6 +1521,17 @@ database: cluster_sync_interval: "30s" peer_inactivity_limit: "24h" min_cluster_size: 1 + ipfs: + # IPFS Cluster API endpoint for pin management (leave empty to disable) + cluster_api_url: "http://localhost:9094" + # IPFS HTTP API endpoint for content retrieval + api_url: "http://localhost:5001" + # Timeout for IPFS operations + timeout: "60s" + # Replication factor for pinned content + replication_factor: 3 + # Enable client-side encryption before upload + enable_encryption: true discovery: bootstrap_peers: [] @@ -1607,6 +1607,17 @@ database: cluster_sync_interval: "30s" peer_inactivity_limit: "24h" min_cluster_size: 1 + ipfs: + # IPFS Cluster API endpoint for pin management (leave empty to disable) + cluster_api_url: "http://localhost:9094" + # IPFS HTTP API endpoint for content retrieval + api_url: "http://localhost:5001" + # Timeout for IPFS operations + timeout: "60s" + # Replication factor for pinned content + replication_factor: 3 + # Enable client-side encryption before upload + enable_encryption: true discovery: %s @@ -1670,13 +1681,23 @@ func generateGatewayConfigDirect(bootstrapPeers string, enableHTTPS bool, domain olricYAML.WriteString(" - \"localhost:3320\"\n") } + // IPFS Cluster configuration (defaults - can be customized later) + ipfsYAML := `# IPFS Cluster configuration (optional) +# Uncomment and configure if you have IPFS Cluster running: +# ipfs_cluster_api_url: "http://localhost:9094" +# ipfs_api_url: "http://localhost:5001" +# ipfs_timeout: "60s" +# ipfs_replication_factor: 3 +` + return fmt.Sprintf(`listen_addr: ":6001" client_namespace: "default" rqlite_dsn: "" %s %s %s -`, peersYAML.String(), httpsYAML.String(), olricYAML.String()) +%s +`, peersYAML.String(), httpsYAML.String(), olricYAML.String(), ipfsYAML) } // generateOlricConfig generates an Olric configuration file @@ -1689,30 +1710,15 @@ func generateOlricConfig(configPath, bindIP string, httpPort, memberlistPort int } var config strings.Builder + config.WriteString("server:\n") + config.WriteString(fmt.Sprintf(" bindAddr: \"%s\"\n", bindIP)) + config.WriteString(fmt.Sprintf(" bindPort: %d\n", httpPort)) + config.WriteString("\n") config.WriteString("memberlist:\n") - config.WriteString(fmt.Sprintf(" bind-addr: \"%s\"\n", bindIP)) - config.WriteString(fmt.Sprintf(" bind-port: %d\n", memberlistPort)) - config.WriteString(" # Multicast discovery enabled - peers discovered dynamically via LibP2P network\n") - - config.WriteString("client:\n") - config.WriteString(fmt.Sprintf(" bind-addr: \"%s\"\n", bindIP)) - config.WriteString(fmt.Sprintf(" bind-port: %d\n", httpPort)) - - // Durability and replication settings - config.WriteString("\n# Durability and replication configuration\n") - config.WriteString("# Replicates data across entire network for fault tolerance\n") - config.WriteString("dmaps:\n") - config.WriteString(" default:\n") - config.WriteString(" replication:\n") - config.WriteString(" mode: sync # Synchronous replication for durability\n") - config.WriteString(" replica_count: 2 # Replicate to 2 backup nodes (3 total copies: 1 primary + 2 backups)\n") - config.WriteString(" write_quorum: 2 # Require 2 nodes to acknowledge writes\n") - config.WriteString(" read_quorum: 1 # Read from 1 node (faster reads)\n") - config.WriteString(" read_repair: true # Enable read-repair for consistency\n") - - // Split-brain protection - config.WriteString("\n# Split-brain protection\n") - config.WriteString("member_count_quorum: 2 # Require at least 2 nodes to operate (prevents split-brain)\n") + config.WriteString(" environment: local\n") + config.WriteString(fmt.Sprintf(" bindAddr: \"%s\"\n", bindIP)) + config.WriteString(fmt.Sprintf(" bindPort: %d\n", memberlistPort)) + config.WriteString("\n") // Write config file if err := os.WriteFile(configPath, []byte(config.String()), 0644); err != nil { diff --git a/pkg/client/client.go b/pkg/client/client.go index a0b06dd..8a2aa45 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -35,6 +35,7 @@ type Client struct { database *DatabaseClientImpl network *NetworkInfoImpl pubsub *pubSubBridge + storage *StorageClientImpl // State connected bool @@ -70,6 +71,7 @@ func NewClient(config *ClientConfig) (NetworkClient, error) { // Initialize components (will be configured when connected) client.database = &DatabaseClientImpl{client: client} client.network = &NetworkInfoImpl{client: client} + client.storage = &StorageClientImpl{client: client} return client, nil } @@ -89,6 +91,11 @@ func (c *Client) Network() NetworkInfo { return c.network } +// Storage returns the storage client +func (c *Client) Storage() StorageClient { + return c.storage +} + // Config returns a snapshot copy of the client's configuration func (c *Client) Config() *ClientConfig { c.mu.RLock() diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 328a0cd..31cdd9c 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -3,6 +3,7 @@ package client import ( "context" "fmt" + "io" "time" ) @@ -17,6 +18,9 @@ type NetworkClient interface { // Network information Network() NetworkInfo + // Storage operations (IPFS) + Storage() StorageClient + // Lifecycle Connect() error Disconnect() error @@ -51,6 +55,24 @@ type NetworkInfo interface { DisconnectFromPeer(ctx context.Context, peerID string) error } +// StorageClient provides IPFS storage operations +type StorageClient interface { + // Upload uploads content to IPFS and pins it + Upload(ctx context.Context, reader io.Reader, name string) (*StorageUploadResult, error) + + // Pin pins an existing CID + Pin(ctx context.Context, cid string, name string) (*StoragePinResult, error) + + // Status gets the pin status for a CID + Status(ctx context.Context, cid string) (*StorageStatus, error) + + // Get retrieves content from IPFS by CID + Get(ctx context.Context, cid string) (io.ReadCloser, error) + + // Unpin removes a pin from a CID + Unpin(ctx context.Context, cid string) error +} + // MessageHandler is called when a pub/sub message is received type MessageHandler func(topic string, data []byte) error @@ -107,12 +129,38 @@ type HealthStatus struct { ResponseTime time.Duration `json:"response_time"` } +// StorageUploadResult represents the result of uploading content to IPFS +type StorageUploadResult struct { + Cid string `json:"cid"` + Name string `json:"name"` + Size int64 `json:"size"` +} + +// StoragePinResult represents the result of pinning a CID +type StoragePinResult struct { + Cid string `json:"cid"` + Name string `json:"name"` +} + +// StorageStatus represents the status of a pinned CID +type StorageStatus struct { + Cid string `json:"cid"` + Name string `json:"name"` + Status string `json:"status"` // "pinned", "pinning", "queued", "unpinned", "error" + ReplicationMin int `json:"replication_min"` + ReplicationMax int `json:"replication_max"` + ReplicationFactor int `json:"replication_factor"` + Peers []string `json:"peers"` + Error string `json:"error,omitempty"` +} + // ClientConfig represents configuration for network clients type ClientConfig struct { AppName string `json:"app_name"` DatabaseName string `json:"database_name"` BootstrapPeers []string `json:"bootstrap_peers"` DatabaseEndpoints []string `json:"database_endpoints"` + GatewayURL string `json:"gateway_url"` // Gateway URL for HTTP API access (e.g., "http://localhost:6001") ConnectTimeout time.Duration `json:"connect_timeout"` RetryAttempts int `json:"retry_attempts"` RetryDelay time.Duration `json:"retry_delay"` @@ -132,6 +180,7 @@ func DefaultClientConfig(appName string) *ClientConfig { DatabaseName: fmt.Sprintf("%s_db", appName), BootstrapPeers: peers, DatabaseEndpoints: endpoints, + GatewayURL: "http://localhost:6001", ConnectTimeout: time.Second * 30, RetryAttempts: 3, RetryDelay: time.Second * 5, diff --git a/pkg/client/storage_client.go b/pkg/client/storage_client.go new file mode 100644 index 0000000..93cceb3 --- /dev/null +++ b/pkg/client/storage_client.go @@ -0,0 +1,245 @@ +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "strings" + "time" +) + +// StorageClientImpl implements StorageClient using HTTP requests to the gateway +type StorageClientImpl struct { + client *Client +} + +// Upload uploads content to IPFS and pins it +func (s *StorageClientImpl) Upload(ctx context.Context, reader io.Reader, name string) (*StorageUploadResult, error) { + if err := s.client.requireAccess(ctx); err != nil { + return nil, fmt.Errorf("authentication required: %w", err) + } + + gatewayURL := s.getGatewayURL() + + // Create multipart form + var buf bytes.Buffer + writer := multipart.NewWriter(&buf) + + // Add file field + part, err := writer.CreateFormFile("file", name) + if err != nil { + return nil, fmt.Errorf("failed to create form file: %w", err) + } + + if _, err := io.Copy(part, reader); err != nil { + return nil, fmt.Errorf("failed to copy data: %w", err) + } + + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("failed to close writer: %w", err) + } + + // Create request + req, err := http.NewRequestWithContext(ctx, "POST", gatewayURL+"/v1/storage/upload", &buf) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", writer.FormDataContentType()) + s.addAuthHeaders(req) + + // Execute request + client := &http.Client{Timeout: 5 * time.Minute} // Large timeout for file uploads + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body)) + } + + var result StorageUploadResult + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return &result, nil +} + +// Pin pins an existing CID +func (s *StorageClientImpl) Pin(ctx context.Context, cid string, name string) (*StoragePinResult, error) { + if err := s.client.requireAccess(ctx); err != nil { + return nil, fmt.Errorf("authentication required: %w", err) + } + + gatewayURL := s.getGatewayURL() + + reqBody := map[string]interface{}{ + "cid": cid, + } + if name != "" { + reqBody["name"] = name + } + + jsonBody, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("failed to marshal request: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", gatewayURL+"/v1/storage/pin", bytes.NewReader(jsonBody)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + s.addAuthHeaders(req) + + client := &http.Client{Timeout: 60 * time.Second} + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("pin failed with status %d: %s", resp.StatusCode, string(body)) + } + + var result StoragePinResult + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return &result, nil +} + +// Status gets the pin status for a CID +func (s *StorageClientImpl) Status(ctx context.Context, cid string) (*StorageStatus, error) { + if err := s.client.requireAccess(ctx); err != nil { + return nil, fmt.Errorf("authentication required: %w", err) + } + + gatewayURL := s.getGatewayURL() + + req, err := http.NewRequestWithContext(ctx, "GET", gatewayURL+"/v1/storage/status/"+cid, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + s.addAuthHeaders(req) + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("status failed with status %d: %s", resp.StatusCode, string(body)) + } + + var result StorageStatus + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return &result, nil +} + +// Get retrieves content from IPFS by CID +func (s *StorageClientImpl) Get(ctx context.Context, cid string) (io.ReadCloser, error) { + if err := s.client.requireAccess(ctx); err != nil { + return nil, fmt.Errorf("authentication required: %w", err) + } + + gatewayURL := s.getGatewayURL() + + req, err := http.NewRequestWithContext(ctx, "GET", gatewayURL+"/v1/storage/get/"+cid, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + s.addAuthHeaders(req) + + client := &http.Client{Timeout: 5 * time.Minute} // Large timeout for file downloads + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return nil, fmt.Errorf("get failed with status %d", resp.StatusCode) + } + + return resp.Body, nil +} + +// Unpin removes a pin from a CID +func (s *StorageClientImpl) Unpin(ctx context.Context, cid string) error { + if err := s.client.requireAccess(ctx); err != nil { + return fmt.Errorf("authentication required: %w", err) + } + + gatewayURL := s.getGatewayURL() + + req, err := http.NewRequestWithContext(ctx, "DELETE", gatewayURL+"/v1/storage/unpin/"+cid, nil) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + s.addAuthHeaders(req) + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unpin failed with status %d: %s", resp.StatusCode, string(body)) + } + + return nil +} + +// getGatewayURL returns the gateway URL from config, defaulting to localhost:6001 +func (s *StorageClientImpl) getGatewayURL() string { + cfg := s.client.Config() + if cfg != nil && cfg.GatewayURL != "" { + return strings.TrimSuffix(cfg.GatewayURL, "/") + } + return "http://localhost:6001" +} + +// addAuthHeaders adds authentication headers to the request +func (s *StorageClientImpl) addAuthHeaders(req *http.Request) { + cfg := s.client.Config() + if cfg == nil { + return + } + + // Prefer JWT if available + if cfg.JWT != "" { + req.Header.Set("Authorization", "Bearer "+cfg.JWT) + return + } + + // Fallback to API key + if cfg.APIKey != "" { + req.Header.Set("Authorization", "Bearer "+cfg.APIKey) + req.Header.Set("X-API-Key", cfg.APIKey) + } +} diff --git a/pkg/client/storage_client_test.go b/pkg/client/storage_client_test.go new file mode 100644 index 0000000..34127e7 --- /dev/null +++ b/pkg/client/storage_client_test.go @@ -0,0 +1,378 @@ +package client + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestStorageClientImpl_Upload(t *testing.T) { + t.Run("success", func(t *testing.T) { + expectedCID := "QmUpload123" + expectedName := "test.txt" + expectedSize := int64(100) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/v1/storage/upload" { + t.Errorf("Expected path '/v1/storage/upload', got %s", r.URL.Path) + } + + // Verify multipart form + if err := r.ParseMultipartForm(32 << 20); err != nil { + t.Errorf("Failed to parse multipart form: %v", err) + return + } + + file, header, err := r.FormFile("file") + if err != nil { + t.Errorf("Failed to get file: %v", err) + return + } + defer file.Close() + + if header.Filename != expectedName { + t.Errorf("Expected filename %s, got %s", expectedName, header.Filename) + } + + response := StorageUploadResult{ + Cid: expectedCID, + Name: expectedName, + Size: expectedSize, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + cfg := &ClientConfig{ + GatewayURL: server.URL, + AppName: "test-app", + APIKey: "ak_test:test-app", // Required for requireAccess check + } + client := &Client{config: cfg} + storage := &StorageClientImpl{client: client} + + reader := strings.NewReader("test content") + result, err := storage.Upload(context.Background(), reader, expectedName) + if err != nil { + t.Fatalf("Failed to upload: %v", err) + } + + if result.Cid != expectedCID { + t.Errorf("Expected CID %s, got %s", expectedCID, result.Cid) + } + if result.Name != expectedName { + t.Errorf("Expected name %s, got %s", expectedName, result.Name) + } + if result.Size != expectedSize { + t.Errorf("Expected size %d, got %d", expectedSize, result.Size) + } + }) + + t.Run("server_error", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("internal error")) + })) + defer server.Close() + + cfg := &ClientConfig{ + GatewayURL: server.URL, + AppName: "test-app", + } + client := &Client{config: cfg} + storage := &StorageClientImpl{client: client} + + reader := strings.NewReader("test") + _, err := storage.Upload(context.Background(), reader, "test.txt") + if err == nil { + t.Error("Expected error for server error") + } + }) + + t.Run("missing_credentials", func(t *testing.T) { + cfg := &ClientConfig{ + GatewayURL: "http://localhost:6001", + // No AppName, JWT, or APIKey + } + client := &Client{config: cfg} + storage := &StorageClientImpl{client: client} + + reader := strings.NewReader("test") + _, err := storage.Upload(context.Background(), reader, "test.txt") + if err == nil { + t.Error("Expected error for missing credentials") + } + }) +} + +func TestStorageClientImpl_Pin(t *testing.T) { + t.Run("success", func(t *testing.T) { + expectedCID := "QmPin123" + expectedName := "pinned-file" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/v1/storage/pin" { + t.Errorf("Expected path '/v1/storage/pin', got %s", r.URL.Path) + } + + var reqBody map[string]interface{} + if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil { + t.Errorf("Failed to decode request: %v", err) + return + } + + if reqBody["cid"] != expectedCID { + t.Errorf("Expected CID %s, got %v", expectedCID, reqBody["cid"]) + } + + response := StoragePinResult{ + Cid: expectedCID, + Name: expectedName, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + cfg := &ClientConfig{ + GatewayURL: server.URL, + AppName: "test-app", + APIKey: "ak_test:test-app", // Required for requireAccess check + } + client := &Client{config: cfg} + storage := &StorageClientImpl{client: client} + + result, err := storage.Pin(context.Background(), expectedCID, expectedName) + if err != nil { + t.Fatalf("Failed to pin: %v", err) + } + + if result.Cid != expectedCID { + t.Errorf("Expected CID %s, got %s", expectedCID, result.Cid) + } + if result.Name != expectedName { + t.Errorf("Expected name %s, got %s", expectedName, result.Name) + } + }) +} + +func TestStorageClientImpl_Status(t *testing.T) { + t.Run("success", func(t *testing.T) { + expectedCID := "QmStatus123" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, "/v1/storage/status/") { + t.Errorf("Expected path '/v1/storage/status/', got %s", r.URL.Path) + } + + response := StorageStatus{ + Cid: expectedCID, + Name: "test-file", + Status: "pinned", + ReplicationMin: 3, + ReplicationMax: 3, + ReplicationFactor: 3, + Peers: []string{"peer1", "peer2", "peer3"}, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + cfg := &ClientConfig{ + GatewayURL: server.URL, + AppName: "test-app", + APIKey: "ak_test:test-app", // Required for requireAccess check + } + client := &Client{config: cfg} + storage := &StorageClientImpl{client: client} + + status, err := storage.Status(context.Background(), expectedCID) + if err != nil { + t.Fatalf("Failed to get status: %v", err) + } + + if status.Cid != expectedCID { + t.Errorf("Expected CID %s, got %s", expectedCID, status.Cid) + } + if status.Status != "pinned" { + t.Errorf("Expected status 'pinned', got %s", status.Status) + } + }) +} + +func TestStorageClientImpl_Get(t *testing.T) { + t.Run("success", func(t *testing.T) { + expectedCID := "QmGet123" + expectedContent := "test content" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, "/v1/storage/get/") { + t.Errorf("Expected path '/v1/storage/get/', got %s", r.URL.Path) + } + w.Write([]byte(expectedContent)) + })) + defer server.Close() + + cfg := &ClientConfig{ + GatewayURL: server.URL, + AppName: "test-app", + APIKey: "ak_test:test-app", // Required for requireAccess check + } + client := &Client{config: cfg} + storage := &StorageClientImpl{client: client} + + reader, err := storage.Get(context.Background(), expectedCID) + if err != nil { + t.Fatalf("Failed to get content: %v", err) + } + defer reader.Close() + + data, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("Failed to read content: %v", err) + } + + if string(data) != expectedContent { + t.Errorf("Expected content %s, got %s", expectedContent, string(data)) + } + }) +} + +func TestStorageClientImpl_Unpin(t *testing.T) { + t.Run("success", func(t *testing.T) { + expectedCID := "QmUnpin123" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, "/v1/storage/unpin/") { + t.Errorf("Expected path '/v1/storage/unpin/', got %s", r.URL.Path) + } + if r.Method != "DELETE" { + t.Errorf("Expected method DELETE, got %s", r.Method) + } + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := &ClientConfig{ + GatewayURL: server.URL, + AppName: "test-app", + APIKey: "ak_test:test-app", // Required for requireAccess check + } + client := &Client{config: cfg} + storage := &StorageClientImpl{client: client} + + err := storage.Unpin(context.Background(), expectedCID) + if err != nil { + t.Fatalf("Failed to unpin: %v", err) + } + }) +} + +func TestStorageClientImpl_getGatewayURL(t *testing.T) { + storage := &StorageClientImpl{} + + t.Run("from_config", func(t *testing.T) { + cfg := &ClientConfig{GatewayURL: "http://custom:6001"} + client := &Client{config: cfg} + storage.client = client + + url := storage.getGatewayURL() + if url != "http://custom:6001" { + t.Errorf("Expected 'http://custom:6001', got %s", url) + } + }) + + t.Run("default", func(t *testing.T) { + cfg := &ClientConfig{} + client := &Client{config: cfg} + storage.client = client + + url := storage.getGatewayURL() + if url != "http://localhost:6001" { + t.Errorf("Expected 'http://localhost:6001', got %s", url) + } + }) + + t.Run("nil_config", func(t *testing.T) { + client := &Client{config: nil} + storage.client = client + + url := storage.getGatewayURL() + if url != "http://localhost:6001" { + t.Errorf("Expected 'http://localhost:6001', got %s", url) + } + }) +} + +func TestStorageClientImpl_addAuthHeaders(t *testing.T) { + t.Run("jwt_preferred", func(t *testing.T) { + cfg := &ClientConfig{ + JWT: "test-jwt-token", + APIKey: "test-api-key", + } + client := &Client{config: cfg} + storage := &StorageClientImpl{client: client} + + req := httptest.NewRequest("POST", "/test", nil) + storage.addAuthHeaders(req) + + auth := req.Header.Get("Authorization") + if auth != "Bearer test-jwt-token" { + t.Errorf("Expected JWT in Authorization header, got %s", auth) + } + }) + + t.Run("apikey_fallback", func(t *testing.T) { + cfg := &ClientConfig{ + APIKey: "test-api-key", + } + client := &Client{config: cfg} + storage := &StorageClientImpl{client: client} + + req := httptest.NewRequest("POST", "/test", nil) + storage.addAuthHeaders(req) + + auth := req.Header.Get("Authorization") + if auth != "Bearer test-api-key" { + t.Errorf("Expected API key in Authorization header, got %s", auth) + } + + apiKey := req.Header.Get("X-API-Key") + if apiKey != "test-api-key" { + t.Errorf("Expected API key in X-API-Key header, got %s", apiKey) + } + }) + + t.Run("no_auth", func(t *testing.T) { + cfg := &ClientConfig{} + client := &Client{config: cfg} + storage := &StorageClientImpl{client: client} + + req := httptest.NewRequest("POST", "/test", nil) + storage.addAuthHeaders(req) + + auth := req.Header.Get("Authorization") + if auth != "" { + t.Errorf("Expected no Authorization header, got %s", auth) + } + }) + + t.Run("nil_config", func(t *testing.T) { + client := &Client{config: nil} + storage := &StorageClientImpl{client: client} + + req := httptest.NewRequest("POST", "/test", nil) + storage.addAuthHeaders(req) + + auth := req.Header.Get("Authorization") + if auth != "" { + t.Errorf("Expected no Authorization header, got %s", auth) + } + }) +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 4314198..4d00115 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -36,7 +36,7 @@ type DatabaseConfig struct { RQLitePort int `yaml:"rqlite_port"` // RQLite HTTP API port RQLiteRaftPort int `yaml:"rqlite_raft_port"` // RQLite Raft consensus port RQLiteJoinAddress string `yaml:"rqlite_join_address"` // Address to join RQLite cluster - + // Dynamic discovery configuration (always enabled) ClusterSyncInterval time.Duration `yaml:"cluster_sync_interval"` // default: 30s PeerInactivityLimit time.Duration `yaml:"peer_inactivity_limit"` // default: 24h @@ -45,6 +45,32 @@ type DatabaseConfig struct { // Olric cache configuration OlricHTTPPort int `yaml:"olric_http_port"` // Olric HTTP API port (default: 3320) OlricMemberlistPort int `yaml:"olric_memberlist_port"` // Olric memberlist port (default: 3322) + + // IPFS storage configuration + IPFS IPFSConfig `yaml:"ipfs"` +} + +// IPFSConfig contains IPFS storage configuration +type IPFSConfig struct { + // ClusterAPIURL is the IPFS Cluster HTTP API URL (e.g., "http://localhost:9094") + // If empty, IPFS storage is disabled for this node + ClusterAPIURL string `yaml:"cluster_api_url"` + + // APIURL is the IPFS HTTP API URL for content retrieval (e.g., "http://localhost:5001") + // If empty, defaults to "http://localhost:5001" + APIURL string `yaml:"api_url"` + + // Timeout for IPFS operations + // If zero, defaults to 60 seconds + Timeout time.Duration `yaml:"timeout"` + + // ReplicationFactor is the replication factor for pinned content + // If zero, defaults to 3 + ReplicationFactor int `yaml:"replication_factor"` + + // EnableEncryption enables client-side encryption before upload + // Defaults to true + EnableEncryption bool `yaml:"enable_encryption"` } // DiscoveryConfig contains peer discovery configuration @@ -115,7 +141,7 @@ func DefaultConfig() *Config { RQLitePort: 5001, RQLiteRaftPort: 7001, RQLiteJoinAddress: "", // Empty for bootstrap node - + // Dynamic discovery (always enabled) ClusterSyncInterval: 30 * time.Second, PeerInactivityLimit: 24 * time.Hour, @@ -124,6 +150,15 @@ func DefaultConfig() *Config { // Olric cache configuration OlricHTTPPort: 3320, OlricMemberlistPort: 3322, + + // IPFS storage configuration + IPFS: IPFSConfig{ + ClusterAPIURL: "", // Empty = disabled + APIURL: "http://localhost:5001", + Timeout: 60 * time.Second, + ReplicationFactor: 3, + EnableEncryption: true, + }, }, Discovery: DiscoveryConfig{ BootstrapPeers: []string{}, diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index f941c42..fc2dce1 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -6,11 +6,16 @@ import ( "crypto/rsa" "database/sql" "net" + "os" + "path/filepath" "strconv" + "strings" "sync" "time" "github.com/DeBrosOfficial/network/pkg/client" + "github.com/DeBrosOfficial/network/pkg/config" + "github.com/DeBrosOfficial/network/pkg/ipfs" "github.com/DeBrosOfficial/network/pkg/logging" "github.com/DeBrosOfficial/network/pkg/olric" "github.com/DeBrosOfficial/network/pkg/rqlite" @@ -38,6 +43,13 @@ type Config struct { // Olric cache configuration OlricServers []string // List of Olric server addresses (e.g., ["localhost:3320"]). If empty, defaults to ["localhost:3320"] OlricTimeout time.Duration // Timeout for Olric operations (default: 10s) + + // IPFS Cluster configuration + IPFSClusterAPIURL string // IPFS Cluster HTTP API URL (e.g., "http://localhost:9094"). If empty, gateway will discover from node configs + IPFSAPIURL string // IPFS HTTP API URL for content retrieval (e.g., "http://localhost:5001"). If empty, gateway will discover from node configs + IPFSTimeout time.Duration // Timeout for IPFS operations (default: 60s) + IPFSReplicationFactor int // Replication factor for pins (default: 3) + IPFSEnableEncryption bool // Enable client-side encryption before upload (default: true, discovered from node configs) } type Gateway struct { @@ -56,6 +68,9 @@ type Gateway struct { // Olric cache client olricClient *olric.Client + // IPFS storage client + ipfsClient ipfs.IPFSClient + // Local pub/sub bypass for same-gateway subscribers localSubscribers map[string][]*localSubscriber // topic+namespace -> subscribers mu sync.RWMutex @@ -178,6 +193,80 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { ) } + logger.ComponentInfo(logging.ComponentGeneral, "Initializing IPFS Cluster client...") + + // Discover IPFS endpoints from node configs if not explicitly configured + ipfsClusterURL := cfg.IPFSClusterAPIURL + ipfsAPIURL := cfg.IPFSAPIURL + ipfsTimeout := cfg.IPFSTimeout + ipfsReplicationFactor := cfg.IPFSReplicationFactor + ipfsEnableEncryption := cfg.IPFSEnableEncryption + + if ipfsClusterURL == "" { + logger.ComponentInfo(logging.ComponentGeneral, "IPFS Cluster URL not configured, discovering from node configs...") + discovered := discoverIPFSFromNodeConfigs(logger.Logger) + if discovered.clusterURL != "" { + ipfsClusterURL = discovered.clusterURL + ipfsAPIURL = discovered.apiURL + if discovered.timeout > 0 { + ipfsTimeout = discovered.timeout + } + if discovered.replicationFactor > 0 { + ipfsReplicationFactor = discovered.replicationFactor + } + ipfsEnableEncryption = discovered.enableEncryption + logger.ComponentInfo(logging.ComponentGeneral, "Discovered IPFS endpoints from node configs", + zap.String("cluster_url", ipfsClusterURL), + zap.String("api_url", ipfsAPIURL), + zap.Bool("encryption_enabled", ipfsEnableEncryption)) + } else { + // Fallback to localhost defaults + ipfsClusterURL = "http://localhost:9094" + ipfsAPIURL = "http://localhost:5001" + ipfsEnableEncryption = true // Default to true + logger.ComponentInfo(logging.ComponentGeneral, "No IPFS config found in node configs, using localhost defaults") + } + } + + if ipfsAPIURL == "" { + ipfsAPIURL = "http://localhost:5001" + } + if ipfsTimeout == 0 { + ipfsTimeout = 60 * time.Second + } + if ipfsReplicationFactor == 0 { + ipfsReplicationFactor = 3 + } + if !cfg.IPFSEnableEncryption && !ipfsEnableEncryption { + // Only disable if explicitly set to false in both places + ipfsEnableEncryption = false + } else { + // Default to true if not explicitly disabled + ipfsEnableEncryption = true + } + + ipfsCfg := ipfs.Config{ + ClusterAPIURL: ipfsClusterURL, + Timeout: ipfsTimeout, + } + ipfsClient, ipfsErr := ipfs.NewClient(ipfsCfg, logger.Logger) + if ipfsErr != nil { + logger.ComponentWarn(logging.ComponentGeneral, "failed to initialize IPFS Cluster client; storage endpoints disabled", zap.Error(ipfsErr)) + } else { + gw.ipfsClient = ipfsClient + logger.ComponentInfo(logging.ComponentGeneral, "IPFS Cluster client ready", + zap.String("cluster_api_url", ipfsCfg.ClusterAPIURL), + zap.String("ipfs_api_url", ipfsAPIURL), + zap.Duration("timeout", ipfsCfg.Timeout), + zap.Int("replication_factor", ipfsReplicationFactor), + zap.Bool("encryption_enabled", ipfsEnableEncryption), + ) + } + // Store IPFS settings in gateway for use by handlers + gw.cfg.IPFSAPIURL = ipfsAPIURL + gw.cfg.IPFSReplicationFactor = ipfsReplicationFactor + gw.cfg.IPFSEnableEncryption = ipfsEnableEncryption + logger.ComponentInfo(logging.ComponentGeneral, "Gateway creation completed, returning...") return gw, nil } @@ -204,6 +293,13 @@ func (g *Gateway) Close() { g.logger.ComponentWarn(logging.ComponentGeneral, "error during Olric client close", zap.Error(err)) } } + if g.ipfsClient != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := g.ipfsClient.Close(ctx); err != nil { + g.logger.ComponentWarn(logging.ComponentGeneral, "error during IPFS client close", zap.Error(err)) + } + } } // getLocalSubscribers returns all local subscribers for a given topic and namespace @@ -307,3 +403,77 @@ func discoverOlricServers(networkClient client.NetworkClient, logger *zap.Logger return olricServers } + +// ipfsDiscoveryResult holds discovered IPFS configuration +type ipfsDiscoveryResult struct { + clusterURL string + apiURL string + timeout time.Duration + replicationFactor int + enableEncryption bool +} + +// discoverIPFSFromNodeConfigs discovers IPFS configuration from node.yaml files +// Checks bootstrap.yaml first, then node.yaml, node2.yaml, etc. +func discoverIPFSFromNodeConfigs(logger *zap.Logger) ipfsDiscoveryResult { + homeDir, err := os.UserHomeDir() + if err != nil { + logger.Debug("Failed to get home directory for IPFS discovery", zap.Error(err)) + return ipfsDiscoveryResult{} + } + + configDir := filepath.Join(homeDir, ".debros") + + // Try bootstrap.yaml first, then node.yaml, node2.yaml, etc. + configFiles := []string{"bootstrap.yaml", "node.yaml", "node2.yaml", "node3.yaml"} + + for _, filename := range configFiles { + configPath := filepath.Join(configDir, filename) + data, err := os.ReadFile(configPath) + if err != nil { + continue + } + + var nodeCfg config.Config + if err := config.DecodeStrict(strings.NewReader(string(data)), &nodeCfg); err != nil { + logger.Debug("Failed to parse node config for IPFS discovery", + zap.String("file", filename), zap.Error(err)) + continue + } + + // Check if IPFS is configured + if nodeCfg.Database.IPFS.ClusterAPIURL != "" { + result := ipfsDiscoveryResult{ + clusterURL: nodeCfg.Database.IPFS.ClusterAPIURL, + apiURL: nodeCfg.Database.IPFS.APIURL, + timeout: nodeCfg.Database.IPFS.Timeout, + replicationFactor: nodeCfg.Database.IPFS.ReplicationFactor, + enableEncryption: nodeCfg.Database.IPFS.EnableEncryption, + } + + if result.apiURL == "" { + result.apiURL = "http://localhost:5001" + } + if result.timeout == 0 { + result.timeout = 60 * time.Second + } + if result.replicationFactor == 0 { + result.replicationFactor = 3 + } + // Default encryption to true if not set + if !result.enableEncryption { + result.enableEncryption = true + } + + logger.Info("Discovered IPFS config from node config", + zap.String("file", filename), + zap.String("cluster_url", result.clusterURL), + zap.String("api_url", result.apiURL), + zap.Bool("encryption_enabled", result.enableEncryption)) + + return result + } + } + + return ipfsDiscoveryResult{} +} diff --git a/pkg/gateway/middleware_test.go b/pkg/gateway/middleware_test.go index 51e64e7..91e2b5a 100644 --- a/pkg/gateway/middleware_test.go +++ b/pkg/gateway/middleware_test.go @@ -26,12 +26,3 @@ func TestExtractAPIKey(t *testing.T) { t.Fatalf("got %q", got) } } - -func TestValidateNamespaceParam(t *testing.T) { - g := &Gateway{} - r := httptest.NewRequest(http.MethodGet, "/v1/storage/get?namespace=ns1&key=k", nil) - // no context namespace: should be false - if g.validateNamespaceParam(r) { - t.Fatalf("expected false without context ns") - } -} diff --git a/pkg/gateway/routes.go b/pkg/gateway/routes.go index 25d09a0..7ab103a 100644 --- a/pkg/gateway/routes.go +++ b/pkg/gateway/routes.go @@ -54,5 +54,12 @@ func (g *Gateway) Routes() http.Handler { mux.HandleFunc("/v1/cache/delete", g.cacheDeleteHandler) mux.HandleFunc("/v1/cache/scan", g.cacheScanHandler) + // storage endpoints (IPFS) + mux.HandleFunc("/v1/storage/upload", g.storageUploadHandler) + mux.HandleFunc("/v1/storage/pin", g.storagePinHandler) + mux.HandleFunc("/v1/storage/status/", g.storageStatusHandler) + mux.HandleFunc("/v1/storage/get/", g.storageGetHandler) + mux.HandleFunc("/v1/storage/unpin/", g.storageUnpinHandler) + return g.withMiddleware(mux) } diff --git a/pkg/gateway/storage_handlers.go b/pkg/gateway/storage_handlers.go index 3c283e1..13269e1 100644 --- a/pkg/gateway/storage_handlers.go +++ b/pkg/gateway/storage_handlers.go @@ -1,13 +1,338 @@ package gateway import ( + "bytes" + "context" + "encoding/base64" "encoding/json" + "fmt" + "io" "net/http" + "strings" "github.com/DeBrosOfficial/network/pkg/client" + "github.com/DeBrosOfficial/network/pkg/logging" + "go.uber.org/zap" ) -// Database HTTP handlers +// StorageUploadRequest represents a request to upload content to IPFS +type StorageUploadRequest struct { + Name string `json:"name,omitempty"` + Data string `json:"data,omitempty"` // Base64 encoded data (alternative to multipart) +} + +// StorageUploadResponse represents the response from uploading content +type StorageUploadResponse struct { + Cid string `json:"cid"` + Name string `json:"name"` + Size int64 `json:"size"` +} + +// StoragePinRequest represents a request to pin a CID +type StoragePinRequest struct { + Cid string `json:"cid"` + Name string `json:"name,omitempty"` +} + +// StoragePinResponse represents the response from pinning a CID +type StoragePinResponse struct { + Cid string `json:"cid"` + Name string `json:"name"` +} + +// StorageStatusResponse represents the status of a pinned CID +type StorageStatusResponse struct { + Cid string `json:"cid"` + Name string `json:"name"` + Status string `json:"status"` + ReplicationMin int `json:"replication_min"` + ReplicationMax int `json:"replication_max"` + ReplicationFactor int `json:"replication_factor"` + Peers []string `json:"peers"` + Error string `json:"error,omitempty"` +} + +// storageUploadHandler handles POST /v1/storage/upload +func (g *Gateway) storageUploadHandler(w http.ResponseWriter, r *http.Request) { + if g.ipfsClient == nil { + writeError(w, http.StatusServiceUnavailable, "IPFS storage not available") + return + } + + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + // Get namespace from context + namespace := g.getNamespaceFromContext(r.Context()) + if namespace == "" { + writeError(w, http.StatusUnauthorized, "namespace required") + return + } + + // Get replication factor from config (default: 3) + replicationFactor := g.cfg.IPFSReplicationFactor + if replicationFactor == 0 { + replicationFactor = 3 + } + + // Check if it's multipart/form-data or JSON + contentType := r.Header.Get("Content-Type") + var reader io.Reader + var name string + + if strings.HasPrefix(contentType, "multipart/form-data") { + // Handle multipart upload + if err := r.ParseMultipartForm(32 << 20); err != nil { // 32MB max + writeError(w, http.StatusBadRequest, fmt.Sprintf("failed to parse multipart form: %v", err)) + return + } + + file, header, err := r.FormFile("file") + if err != nil { + writeError(w, http.StatusBadRequest, fmt.Sprintf("failed to get file: %v", err)) + return + } + defer file.Close() + + reader = file + name = header.Filename + } else { + // Handle JSON request with base64 data + var req StorageUploadRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, fmt.Sprintf("failed to decode request: %v", err)) + return + } + + if req.Data == "" { + writeError(w, http.StatusBadRequest, "data field required") + return + } + + // Decode base64 data + data, err := base64Decode(req.Data) + if err != nil { + writeError(w, http.StatusBadRequest, fmt.Sprintf("failed to decode base64 data: %v", err)) + return + } + + reader = bytes.NewReader(data) + name = req.Name + } + + // Add to IPFS + ctx := r.Context() + addResp, err := g.ipfsClient.Add(ctx, reader, name) + if err != nil { + g.logger.ComponentError(logging.ComponentGeneral, "failed to add content to IPFS", zap.Error(err)) + writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to add content: %v", err)) + return + } + + // Pin with replication factor + _, err = g.ipfsClient.Pin(ctx, addResp.Cid, name, replicationFactor) + if err != nil { + g.logger.ComponentWarn(logging.ComponentGeneral, "failed to pin content", zap.Error(err), zap.String("cid", addResp.Cid)) + // Still return success, but log the pin failure + } + + response := StorageUploadResponse{ + Cid: addResp.Cid, + Name: addResp.Name, + Size: addResp.Size, + } + + writeJSON(w, http.StatusOK, response) +} + +// storagePinHandler handles POST /v1/storage/pin +func (g *Gateway) storagePinHandler(w http.ResponseWriter, r *http.Request) { + if g.ipfsClient == nil { + writeError(w, http.StatusServiceUnavailable, "IPFS storage not available") + return + } + + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + var req StoragePinRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, fmt.Sprintf("failed to decode request: %v", err)) + return + } + + if req.Cid == "" { + writeError(w, http.StatusBadRequest, "cid required") + return + } + + // Get replication factor from config (default: 3) + replicationFactor := g.cfg.IPFSReplicationFactor + if replicationFactor == 0 { + replicationFactor = 3 + } + + ctx := r.Context() + pinResp, err := g.ipfsClient.Pin(ctx, req.Cid, req.Name, replicationFactor) + if err != nil { + g.logger.ComponentError(logging.ComponentGeneral, "failed to pin CID", zap.Error(err), zap.String("cid", req.Cid)) + writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to pin: %v", err)) + return + } + + // Use name from request if response doesn't have it + name := pinResp.Name + if name == "" { + name = req.Name + } + + response := StoragePinResponse{ + Cid: pinResp.Cid, + Name: name, + } + + writeJSON(w, http.StatusOK, response) +} + +// storageStatusHandler handles GET /v1/storage/status/:cid +func (g *Gateway) storageStatusHandler(w http.ResponseWriter, r *http.Request) { + if g.ipfsClient == nil { + writeError(w, http.StatusServiceUnavailable, "IPFS storage not available") + return + } + + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + // Extract CID from path + path := strings.TrimPrefix(r.URL.Path, "/v1/storage/status/") + if path == "" { + writeError(w, http.StatusBadRequest, "cid required") + return + } + + ctx := r.Context() + status, err := g.ipfsClient.PinStatus(ctx, path) + if err != nil { + g.logger.ComponentError(logging.ComponentGeneral, "failed to get pin status", zap.Error(err), zap.String("cid", path)) + writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to get status: %v", err)) + return + } + + response := StorageStatusResponse{ + Cid: status.Cid, + Name: status.Name, + Status: status.Status, + ReplicationMin: status.ReplicationMin, + ReplicationMax: status.ReplicationMax, + ReplicationFactor: status.ReplicationFactor, + Peers: status.Peers, + Error: status.Error, + } + + writeJSON(w, http.StatusOK, response) +} + +// storageGetHandler handles GET /v1/storage/get/:cid +func (g *Gateway) storageGetHandler(w http.ResponseWriter, r *http.Request) { + if g.ipfsClient == nil { + writeError(w, http.StatusServiceUnavailable, "IPFS storage not available") + return + } + + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + // Extract CID from path + path := strings.TrimPrefix(r.URL.Path, "/v1/storage/get/") + if path == "" { + writeError(w, http.StatusBadRequest, "cid required") + return + } + + // Get namespace from context + namespace := g.getNamespaceFromContext(r.Context()) + if namespace == "" { + writeError(w, http.StatusUnauthorized, "namespace required") + return + } + + // Get IPFS API URL from config + ipfsAPIURL := g.cfg.IPFSAPIURL + if ipfsAPIURL == "" { + ipfsAPIURL = "http://localhost:5001" + } + + ctx := r.Context() + reader, err := g.ipfsClient.Get(ctx, path, ipfsAPIURL) + if err != nil { + g.logger.ComponentError(logging.ComponentGeneral, "failed to get content from IPFS", zap.Error(err), zap.String("cid", path)) + writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to get content: %v", err)) + return + } + defer reader.Close() + + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", path)) + + if _, err := io.Copy(w, reader); err != nil { + g.logger.ComponentError(logging.ComponentGeneral, "failed to write content", zap.Error(err)) + } +} + +// storageUnpinHandler handles DELETE /v1/storage/unpin/:cid +func (g *Gateway) storageUnpinHandler(w http.ResponseWriter, r *http.Request) { + if g.ipfsClient == nil { + writeError(w, http.StatusServiceUnavailable, "IPFS storage not available") + return + } + + if r.Method != http.MethodDelete { + writeError(w, http.StatusMethodNotAllowed, "method not allowed") + return + } + + // Extract CID from path + path := strings.TrimPrefix(r.URL.Path, "/v1/storage/unpin/") + if path == "" { + writeError(w, http.StatusBadRequest, "cid required") + return + } + + ctx := r.Context() + if err := g.ipfsClient.Unpin(ctx, path); err != nil { + g.logger.ComponentError(logging.ComponentGeneral, "failed to unpin CID", zap.Error(err), zap.String("cid", path)) + writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to unpin: %v", err)) + return + } + + writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "cid": path}) +} + +// base64Decode decodes base64 string to bytes +func base64Decode(s string) ([]byte, error) { + return base64.StdEncoding.DecodeString(s) +} + +// getNamespaceFromContext extracts namespace from request context +func (g *Gateway) getNamespaceFromContext(ctx context.Context) string { + if v := ctx.Value(ctxKeyNamespaceOverride); v != nil { + if s, ok := v.(string); ok && s != "" { + return s + } + } + return "" +} + +// Network HTTP handlers func (g *Gateway) networkStatusHandler(w http.ResponseWriter, r *http.Request) { if g.client == nil { @@ -84,17 +409,3 @@ func (g *Gateway) networkDisconnectHandler(w http.ResponseWriter, r *http.Reques } writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) } - -func (g *Gateway) validateNamespaceParam(r *http.Request) bool { - qns := r.URL.Query().Get("namespace") - if qns == "" { - return true - } - if v := r.Context().Value(ctxKeyNamespaceOverride); v != nil { - if s, ok := v.(string); ok && s != "" { - return s == qns - } - } - // If no namespace in context, disallow explicit namespace param - return false -} diff --git a/pkg/gateway/storage_handlers_test.go b/pkg/gateway/storage_handlers_test.go new file mode 100644 index 0000000..30dd839 --- /dev/null +++ b/pkg/gateway/storage_handlers_test.go @@ -0,0 +1,554 @@ +package gateway + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "io" + "mime/multipart" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/DeBrosOfficial/network/pkg/ipfs" + "github.com/DeBrosOfficial/network/pkg/logging" +) + +// mockIPFSClient is a mock implementation of ipfs.IPFSClient for testing +type mockIPFSClient struct { + addFunc func(ctx context.Context, reader io.Reader, name string) (*ipfs.AddResponse, error) + pinFunc func(ctx context.Context, cid string, name string, replicationFactor int) (*ipfs.PinResponse, error) + pinStatusFunc func(ctx context.Context, cid string) (*ipfs.PinStatus, error) + getFunc func(ctx context.Context, cid string, ipfsAPIURL string) (io.ReadCloser, error) + unpinFunc func(ctx context.Context, cid string) error +} + +func (m *mockIPFSClient) Add(ctx context.Context, reader io.Reader, name string) (*ipfs.AddResponse, error) { + if m.addFunc != nil { + return m.addFunc(ctx, reader, name) + } + return &ipfs.AddResponse{Cid: "QmTest123", Name: name, Size: 100}, nil +} + +func (m *mockIPFSClient) Pin(ctx context.Context, cid string, name string, replicationFactor int) (*ipfs.PinResponse, error) { + if m.pinFunc != nil { + return m.pinFunc(ctx, cid, name, replicationFactor) + } + return &ipfs.PinResponse{Cid: cid, Name: name}, nil +} + +func (m *mockIPFSClient) PinStatus(ctx context.Context, cid string) (*ipfs.PinStatus, error) { + if m.pinStatusFunc != nil { + return m.pinStatusFunc(ctx, cid) + } + return &ipfs.PinStatus{ + Cid: cid, + Name: "test", + Status: "pinned", + ReplicationMin: 3, + ReplicationMax: 3, + ReplicationFactor: 3, + Peers: []string{"peer1", "peer2", "peer3"}, + }, nil +} + +func (m *mockIPFSClient) Get(ctx context.Context, cid string, ipfsAPIURL string) (io.ReadCloser, error) { + if m.getFunc != nil { + return m.getFunc(ctx, cid, ipfsAPIURL) + } + return io.NopCloser(strings.NewReader("test content")), nil +} + +func (m *mockIPFSClient) Unpin(ctx context.Context, cid string) error { + if m.unpinFunc != nil { + return m.unpinFunc(ctx, cid) + } + return nil +} + +func (m *mockIPFSClient) Health(ctx context.Context) error { + return nil +} + +func (m *mockIPFSClient) Close(ctx context.Context) error { + return nil +} + +func newTestGatewayWithIPFS(t *testing.T, ipfsClient ipfs.IPFSClient) *Gateway { + logger, err := logging.NewColoredLogger(logging.ComponentGeneral, true) + if err != nil { + t.Fatalf("Failed to create logger: %v", err) + } + + cfg := &Config{ + ListenAddr: ":6001", + ClientNamespace: "test", + IPFSReplicationFactor: 3, + IPFSEnableEncryption: true, + IPFSAPIURL: "http://localhost:5001", + } + + gw := &Gateway{ + logger: logger, + cfg: cfg, + } + + if ipfsClient != nil { + gw.ipfsClient = ipfsClient + } + + return gw +} + +func TestStorageUploadHandler_MissingIPFSClient(t *testing.T) { + gw := newTestGatewayWithIPFS(t, nil) + + req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", nil) + ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns") + req = req.WithContext(ctx) + w := httptest.NewRecorder() + + gw.storageUploadHandler(w, req) + + if w.Code != http.StatusServiceUnavailable { + t.Errorf("Expected status %d, got %d", http.StatusServiceUnavailable, w.Code) + } +} + +func TestStorageUploadHandler_MethodNotAllowed(t *testing.T) { + gw := newTestGatewayWithIPFS(t, &mockIPFSClient{}) + + req := httptest.NewRequest(http.MethodGet, "/v1/storage/upload", nil) + ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns") + req = req.WithContext(ctx) + w := httptest.NewRecorder() + + gw.storageUploadHandler(w, req) + + if w.Code != http.StatusMethodNotAllowed { + t.Errorf("Expected status %d, got %d", http.StatusMethodNotAllowed, w.Code) + } +} + +func TestStorageUploadHandler_MissingNamespace(t *testing.T) { + gw := newTestGatewayWithIPFS(t, &mockIPFSClient{}) + + req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", nil) + w := httptest.NewRecorder() + + gw.storageUploadHandler(w, req) + + if w.Code != http.StatusUnauthorized { + t.Errorf("Expected status %d, got %d", http.StatusUnauthorized, w.Code) + } +} + +func TestStorageUploadHandler_MultipartUpload(t *testing.T) { + expectedCID := "QmTest456" + expectedName := "test.txt" + expectedSize := int64(200) + + mockClient := &mockIPFSClient{ + addFunc: func(ctx context.Context, reader io.Reader, name string) (*ipfs.AddResponse, error) { + // Read and verify content + data, _ := io.ReadAll(reader) + if len(data) == 0 { + return nil, io.ErrUnexpectedEOF + } + return &ipfs.AddResponse{ + Cid: expectedCID, + Name: name, + Size: expectedSize, + }, nil + }, + } + + gw := newTestGatewayWithIPFS(t, mockClient) + + var buf bytes.Buffer + writer := multipart.NewWriter(&buf) + part, _ := writer.CreateFormFile("file", expectedName) + part.Write([]byte("test file content")) + writer.Close() + + req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", &buf) + req.Header.Set("Content-Type", writer.FormDataContentType()) + ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns") + req = req.WithContext(ctx) + w := httptest.NewRecorder() + + gw.storageUploadHandler(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + var resp StorageUploadResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("Failed to decode response: %v", err) + } + + if resp.Cid != expectedCID { + t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid) + } + if resp.Name != expectedName { + t.Errorf("Expected name %s, got %s", expectedName, resp.Name) + } + if resp.Size != expectedSize { + t.Errorf("Expected size %d, got %d", expectedSize, resp.Size) + } +} + +func TestStorageUploadHandler_JSONUpload(t *testing.T) { + expectedCID := "QmTest789" + expectedName := "test.json" + testData := []byte("test json data") + base64Data := base64.StdEncoding.EncodeToString(testData) + + mockClient := &mockIPFSClient{ + addFunc: func(ctx context.Context, reader io.Reader, name string) (*ipfs.AddResponse, error) { + data, _ := io.ReadAll(reader) + if string(data) != string(testData) { + return nil, io.ErrUnexpectedEOF + } + return &ipfs.AddResponse{ + Cid: expectedCID, + Name: name, + Size: int64(len(testData)), + }, nil + }, + } + + gw := newTestGatewayWithIPFS(t, mockClient) + + reqBody := StorageUploadRequest{ + Name: expectedName, + Data: base64Data, + } + bodyBytes, _ := json.Marshal(reqBody) + + req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", bytes.NewReader(bodyBytes)) + req.Header.Set("Content-Type", "application/json") + ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns") + req = req.WithContext(ctx) + w := httptest.NewRecorder() + + gw.storageUploadHandler(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + var resp StorageUploadResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("Failed to decode response: %v", err) + } + + if resp.Cid != expectedCID { + t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid) + } +} + +func TestStorageUploadHandler_InvalidBase64(t *testing.T) { + gw := newTestGatewayWithIPFS(t, &mockIPFSClient{}) + + reqBody := StorageUploadRequest{ + Name: "test.txt", + Data: "invalid base64!!!", + } + bodyBytes, _ := json.Marshal(reqBody) + + req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", bytes.NewReader(bodyBytes)) + req.Header.Set("Content-Type", "application/json") + ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns") + req = req.WithContext(ctx) + w := httptest.NewRecorder() + + gw.storageUploadHandler(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status %d, got %d", http.StatusBadRequest, w.Code) + } +} + +func TestStorageUploadHandler_IPFSError(t *testing.T) { + mockClient := &mockIPFSClient{ + addFunc: func(ctx context.Context, reader io.Reader, name string) (*ipfs.AddResponse, error) { + return nil, io.ErrUnexpectedEOF + }, + } + + gw := newTestGatewayWithIPFS(t, mockClient) + + var buf bytes.Buffer + writer := multipart.NewWriter(&buf) + part, _ := writer.CreateFormFile("file", "test.txt") + part.Write([]byte("test")) + writer.Close() + + req := httptest.NewRequest(http.MethodPost, "/v1/storage/upload", &buf) + req.Header.Set("Content-Type", writer.FormDataContentType()) + ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns") + req = req.WithContext(ctx) + w := httptest.NewRecorder() + + gw.storageUploadHandler(w, req) + + if w.Code != http.StatusInternalServerError { + t.Errorf("Expected status %d, got %d", http.StatusInternalServerError, w.Code) + } +} + +func TestStoragePinHandler_Success(t *testing.T) { + expectedCID := "QmPin123" + expectedName := "pinned-file" + + mockClient := &mockIPFSClient{ + pinFunc: func(ctx context.Context, cid string, name string, replicationFactor int) (*ipfs.PinResponse, error) { + if cid != expectedCID { + return nil, io.ErrUnexpectedEOF + } + if replicationFactor != 3 { + return nil, io.ErrUnexpectedEOF + } + return &ipfs.PinResponse{Cid: cid, Name: name}, nil + }, + } + + gw := newTestGatewayWithIPFS(t, mockClient) + + reqBody := StoragePinRequest{ + Cid: expectedCID, + Name: expectedName, + } + bodyBytes, _ := json.Marshal(reqBody) + + req := httptest.NewRequest(http.MethodPost, "/v1/storage/pin", bytes.NewReader(bodyBytes)) + w := httptest.NewRecorder() + + gw.storagePinHandler(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + var resp StoragePinResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("Failed to decode response: %v", err) + } + + if resp.Cid != expectedCID { + t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid) + } + if resp.Name != expectedName { + t.Errorf("Expected name %s, got %s", expectedName, resp.Name) + } +} + +func TestStoragePinHandler_MissingCID(t *testing.T) { + gw := newTestGatewayWithIPFS(t, &mockIPFSClient{}) + + reqBody := StoragePinRequest{} + bodyBytes, _ := json.Marshal(reqBody) + + req := httptest.NewRequest(http.MethodPost, "/v1/storage/pin", bytes.NewReader(bodyBytes)) + w := httptest.NewRecorder() + + gw.storagePinHandler(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status %d, got %d", http.StatusBadRequest, w.Code) + } +} + +func TestStorageStatusHandler_Success(t *testing.T) { + expectedCID := "QmStatus123" + mockClient := &mockIPFSClient{ + pinStatusFunc: func(ctx context.Context, cid string) (*ipfs.PinStatus, error) { + return &ipfs.PinStatus{ + Cid: cid, + Name: "test-file", + Status: "pinned", + ReplicationMin: 3, + ReplicationMax: 3, + ReplicationFactor: 3, + Peers: []string{"peer1", "peer2", "peer3"}, + }, nil + }, + } + + gw := newTestGatewayWithIPFS(t, mockClient) + + req := httptest.NewRequest(http.MethodGet, "/v1/storage/status/"+expectedCID, nil) + w := httptest.NewRecorder() + + gw.storageStatusHandler(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + var resp StorageStatusResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("Failed to decode response: %v", err) + } + + if resp.Cid != expectedCID { + t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid) + } + if resp.Status != "pinned" { + t.Errorf("Expected status 'pinned', got %s", resp.Status) + } + if resp.ReplicationFactor != 3 { + t.Errorf("Expected replication factor 3, got %d", resp.ReplicationFactor) + } +} + +func TestStorageStatusHandler_MissingCID(t *testing.T) { + gw := newTestGatewayWithIPFS(t, &mockIPFSClient{}) + + req := httptest.NewRequest(http.MethodGet, "/v1/storage/status/", nil) + w := httptest.NewRecorder() + + gw.storageStatusHandler(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status %d, got %d", http.StatusBadRequest, w.Code) + } +} + +func TestStorageGetHandler_Success(t *testing.T) { + expectedCID := "QmGet123" + expectedContent := "test content from IPFS" + + mockClient := &mockIPFSClient{ + getFunc: func(ctx context.Context, cid string, ipfsAPIURL string) (io.ReadCloser, error) { + if cid != expectedCID { + return nil, io.ErrUnexpectedEOF + } + return io.NopCloser(strings.NewReader(expectedContent)), nil + }, + } + + gw := newTestGatewayWithIPFS(t, mockClient) + + req := httptest.NewRequest(http.MethodGet, "/v1/storage/get/"+expectedCID, nil) + ctx := context.WithValue(req.Context(), ctxKeyNamespaceOverride, "test-ns") + req = req.WithContext(ctx) + w := httptest.NewRecorder() + + gw.storageGetHandler(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + if w.Body.String() != expectedContent { + t.Errorf("Expected content %s, got %s", expectedContent, w.Body.String()) + } + + if w.Header().Get("Content-Type") != "application/octet-stream" { + t.Errorf("Expected Content-Type 'application/octet-stream', got %s", w.Header().Get("Content-Type")) + } +} + +func TestStorageGetHandler_MissingNamespace(t *testing.T) { + gw := newTestGatewayWithIPFS(t, &mockIPFSClient{}) + + req := httptest.NewRequest(http.MethodGet, "/v1/storage/get/QmTest123", nil) + w := httptest.NewRecorder() + + gw.storageGetHandler(w, req) + + if w.Code != http.StatusUnauthorized { + t.Errorf("Expected status %d, got %d", http.StatusUnauthorized, w.Code) + } +} + +func TestStorageUnpinHandler_Success(t *testing.T) { + expectedCID := "QmUnpin123" + + mockClient := &mockIPFSClient{ + unpinFunc: func(ctx context.Context, cid string) error { + if cid != expectedCID { + return io.ErrUnexpectedEOF + } + return nil + }, + } + + gw := newTestGatewayWithIPFS(t, mockClient) + + req := httptest.NewRequest(http.MethodDelete, "/v1/storage/unpin/"+expectedCID, nil) + w := httptest.NewRecorder() + + gw.storageUnpinHandler(w, req) + + if w.Code != http.StatusOK { + t.Errorf("Expected status %d, got %d", http.StatusOK, w.Code) + } + + var resp map[string]any + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("Failed to decode response: %v", err) + } + + if resp["cid"] != expectedCID { + t.Errorf("Expected CID %s, got %v", expectedCID, resp["cid"]) + } +} + +func TestStorageUnpinHandler_MissingCID(t *testing.T) { + gw := newTestGatewayWithIPFS(t, &mockIPFSClient{}) + + req := httptest.NewRequest(http.MethodDelete, "/v1/storage/unpin/", nil) + w := httptest.NewRecorder() + + gw.storageUnpinHandler(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("Expected status %d, got %d", http.StatusBadRequest, w.Code) + } +} + +// Test helper functions + +func TestBase64Decode(t *testing.T) { + testData := []byte("test data") + encoded := base64.StdEncoding.EncodeToString(testData) + + decoded, err := base64Decode(encoded) + if err != nil { + t.Fatalf("Failed to decode: %v", err) + } + + if string(decoded) != string(testData) { + t.Errorf("Expected %s, got %s", string(testData), string(decoded)) + } + + // Test invalid base64 + _, err = base64Decode("invalid!!!") + if err == nil { + t.Error("Expected error for invalid base64") + } +} + +func TestGetNamespaceFromContext(t *testing.T) { + gw := newTestGatewayWithIPFS(t, nil) + + // Test with namespace in context + ctx := context.WithValue(context.Background(), ctxKeyNamespaceOverride, "test-ns") + ns := gw.getNamespaceFromContext(ctx) + if ns != "test-ns" { + t.Errorf("Expected 'test-ns', got %s", ns) + } + + // Test without namespace + ctx2 := context.Background() + ns2 := gw.getNamespaceFromContext(ctx2) + if ns2 != "" { + t.Errorf("Expected empty namespace, got %s", ns2) + } +} diff --git a/pkg/ipfs/client.go b/pkg/ipfs/client.go new file mode 100644 index 0000000..b415fd0 --- /dev/null +++ b/pkg/ipfs/client.go @@ -0,0 +1,345 @@ +package ipfs + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "time" + + "go.uber.org/zap" +) + +// IPFSClient defines the interface for IPFS operations +type IPFSClient interface { + Add(ctx context.Context, reader io.Reader, name string) (*AddResponse, error) + Pin(ctx context.Context, cid string, name string, replicationFactor int) (*PinResponse, error) + PinStatus(ctx context.Context, cid string) (*PinStatus, error) + Get(ctx context.Context, cid string, ipfsAPIURL string) (io.ReadCloser, error) + Unpin(ctx context.Context, cid string) error + Health(ctx context.Context) error + Close(ctx context.Context) error +} + +// Client wraps an IPFS Cluster HTTP API client for storage operations +type Client struct { + apiURL string + httpClient *http.Client + logger *zap.Logger +} + +// Config holds configuration for the IPFS client +type Config struct { + // ClusterAPIURL is the base URL for IPFS Cluster HTTP API (e.g., "http://localhost:9094") + // If empty, defaults to "http://localhost:9094" + ClusterAPIURL string + + // Timeout is the timeout for client operations + // If zero, defaults to 60 seconds + Timeout time.Duration +} + +// PinStatus represents the status of a pinned CID +type PinStatus struct { + Cid string `json:"cid"` + Name string `json:"name"` + Status string `json:"status"` // "pinned", "pinning", "queued", "unpinned", "error" + ReplicationMin int `json:"replication_min"` + ReplicationMax int `json:"replication_max"` + ReplicationFactor int `json:"replication_factor"` + Peers []string `json:"peers"` + Error string `json:"error,omitempty"` +} + +// AddResponse represents the response from adding content to IPFS +type AddResponse struct { + Name string `json:"name"` + Cid string `json:"cid"` + Size int64 `json:"size"` +} + +// PinResponse represents the response from pinning a CID +type PinResponse struct { + Cid string `json:"cid"` + Name string `json:"name"` +} + +// NewClient creates a new IPFS Cluster client wrapper +func NewClient(cfg Config, logger *zap.Logger) (*Client, error) { + apiURL := cfg.ClusterAPIURL + if apiURL == "" { + apiURL = "http://localhost:9094" + } + + timeout := cfg.Timeout + if timeout == 0 { + timeout = 60 * time.Second + } + + httpClient := &http.Client{ + Timeout: timeout, + } + + return &Client{ + apiURL: apiURL, + httpClient: httpClient, + logger: logger, + }, nil +} + +// Health checks if the IPFS Cluster API is healthy +func (c *Client) Health(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, "GET", c.apiURL+"/id", nil) + if err != nil { + return fmt.Errorf("failed to create health check request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("health check request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("health check failed with status: %d", resp.StatusCode) + } + + return nil +} + +// Add adds content to IPFS and returns the CID +func (c *Client) Add(ctx context.Context, reader io.Reader, name string) (*AddResponse, error) { + // Create multipart form request for IPFS Cluster API + var buf bytes.Buffer + writer := multipart.NewWriter(&buf) + + // Create form file field + part, err := writer.CreateFormFile("file", name) + if err != nil { + return nil, fmt.Errorf("failed to create form file: %w", err) + } + + if _, err := io.Copy(part, reader); err != nil { + return nil, fmt.Errorf("failed to copy data: %w", err) + } + + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("failed to close writer: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", c.apiURL+"/add", &buf) + if err != nil { + return nil, fmt.Errorf("failed to create add request: %w", err) + } + + req.Header.Set("Content-Type", writer.FormDataContentType()) + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("add request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("add failed with status %d: %s", resp.StatusCode, string(body)) + } + + var result AddResponse + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode add response: %w", err) + } + + return &result, nil +} + +// Pin pins a CID with specified replication factor +func (c *Client) Pin(ctx context.Context, cid string, name string, replicationFactor int) (*PinResponse, error) { + reqBody := map[string]interface{}{ + "cid": cid, + "replication_factor_min": replicationFactor, + "replication_factor_max": replicationFactor, + } + if name != "" { + reqBody["name"] = name + } + + jsonBody, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("failed to marshal pin request: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", c.apiURL+"/pins/"+cid, bytes.NewReader(jsonBody)) + if err != nil { + return nil, fmt.Errorf("failed to create pin request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("pin request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("pin failed with status %d: %s", resp.StatusCode, string(body)) + } + + var result PinResponse + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode pin response: %w", err) + } + + // If IPFS Cluster doesn't return the name in the response, use the one from the request + if result.Name == "" && name != "" { + result.Name = name + } + // Ensure CID is set + if result.Cid == "" { + result.Cid = cid + } + + return &result, nil +} + +// PinStatus retrieves the status of a pinned CID +func (c *Client) PinStatus(ctx context.Context, cid string) (*PinStatus, error) { + req, err := http.NewRequestWithContext(ctx, "GET", c.apiURL+"/pins/"+cid, nil) + if err != nil { + return nil, fmt.Errorf("failed to create pin status request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("pin status request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + return nil, fmt.Errorf("pin not found: %s", cid) + } + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("pin status failed with status %d: %s", resp.StatusCode, string(body)) + } + + // IPFS Cluster returns GlobalPinInfo, we need to map it to our PinStatus + var gpi struct { + Cid string `json:"cid"` + Name string `json:"name"` + PeerMap map[string]struct { + Status interface{} `json:"status"` // TrackerStatus can be string or int + Error string `json:"error,omitempty"` + } `json:"peer_map"` + } + if err := json.NewDecoder(resp.Body).Decode(&gpi); err != nil { + return nil, fmt.Errorf("failed to decode pin status response: %w", err) + } + + // Extract status from peer map (use first peer's status, or aggregate) + status := "unknown" + peers := make([]string, 0, len(gpi.PeerMap)) + var errorMsg string + for peerID, pinInfo := range gpi.PeerMap { + peers = append(peers, peerID) + if pinInfo.Status != nil { + // Convert status to string + if s, ok := pinInfo.Status.(string); ok { + if status == "unknown" || s != "" { + status = s + } + } else if status == "unknown" { + // If status is not a string, try to convert it + status = fmt.Sprintf("%v", pinInfo.Status) + } + } + if pinInfo.Error != "" { + errorMsg = pinInfo.Error + } + } + + // Normalize status string (common IPFS Cluster statuses) + if status == "" || status == "unknown" { + status = "pinned" // Default to pinned if we have peers + if len(peers) == 0 { + status = "unknown" + } + } + + result := &PinStatus{ + Cid: gpi.Cid, + Name: gpi.Name, + Status: status, + ReplicationMin: 0, // Not available in GlobalPinInfo + ReplicationMax: 0, // Not available in GlobalPinInfo + ReplicationFactor: len(peers), + Peers: peers, + Error: errorMsg, + } + + // Ensure CID is set + if result.Cid == "" { + result.Cid = cid + } + + return result, nil +} + +// Unpin removes a pin from a CID +func (c *Client) Unpin(ctx context.Context, cid string) error { + req, err := http.NewRequestWithContext(ctx, "DELETE", c.apiURL+"/pins/"+cid, nil) + if err != nil { + return fmt.Errorf("failed to create unpin request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("unpin request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unpin failed with status %d: %s", resp.StatusCode, string(body)) + } + + return nil +} + +// Get retrieves content from IPFS by CID +// Note: This uses the IPFS HTTP API (typically on port 5001), not the Cluster API +func (c *Client) Get(ctx context.Context, cid string, ipfsAPIURL string) (io.ReadCloser, error) { + if ipfsAPIURL == "" { + ipfsAPIURL = "http://localhost:5001" + } + + url := fmt.Sprintf("%s/api/v0/cat?arg=%s", ipfsAPIURL, cid) + req, err := http.NewRequestWithContext(ctx, "POST", url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create get request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("get request failed: %w", err) + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return nil, fmt.Errorf("get failed with status %d", resp.StatusCode) + } + + return resp.Body, nil +} + +// Close closes the IPFS client connection +func (c *Client) Close(ctx context.Context) error { + // HTTP client doesn't need explicit closing + return nil +} diff --git a/pkg/ipfs/client_test.go b/pkg/ipfs/client_test.go new file mode 100644 index 0000000..344dad1 --- /dev/null +++ b/pkg/ipfs/client_test.go @@ -0,0 +1,483 @@ +package ipfs + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "go.uber.org/zap" +) + +func TestNewClient(t *testing.T) { + logger := zap.NewNop() + + t.Run("default_config", func(t *testing.T) { + cfg := Config{} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + if client.apiURL != "http://localhost:9094" { + t.Errorf("Expected default API URL 'http://localhost:9094', got %s", client.apiURL) + } + + if client.httpClient.Timeout != 60*time.Second { + t.Errorf("Expected default timeout 60s, got %v", client.httpClient.Timeout) + } + }) + + t.Run("custom_config", func(t *testing.T) { + cfg := Config{ + ClusterAPIURL: "http://custom:9094", + Timeout: 30 * time.Second, + } + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + if client.apiURL != "http://custom:9094" { + t.Errorf("Expected API URL 'http://custom:9094', got %s", client.apiURL) + } + + if client.httpClient.Timeout != 30*time.Second { + t.Errorf("Expected timeout 30s, got %v", client.httpClient.Timeout) + } + }) +} + +func TestClient_Add(t *testing.T) { + logger := zap.NewNop() + + t.Run("success", func(t *testing.T) { + expectedCID := "QmTest123" + expectedName := "test.txt" + expectedSize := int64(100) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/add" { + t.Errorf("Expected path '/add', got %s", r.URL.Path) + } + if r.Method != "POST" { + t.Errorf("Expected method POST, got %s", r.Method) + } + + // Verify multipart form + if err := r.ParseMultipartForm(32 << 20); err != nil { + t.Errorf("Failed to parse multipart form: %v", err) + return + } + + file, header, err := r.FormFile("file") + if err != nil { + t.Errorf("Failed to get file: %v", err) + return + } + defer file.Close() + + if header.Filename != expectedName { + t.Errorf("Expected filename %s, got %s", expectedName, header.Filename) + } + + // Read file content + _, _ = io.ReadAll(file) + + response := AddResponse{ + Cid: expectedCID, + Name: expectedName, + Size: expectedSize, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: server.URL} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + reader := strings.NewReader("test content") + resp, err := client.Add(context.Background(), reader, expectedName) + if err != nil { + t.Fatalf("Failed to add content: %v", err) + } + + if resp.Cid != expectedCID { + t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid) + } + if resp.Name != expectedName { + t.Errorf("Expected name %s, got %s", expectedName, resp.Name) + } + if resp.Size != expectedSize { + t.Errorf("Expected size %d, got %d", expectedSize, resp.Size) + } + }) + + t.Run("server_error", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("internal error")) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: server.URL} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + reader := strings.NewReader("test") + _, err = client.Add(context.Background(), reader, "test.txt") + if err == nil { + t.Error("Expected error for server error") + } + }) +} + +func TestClient_Pin(t *testing.T) { + logger := zap.NewNop() + + t.Run("success", func(t *testing.T) { + expectedCID := "QmPin123" + expectedName := "pinned-file" + expectedReplicationFactor := 3 + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, "/pins/") { + t.Errorf("Expected path '/pins/', got %s", r.URL.Path) + } + if r.Method != "POST" { + t.Errorf("Expected method POST, got %s", r.Method) + } + + var reqBody map[string]interface{} + if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil { + t.Errorf("Failed to decode request: %v", err) + return + } + + if reqBody["cid"] != expectedCID { + t.Errorf("Expected CID %s, got %v", expectedCID, reqBody["cid"]) + } + + response := PinResponse{ + Cid: expectedCID, + Name: expectedName, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: server.URL} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + resp, err := client.Pin(context.Background(), expectedCID, expectedName, expectedReplicationFactor) + if err != nil { + t.Fatalf("Failed to pin: %v", err) + } + + if resp.Cid != expectedCID { + t.Errorf("Expected CID %s, got %s", expectedCID, resp.Cid) + } + if resp.Name != expectedName { + t.Errorf("Expected name %s, got %s", expectedName, resp.Name) + } + }) + + t.Run("accepted_status", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + response := PinResponse{Cid: "QmTest", Name: "test"} + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: server.URL} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + _, err = client.Pin(context.Background(), "QmTest", "test", 3) + if err != nil { + t.Errorf("Expected success for Accepted status, got error: %v", err) + } + }) +} + +func TestClient_PinStatus(t *testing.T) { + logger := zap.NewNop() + + t.Run("success", func(t *testing.T) { + expectedCID := "QmStatus123" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, "/pins/") { + t.Errorf("Expected path '/pins/', got %s", r.URL.Path) + } + if r.Method != "GET" { + t.Errorf("Expected method GET, got %s", r.Method) + } + + response := PinStatus{ + Cid: expectedCID, + Name: "test-file", + Status: "pinned", + ReplicationMin: 3, + ReplicationMax: 3, + ReplicationFactor: 3, + Peers: []string{"peer1", "peer2", "peer3"}, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: server.URL} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + status, err := client.PinStatus(context.Background(), expectedCID) + if err != nil { + t.Fatalf("Failed to get pin status: %v", err) + } + + if status.Cid != expectedCID { + t.Errorf("Expected CID %s, got %s", expectedCID, status.Cid) + } + if status.Status != "pinned" { + t.Errorf("Expected status 'pinned', got %s", status.Status) + } + if len(status.Peers) != 3 { + t.Errorf("Expected 3 peers, got %d", len(status.Peers)) + } + }) + + t.Run("not_found", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: server.URL} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + _, err = client.PinStatus(context.Background(), "QmNotFound") + if err == nil { + t.Error("Expected error for not found") + } + }) +} + +func TestClient_Unpin(t *testing.T) { + logger := zap.NewNop() + + t.Run("success", func(t *testing.T) { + expectedCID := "QmUnpin123" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, "/pins/") { + t.Errorf("Expected path '/pins/', got %s", r.URL.Path) + } + if r.Method != "DELETE" { + t.Errorf("Expected method DELETE, got %s", r.Method) + } + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: server.URL} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + err = client.Unpin(context.Background(), expectedCID) + if err != nil { + t.Fatalf("Failed to unpin: %v", err) + } + }) + + t.Run("accepted_status", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: server.URL} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + err = client.Unpin(context.Background(), "QmTest") + if err != nil { + t.Errorf("Expected success for Accepted status, got error: %v", err) + } + }) +} + +func TestClient_Get(t *testing.T) { + logger := zap.NewNop() + + t.Run("success", func(t *testing.T) { + expectedCID := "QmGet123" + expectedContent := "test content from IPFS" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.Contains(r.URL.Path, "/api/v0/cat") { + t.Errorf("Expected path containing '/api/v0/cat', got %s", r.URL.Path) + } + if r.Method != "POST" { + t.Errorf("Expected method POST, got %s", r.Method) + } + + // Verify CID parameter + if !strings.Contains(r.URL.RawQuery, expectedCID) { + t.Errorf("Expected CID %s in query, got %s", expectedCID, r.URL.RawQuery) + } + + w.Write([]byte(expectedContent)) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: "http://localhost:9094"} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + reader, err := client.Get(context.Background(), expectedCID, server.URL) + if err != nil { + t.Fatalf("Failed to get content: %v", err) + } + defer reader.Close() + + data, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("Failed to read content: %v", err) + } + + if string(data) != expectedContent { + t.Errorf("Expected content %s, got %s", expectedContent, string(data)) + } + }) + + t.Run("not_found", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: "http://localhost:9094"} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + _, err = client.Get(context.Background(), "QmNotFound", server.URL) + if err == nil { + t.Error("Expected error for not found") + } + }) + + t.Run("default_ipfs_api_url", func(t *testing.T) { + expectedCID := "QmDefault" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("content")) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: "http://localhost:9094"} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + // Test with empty IPFS API URL (should use default) + // Note: This will fail because we're using a test server, but it tests the logic + _, err = client.Get(context.Background(), expectedCID, "") + // We expect an error here because default localhost:5001 won't exist + if err == nil { + t.Error("Expected error when using default localhost:5001") + } + }) +} + +func TestClient_Health(t *testing.T) { + logger := zap.NewNop() + + t.Run("success", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/id" { + t.Errorf("Expected path '/id', got %s", r.URL.Path) + } + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"id": "test"}`)) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: server.URL} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + err = client.Health(context.Background()) + if err != nil { + t.Fatalf("Failed health check: %v", err) + } + }) + + t.Run("unhealthy", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + cfg := Config{ClusterAPIURL: server.URL} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + err = client.Health(context.Background()) + if err == nil { + t.Error("Expected error for unhealthy status") + } + }) +} + +func TestClient_Close(t *testing.T) { + logger := zap.NewNop() + + cfg := Config{ClusterAPIURL: "http://localhost:9094"} + client, err := NewClient(cfg, logger) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + // Close should not error + err = client.Close(context.Background()) + if err != nil { + t.Errorf("Close should not error, got: %v", err) + } +}