mirror of
https://github.com/DeBrosOfficial/network.git
synced 2025-10-06 10:19:07 +00:00
Delete legacy e2e_test.go and add new gateway e2e tests
This commit is contained in:
parent
e15a547a10
commit
895f1d5dff
108
e2e/e2e_test.go
108
e2e/e2e_test.go
@ -1,108 +0,0 @@
|
||||
//go:build e2e
|
||||
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.debros.io/DeBros/network/pkg/client"
|
||||
"git.debros.io/DeBros/network/pkg/config"
|
||||
"git.debros.io/DeBros/network/pkg/node"
|
||||
)
|
||||
|
||||
func startNode(t *testing.T, id string, p2pPort, httpPort, raftPort int, dataDir string) *node.Node {
|
||||
// Ensure rqlited is available
|
||||
if _, err := exec.LookPath("rqlited"); err != nil {
|
||||
t.Skip("rqlited not found in PATH; skipping e2e")
|
||||
}
|
||||
|
||||
t.Helper()
|
||||
cfg := config.DefaultConfig()
|
||||
cfg.Node.ID = id
|
||||
cfg.Node.ListenAddresses = []string{fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p2pPort)}
|
||||
cfg.Node.DataDir = dataDir
|
||||
cfg.Database.RQLitePort = httpPort
|
||||
cfg.Database.RQLiteRaftPort = raftPort
|
||||
cfg.Database.RQLiteJoinAddress = ""
|
||||
cfg.Discovery.HttpAdvAddress = "127.0.0.1"
|
||||
cfg.Discovery.RaftAdvAddress = ""
|
||||
cfg.Discovery.BootstrapPeers = nil
|
||||
|
||||
n, err := node.NewNode(cfg)
|
||||
if err != nil { t.Fatalf("new node: %v", err) }
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
if err := n.Start(ctx); err != nil { t.Fatalf("start node: %v", err) }
|
||||
t.Cleanup(func() { _ = n.Stop() })
|
||||
return n
|
||||
}
|
||||
|
||||
func waitUntil(t *testing.T, d time.Duration, cond func() bool, msg string) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(d)
|
||||
for time.Now().Before(deadline) {
|
||||
if cond() { return }
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("timeout: %s", msg)
|
||||
}
|
||||
|
||||
func TestE2E_Nodes_Client_DB_Storage(t *testing.T) {
|
||||
// Start single node
|
||||
n1 := startNode(t, "n1", 4001, 5001, 7001, t.TempDir()+"/n1")
|
||||
|
||||
// Build bootstrap multiaddr with peer ID
|
||||
n1Addr := fmt.Sprintf("/ip4/127.0.0.1/tcp/4001/p2p/%s", n1.GetPeerID())
|
||||
|
||||
// Create client and connect via bootstrap
|
||||
cliCfg := client.DefaultClientConfig("e2e")
|
||||
cliCfg.BootstrapPeers = []string{n1Addr}
|
||||
cliCfg.DatabaseEndpoints = []string{"http://127.0.0.1:5001"}
|
||||
cliCfg.APIKey = "ak_test:default"
|
||||
cliCfg.QuietMode = true
|
||||
c, err := client.NewClient(cliCfg)
|
||||
if err != nil { t.Fatalf("new client: %v", err) }
|
||||
if err := c.Connect(); err != nil { t.Fatalf("client connect: %v", err) }
|
||||
defer c.Disconnect()
|
||||
|
||||
// Wait until client has at least one peer (bootstrap)
|
||||
waitUntil(t, 20*time.Second, func() bool {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
peers, err := c.Network().GetPeers(ctx)
|
||||
return err == nil && len(peers) >= 1
|
||||
}, "client did not connect to any peer")
|
||||
|
||||
// Create kv table for storage service (best-effort)
|
||||
ctx := client.WithInternalAuth(context.Background())
|
||||
_, _ = c.Database().Query(ctx, `CREATE TABLE IF NOT EXISTS kv_storage (
|
||||
namespace TEXT NOT NULL,
|
||||
key TEXT NOT NULL,
|
||||
value BLOB NOT NULL,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY (namespace, key)
|
||||
)`)
|
||||
|
||||
// Storage put/get through P2P
|
||||
putCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := c.Storage().Put(putCtx, "e2e:key", []byte("hello")); err != nil {
|
||||
t.Fatalf("storage put: %v", err)
|
||||
}
|
||||
getCtx, cancel2b := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel2b()
|
||||
val, err := c.Storage().Get(getCtx, "e2e:key")
|
||||
if err != nil { t.Fatalf("storage get: %v", err) }
|
||||
if string(val) != "hello" {
|
||||
// Some environments may return base64-encoded text; accept if it decodes to "hello"
|
||||
if dec, derr := base64.StdEncoding.DecodeString(string(val)); derr != nil || string(dec) != "hello" {
|
||||
t.Fatalf("unexpected value: %q", string(val))
|
||||
}
|
||||
}
|
||||
}
|
226
e2e/gateway_e2e_test.go
Normal file
226
e2e/gateway_e2e_test.go
Normal file
@ -0,0 +1,226 @@
|
||||
//go:build e2e
|
||||
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
func getEnv(key, def string) string {
|
||||
if v := strings.TrimSpace(os.Getenv(key)); v != "" {
|
||||
return v
|
||||
}
|
||||
return def
|
||||
}
|
||||
|
||||
func requireAPIKey(t *testing.T) string {
|
||||
t.Helper()
|
||||
key := strings.TrimSpace(os.Getenv("GATEWAY_API_KEY"))
|
||||
if key == "" {
|
||||
t.Skip("GATEWAY_API_KEY not set; skipping gateway auth-required tests")
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
func gatewayBaseURL() string {
|
||||
return getEnv("GATEWAY_BASE_URL", "http://127.0.0.1:8080")
|
||||
}
|
||||
|
||||
func httpClient() *http.Client {
|
||||
return &http.Client{Timeout: 10 * time.Second}
|
||||
}
|
||||
|
||||
func authHeader(key string) http.Header {
|
||||
h := http.Header{}
|
||||
h.Set("Authorization", "Bearer "+key)
|
||||
h.Set("Content-Type", "application/json")
|
||||
return h
|
||||
}
|
||||
|
||||
func TestGateway_Health(t *testing.T) {
|
||||
base := gatewayBaseURL()
|
||||
resp, err := httpClient().Get(base + "/v1/health")
|
||||
if err != nil {
|
||||
t.Fatalf("health request error: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status: %d", resp.StatusCode)
|
||||
}
|
||||
var body map[string]any
|
||||
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
|
||||
t.Fatalf("decode: %v", err)
|
||||
}
|
||||
if body["status"] != "ok" {
|
||||
t.Fatalf("status not ok: %+v", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGateway_Storage_PutGetListExistsDelete(t *testing.T) {
|
||||
key := requireAPIKey(t)
|
||||
base := gatewayBaseURL()
|
||||
|
||||
// Unique key and payload
|
||||
ts := time.Now().UnixNano()
|
||||
kvKey := fmt.Sprintf("e2e-gw/%d", ts)
|
||||
payload := randomBytes(32)
|
||||
|
||||
// PUT
|
||||
{
|
||||
req, err := http.NewRequest(http.MethodPost, base+"/v1/storage/put?key="+url.QueryEscape(kvKey), strings.NewReader(string(payload)))
|
||||
if err != nil { t.Fatalf("put new req: %v", err) }
|
||||
req.Header = authHeader(key)
|
||||
resp, err := httpClient().Do(req)
|
||||
if err != nil { t.Fatalf("put do: %v", err) }
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusCreated {
|
||||
t.Fatalf("put status: %d", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
// EXISTS
|
||||
{
|
||||
req, _ := http.NewRequest(http.MethodGet, base+"/v1/storage/exists?key="+url.QueryEscape(kvKey), nil)
|
||||
req.Header = authHeader(key)
|
||||
resp, err := httpClient().Do(req)
|
||||
if err != nil { t.Fatalf("exists do: %v", err) }
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK { t.Fatalf("exists status: %d", resp.StatusCode) }
|
||||
var b struct{ Exists bool `json:"exists"` }
|
||||
if err := json.NewDecoder(resp.Body).Decode(&b); err != nil { t.Fatalf("exists decode: %v", err) }
|
||||
if !b.Exists { t.Fatalf("exists=false for %s", kvKey) }
|
||||
}
|
||||
|
||||
// GET
|
||||
{
|
||||
req, _ := http.NewRequest(http.MethodGet, base+"/v1/storage/get?key="+url.QueryEscape(kvKey), nil)
|
||||
req.Header = authHeader(key)
|
||||
resp, err := httpClient().Do(req)
|
||||
if err != nil { t.Fatalf("get do: %v", err) }
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK { t.Fatalf("get status: %d", resp.StatusCode) }
|
||||
got := make([]byte, len(payload))
|
||||
n, _ := resp.Body.Read(got)
|
||||
if n == 0 || string(got[:n]) != string(payload[:n]) {
|
||||
t.Fatalf("payload mismatch: want %q got %q", string(payload), string(got[:n]))
|
||||
}
|
||||
}
|
||||
|
||||
// LIST (prefix)
|
||||
{
|
||||
prefix := url.QueryEscape(strings.Split(kvKey, "/")[0])
|
||||
req, _ := http.NewRequest(http.MethodGet, base+"/v1/storage/list?prefix="+prefix, nil)
|
||||
req.Header = authHeader(key)
|
||||
resp, err := httpClient().Do(req)
|
||||
if err != nil { t.Fatalf("list do: %v", err) }
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK { t.Fatalf("list status: %d", resp.StatusCode) }
|
||||
var b struct{ Keys []string `json:"keys"` }
|
||||
if err := json.NewDecoder(resp.Body).Decode(&b); err != nil { t.Fatalf("list decode: %v", err) }
|
||||
found := false
|
||||
for _, k := range b.Keys { if k == kvKey { found = true; break } }
|
||||
if !found { t.Fatalf("key %s not found in list", kvKey) }
|
||||
}
|
||||
|
||||
// DELETE
|
||||
{
|
||||
req, _ := http.NewRequest(http.MethodPost, base+"/v1/storage/delete?key="+url.QueryEscape(kvKey), nil)
|
||||
req.Header = authHeader(key)
|
||||
resp, err := httpClient().Do(req)
|
||||
if err != nil { t.Fatalf("delete do: %v", err) }
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK { t.Fatalf("delete status: %d", resp.StatusCode) }
|
||||
}
|
||||
}
|
||||
|
||||
func TestGateway_PubSub_WS_Echo(t *testing.T) {
|
||||
key := requireAPIKey(t)
|
||||
base := gatewayBaseURL()
|
||||
|
||||
topic := fmt.Sprintf("e2e-ws-%d", time.Now().UnixNano())
|
||||
wsURL, hdr := toWSURL(base+"/v1/pubsub/ws?topic="+url.QueryEscape(topic)), http.Header{}
|
||||
hdr.Set("Authorization", "Bearer "+key)
|
||||
|
||||
c, _, err := websocket.DefaultDialer.Dial(wsURL, hdr)
|
||||
if err != nil {
|
||||
t.Fatalf("ws dial: %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
defer c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||
|
||||
msg := []byte("hello-ws")
|
||||
if err := c.WriteMessage(websocket.TextMessage, msg); err != nil {
|
||||
t.Fatalf("ws write: %v", err)
|
||||
}
|
||||
|
||||
_, data, err := c.ReadMessage()
|
||||
if err != nil { t.Fatalf("ws read: %v", err) }
|
||||
if string(data) != string(msg) { t.Fatalf("ws echo mismatch: %q", string(data)) }
|
||||
}
|
||||
|
||||
func TestGateway_PubSub_RestPublishToWS(t *testing.T) {
|
||||
key := requireAPIKey(t)
|
||||
base := gatewayBaseURL()
|
||||
|
||||
topic := fmt.Sprintf("e2e-rest-%d", time.Now().UnixNano())
|
||||
wsURL, hdr := toWSURL(base+"/v1/pubsub/ws?topic="+url.QueryEscape(topic)), http.Header{}
|
||||
hdr.Set("Authorization", "Bearer "+key)
|
||||
c, _, err := websocket.DefaultDialer.Dial(wsURL, hdr)
|
||||
if err != nil { t.Fatalf("ws dial: %v", err) }
|
||||
defer c.Close()
|
||||
|
||||
// Publish via REST
|
||||
payload := randomBytes(24)
|
||||
b64 := base64.StdEncoding.EncodeToString(payload)
|
||||
body := fmt.Sprintf(`{"topic":"%s","data_base64":"%s"}`, topic, b64)
|
||||
req, _ := http.NewRequest(http.MethodPost, base+"/v1/pubsub/publish", strings.NewReader(body))
|
||||
req.Header = authHeader(key)
|
||||
resp, err := httpClient().Do(req)
|
||||
if err != nil { t.Fatalf("publish do: %v", err) }
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK { t.Fatalf("publish status: %d", resp.StatusCode) }
|
||||
|
||||
// Expect the message via WS
|
||||
_ = c.SetReadDeadline(time.Now().Add(5 * time.Second))
|
||||
_, data, err := c.ReadMessage()
|
||||
if err != nil { t.Fatalf("ws read: %v", err) }
|
||||
if string(data) != string(payload) { t.Fatalf("payload mismatch: %q != %q", string(data), string(payload)) }
|
||||
|
||||
// Topics list should include our topic (without namespace prefix)
|
||||
req2, _ := http.NewRequest(http.MethodGet, base+"/v1/pubsub/topics", nil)
|
||||
req2.Header = authHeader(key)
|
||||
resp2, err := httpClient().Do(req2)
|
||||
if err != nil { t.Fatalf("topics do: %v", err) }
|
||||
defer resp2.Body.Close()
|
||||
if resp2.StatusCode != http.StatusOK { t.Fatalf("topics status: %d", resp2.StatusCode) }
|
||||
var tlist struct{ Topics []string `json:"topics"` }
|
||||
if err := json.NewDecoder(resp2.Body).Decode(&tlist); err != nil { t.Fatalf("topics decode: %v", err) }
|
||||
found := false
|
||||
for _, tt := range tlist.Topics { if tt == topic { found = true; break } }
|
||||
if !found { t.Fatalf("topic %s not found in topics list", topic) }
|
||||
}
|
||||
|
||||
func toWSURL(httpURL string) string {
|
||||
u, err := url.Parse(httpURL)
|
||||
if err != nil { return httpURL }
|
||||
if u.Scheme == "https" { u.Scheme = "wss" } else { u.Scheme = "ws" }
|
||||
return u.String()
|
||||
}
|
||||
|
||||
func randomBytes(n int) []byte {
|
||||
b := make([]byte, n)
|
||||
_, _ = rand.Read(b)
|
||||
return b
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user