network/pkg/gateway/storage_handlers.go

280 lines
7.4 KiB
Go

package gateway
import (
"encoding/json"
"io"
"net/http"
"strconv"
"git.debros.io/DeBros/network/pkg/storage"
)
func (g *Gateway) storageHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
key := r.URL.Query().Get("key")
if key == "" {
writeError(w, http.StatusBadRequest, "missing 'key' query parameter")
return
}
ctx := r.Context()
switch r.Method {
case http.MethodGet:
val, err := g.client.Storage().Get(ctx, key)
if err != nil {
writeError(w, http.StatusNotFound, err.Error())
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(val)
return
case http.MethodPut:
defer r.Body.Close()
b, err := io.ReadAll(r.Body)
if err != nil {
writeError(w, http.StatusBadRequest, "failed to read body")
return
}
if err := g.client.Storage().Put(ctx, key, b); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusCreated, map[string]any{
"status": "ok",
"key": key,
"size": len(b),
})
return
case http.MethodOptions:
w.WriteHeader(http.StatusNoContent)
return
default:
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
}
func (g *Gateway) networkStatusHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx := r.Context()
status, err := g.client.Network().GetStatus(ctx)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, status)
}
func (g *Gateway) networkPeersHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
ctx := r.Context()
peers, err := g.client.Network().GetPeers(ctx)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, peers)
}
func (g *Gateway) storageGetHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
key := r.URL.Query().Get("key")
if key == "" {
writeError(w, http.StatusBadRequest, "missing 'key'")
return
}
if !g.validateNamespaceParam(r) {
writeError(w, http.StatusForbidden, "namespace mismatch")
return
}
val, err := g.client.Storage().Get(r.Context(), key)
if err != nil {
writeError(w, http.StatusNotFound, err.Error())
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(val)
}
func (g *Gateway) storagePutHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
key := r.URL.Query().Get("key")
if key == "" {
writeError(w, http.StatusBadRequest, "missing 'key'")
return
}
if !g.validateNamespaceParam(r) {
writeError(w, http.StatusForbidden, "namespace mismatch")
return
}
defer r.Body.Close()
b, err := io.ReadAll(r.Body)
if err != nil {
writeError(w, http.StatusBadRequest, "failed to read body")
return
}
if err := g.client.Storage().Put(r.Context(), key, b); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusCreated, map[string]any{"status": "ok", "key": key, "size": len(b)})
}
func (g *Gateway) storageDeleteHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if !g.validateNamespaceParam(r) {
writeError(w, http.StatusForbidden, "namespace mismatch")
return
}
key := r.URL.Query().Get("key")
if key == "" {
var body struct {
Key string `json:"key"`
Namespace string `json:"namespace"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err == nil {
key = body.Key
}
}
if key == "" {
writeError(w, http.StatusBadRequest, "missing 'key'")
return
}
if err := g.client.Storage().Delete(r.Context(), key); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok", "key": key})
}
func (g *Gateway) storageListHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if !g.validateNamespaceParam(r) {
writeError(w, http.StatusForbidden, "namespace mismatch")
return
}
prefix := r.URL.Query().Get("prefix")
limitStr := r.URL.Query().Get("limit")
limit := 100
if limitStr != "" {
if n, err := strconv.Atoi(limitStr); err == nil && n > 0 {
limit = n
}
}
keys, err := g.client.Storage().List(r.Context(), prefix, limit)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"keys": keys})
}
func (g *Gateway) storageExistsHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if !g.validateNamespaceParam(r) {
writeError(w, http.StatusForbidden, "namespace mismatch")
return
}
key := r.URL.Query().Get("key")
if key == "" {
writeError(w, http.StatusBadRequest, "missing 'key'")
return
}
exists, err := g.client.Storage().Exists(r.Context(), key)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"exists": exists})
}
func (g *Gateway) networkConnectHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var body struct {
Multiaddr string `json:"multiaddr"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Multiaddr == "" {
writeError(w, http.StatusBadRequest, "invalid body: expected {multiaddr}")
return
}
if err := g.client.Network().ConnectToPeer(r.Context(), body.Multiaddr); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}
func (g *Gateway) networkDisconnectHandler(w http.ResponseWriter, r *http.Request) {
if g.client == nil {
writeError(w, http.StatusServiceUnavailable, "client not initialized")
return
}
if r.Method != http.MethodPost {
writeError(w, http.StatusMethodNotAllowed, "method not allowed")
return
}
var body struct {
PeerID string `json:"peer_id"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.PeerID == "" {
writeError(w, http.StatusBadRequest, "invalid body: expected {peer_id}")
return
}
if err := g.client.Network().DisconnectFromPeer(r.Context(), body.PeerID); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}
func (g *Gateway) validateNamespaceParam(r *http.Request) bool {
qns := r.URL.Query().Get("namespace")
if qns == "" {
return true
}
if v := r.Context().Value(storage.CtxKeyNamespaceOverride); v != nil {
if s, ok := v.(string); ok && s != "" {
return s == qns
}
}
// If no namespace in context, disallow explicit namespace param
return false
}