From d4f5f3b9998db67eb3fc03d3e5b89c3772f6bc1b Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Wed, 28 Jan 2026 14:30:28 +0200 Subject: [PATCH] added more tests --- e2e/auth_negative_test.go | 273 +++++++++++--------- e2e/data_persistence_test.go | 461 +++++++++++++++++++++++++++++++++ e2e/env.go | 145 +++++------ e2e/olric_cluster_test.go | 414 ++++++++++++++++++++++++++++++ e2e/rqlite_cluster_test.go | 478 +++++++++++++++++++++++++++++++++++ 5 files changed, 1582 insertions(+), 189 deletions(-) create mode 100644 e2e/data_persistence_test.go create mode 100644 e2e/olric_cluster_test.go create mode 100644 e2e/rqlite_cluster_test.go diff --git a/e2e/auth_negative_test.go b/e2e/auth_negative_test.go index 130dc63..fc6684c 100644 --- a/e2e/auth_negative_test.go +++ b/e2e/auth_negative_test.go @@ -8,29 +8,33 @@ import ( "testing" "time" "unicode" + + "github.com/stretchr/testify/require" ) +// ============================================================================= +// STRICT AUTHENTICATION NEGATIVE TESTS +// These tests verify that authentication is properly enforced. +// Tests FAIL if unauthenticated/invalid requests are allowed through. +// ============================================================================= + func TestAuth_MissingAPIKey(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - // Request without auth headers - req, err := http.NewRequestWithContext(ctx, http.MethodGet, GetGatewayURL()+"/v1/network/status", nil) - if err != nil { - t.Fatalf("failed to create request: %v", err) - } + // Request protected endpoint without auth headers + req, err := http.NewRequestWithContext(ctx, http.MethodGet, GetGatewayURL()+"/v1/cache/health", nil) + require.NoError(t, err, "FAIL: Could not create request") client := NewHTTPClient(30 * time.Second) resp, err := client.Do(req) - if err != nil { - t.Fatalf("request failed: %v", err) - } + require.NoError(t, err, "FAIL: Request failed") defer resp.Body.Close() - // Should be unauthorized - if resp.StatusCode != http.StatusUnauthorized && resp.StatusCode != http.StatusForbidden { - t.Logf("warning: expected 401/403 for missing auth, got %d (auth may not be enforced on this endpoint)", resp.StatusCode) - } + // STRICT: Must reject requests without authentication + require.True(t, resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden, + "FAIL: Protected endpoint allowed request without auth - expected 401/403, got %d", resp.StatusCode) + t.Logf(" ✓ Missing API key correctly rejected with status %d", resp.StatusCode) } func TestAuth_InvalidAPIKey(t *testing.T) { @@ -39,23 +43,19 @@ func TestAuth_InvalidAPIKey(t *testing.T) { // Request with invalid API key req, err := http.NewRequestWithContext(ctx, http.MethodGet, GetGatewayURL()+"/v1/cache/health", nil) - if err != nil { - t.Fatalf("failed to create request: %v", err) - } + require.NoError(t, err, "FAIL: Could not create request") - req.Header.Set("Authorization", "Bearer invalid-key-xyz") + req.Header.Set("Authorization", "Bearer invalid-key-xyz-123456789") client := NewHTTPClient(30 * time.Second) resp, err := client.Do(req) - if err != nil { - t.Fatalf("request failed: %v", err) - } + require.NoError(t, err, "FAIL: Request failed") defer resp.Body.Close() - // Should be unauthorized - if resp.StatusCode != http.StatusUnauthorized && resp.StatusCode != http.StatusForbidden { - t.Logf("warning: expected 401/403 for invalid key, got %d", resp.StatusCode) - } + // STRICT: Must reject invalid API keys + require.True(t, resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden, + "FAIL: Invalid API key was accepted - expected 401/403, got %d", resp.StatusCode) + t.Logf(" ✓ Invalid API key correctly rejected with status %d", resp.StatusCode) } func TestAuth_CacheWithoutAuth(t *testing.T) { @@ -70,14 +70,12 @@ func TestAuth_CacheWithoutAuth(t *testing.T) { } _, status, err := req.Do(ctx) - if err != nil { - t.Fatalf("request failed: %v", err) - } + require.NoError(t, err, "FAIL: Request failed") - // Should fail with 401 or 403 - if status != http.StatusUnauthorized && status != http.StatusForbidden { - t.Logf("warning: expected 401/403 for cache without auth, got %d", status) - } + // STRICT: Cache endpoint must require authentication + require.True(t, status == http.StatusUnauthorized || status == http.StatusForbidden, + "FAIL: Cache endpoint accessible without auth - expected 401/403, got %d", status) + t.Logf(" ✓ Cache endpoint correctly requires auth (status %d)", status) } func TestAuth_StorageWithoutAuth(t *testing.T) { @@ -92,14 +90,12 @@ func TestAuth_StorageWithoutAuth(t *testing.T) { } _, status, err := req.Do(ctx) - if err != nil { - t.Fatalf("request failed: %v", err) - } + require.NoError(t, err, "FAIL: Request failed") - // Should fail with 401 or 403 - if status != http.StatusUnauthorized && status != http.StatusForbidden { - t.Logf("warning: expected 401/403 for storage without auth, got %d", status) - } + // STRICT: Storage endpoint must require authentication + require.True(t, status == http.StatusUnauthorized || status == http.StatusForbidden, + "FAIL: Storage endpoint accessible without auth - expected 401/403, got %d", status) + t.Logf(" ✓ Storage endpoint correctly requires auth (status %d)", status) } func TestAuth_RQLiteWithoutAuth(t *testing.T) { @@ -114,71 +110,54 @@ func TestAuth_RQLiteWithoutAuth(t *testing.T) { } _, status, err := req.Do(ctx) - if err != nil { - t.Fatalf("request failed: %v", err) - } + require.NoError(t, err, "FAIL: Request failed") - // Should fail with 401 or 403 - if status != http.StatusUnauthorized && status != http.StatusForbidden { - t.Logf("warning: expected 401/403 for rqlite without auth, got %d", status) - } + // STRICT: RQLite endpoint must require authentication + require.True(t, status == http.StatusUnauthorized || status == http.StatusForbidden, + "FAIL: RQLite endpoint accessible without auth - expected 401/403, got %d", status) + t.Logf(" ✓ RQLite endpoint correctly requires auth (status %d)", status) } func TestAuth_MalformedBearerToken(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - // Request with malformed bearer token + // Request with malformed bearer token (missing "Bearer " prefix) req, err := http.NewRequestWithContext(ctx, http.MethodGet, GetGatewayURL()+"/v1/cache/health", nil) - if err != nil { - t.Fatalf("failed to create request: %v", err) - } + require.NoError(t, err, "FAIL: Could not create request") - // Missing "Bearer " prefix - req.Header.Set("Authorization", "invalid-token-format") + req.Header.Set("Authorization", "invalid-token-format-no-bearer") client := NewHTTPClient(30 * time.Second) resp, err := client.Do(req) - if err != nil { - t.Fatalf("request failed: %v", err) - } + require.NoError(t, err, "FAIL: Request failed") defer resp.Body.Close() - // Should be unauthorized - if resp.StatusCode != http.StatusUnauthorized && resp.StatusCode != http.StatusForbidden { - t.Logf("warning: expected 401/403 for malformed token, got %d", resp.StatusCode) - } + // STRICT: Must reject malformed authorization headers + require.True(t, resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden, + "FAIL: Malformed auth header accepted - expected 401/403, got %d", resp.StatusCode) + t.Logf(" ✓ Malformed bearer token correctly rejected (status %d)", resp.StatusCode) } func TestAuth_ExpiredJWT(t *testing.T) { - // Skip if JWT is not being used - if GetJWT() == "" && GetAPIKey() == "" { - t.Skip("No JWT or API key configured") - } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - // This test would require an expired JWT token - // For now, test with a clearly invalid JWT structure + // Test with a clearly invalid JWT structure req, err := http.NewRequestWithContext(ctx, http.MethodGet, GetGatewayURL()+"/v1/cache/health", nil) - if err != nil { - t.Fatalf("failed to create request: %v", err) - } + require.NoError(t, err, "FAIL: Could not create request") - req.Header.Set("Authorization", "Bearer expired.jwt.token") + req.Header.Set("Authorization", "Bearer expired.jwt.token.invalid") client := NewHTTPClient(30 * time.Second) resp, err := client.Do(req) - if err != nil { - t.Fatalf("request failed: %v", err) - } + require.NoError(t, err, "FAIL: Request failed") defer resp.Body.Close() - // Should be unauthorized - if resp.StatusCode != http.StatusUnauthorized && resp.StatusCode != http.StatusForbidden { - t.Logf("warning: expected 401/403 for expired JWT, got %d", resp.StatusCode) - } + // STRICT: Must reject invalid/expired JWT tokens + require.True(t, resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden, + "FAIL: Invalid JWT accepted - expected 401/403, got %d", resp.StatusCode) + t.Logf(" ✓ Invalid JWT correctly rejected (status %d)", resp.StatusCode) } func TestAuth_EmptyBearerToken(t *testing.T) { @@ -187,30 +166,30 @@ func TestAuth_EmptyBearerToken(t *testing.T) { // Request with empty bearer token req, err := http.NewRequestWithContext(ctx, http.MethodGet, GetGatewayURL()+"/v1/cache/health", nil) - if err != nil { - t.Fatalf("failed to create request: %v", err) - } + require.NoError(t, err, "FAIL: Could not create request") req.Header.Set("Authorization", "Bearer ") client := NewHTTPClient(30 * time.Second) resp, err := client.Do(req) - if err != nil { - t.Fatalf("request failed: %v", err) - } + require.NoError(t, err, "FAIL: Request failed") defer resp.Body.Close() - // Should be unauthorized - if resp.StatusCode != http.StatusUnauthorized && resp.StatusCode != http.StatusForbidden { - t.Logf("warning: expected 401/403 for empty token, got %d", resp.StatusCode) - } + // STRICT: Must reject empty bearer tokens + require.True(t, resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden, + "FAIL: Empty bearer token accepted - expected 401/403, got %d", resp.StatusCode) + t.Logf(" ✓ Empty bearer token correctly rejected (status %d)", resp.StatusCode) } func TestAuth_DuplicateAuthHeaders(t *testing.T) { + if GetAPIKey() == "" { + t.Skip("No API key configured") + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - // Request with both API key and invalid JWT + // Request with both valid API key in Authorization header req := &HTTPRequest{ Method: http.MethodGet, URL: GetGatewayURL() + "/v1/cache/health", @@ -221,74 +200,132 @@ func TestAuth_DuplicateAuthHeaders(t *testing.T) { } _, status, err := req.Do(ctx) - if err != nil { - t.Fatalf("request failed: %v", err) - } + require.NoError(t, err, "FAIL: Request failed") - // Should succeed if API key is valid - if status != http.StatusOK { - t.Logf("request with both headers returned %d", status) - } + // Should succeed since we have a valid API key + require.Equal(t, http.StatusOK, status, + "FAIL: Valid API key rejected when multiple auth headers present - got %d", status) + t.Logf(" ✓ Duplicate auth headers with valid key succeeds (status %d)", status) } func TestAuth_CaseSensitiveAPIKey(t *testing.T) { - if GetAPIKey() == "" { + apiKey := GetAPIKey() + if apiKey == "" { t.Skip("No API key configured") } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - // Request with incorrectly cased API key - apiKey := GetAPIKey() + // Create incorrectly cased API key incorrectKey := "" for i, ch := range apiKey { if i%2 == 0 && unicode.IsLetter(ch) { - incorrectKey += string(unicode.ToUpper(ch)) // Convert to uppercase + if unicode.IsLower(ch) { + incorrectKey += string(unicode.ToUpper(ch)) + } else { + incorrectKey += string(unicode.ToLower(ch)) + } } else { incorrectKey += string(ch) } } - req, err := http.NewRequestWithContext(ctx, http.MethodGet, GetGatewayURL()+"/v1/cache/health", nil) - if err != nil { - t.Fatalf("failed to create request: %v", err) + // Skip if the key didn't change (no letters) + if incorrectKey == apiKey { + t.Skip("API key has no letters to change case") } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, GetGatewayURL()+"/v1/cache/health", nil) + require.NoError(t, err, "FAIL: Could not create request") + req.Header.Set("Authorization", "Bearer "+incorrectKey) client := NewHTTPClient(30 * time.Second) resp, err := client.Do(req) - if err != nil { - t.Fatalf("request failed: %v", err) - } + require.NoError(t, err, "FAIL: Request failed") defer resp.Body.Close() - // API keys should be case-sensitive - if resp.StatusCode == http.StatusOK { - t.Logf("warning: API key check may not be case-sensitive (got 200)") - } + // STRICT: API keys MUST be case-sensitive + require.True(t, resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden, + "FAIL: API key check is not case-sensitive - modified key accepted with status %d", resp.StatusCode) + t.Logf(" ✓ Case-modified API key correctly rejected (status %d)", resp.StatusCode) } func TestAuth_HealthEndpointNoAuth(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - // Health endpoint at /health should not require auth + // Health endpoint at /v1/health should NOT require auth req, err := http.NewRequestWithContext(ctx, http.MethodGet, GetGatewayURL()+"/v1/health", nil) - if err != nil { - t.Fatalf("failed to create request: %v", err) - } + require.NoError(t, err, "FAIL: Could not create request") client := NewHTTPClient(30 * time.Second) resp, err := client.Do(req) - if err != nil { - t.Fatalf("request failed: %v", err) - } + require.NoError(t, err, "FAIL: Request failed") defer resp.Body.Close() - // Should succeed without auth - if resp.StatusCode != http.StatusOK { - t.Fatalf("expected 200 for /health without auth, got %d", resp.StatusCode) - } + // Health endpoint should be publicly accessible + require.Equal(t, http.StatusOK, resp.StatusCode, + "FAIL: Health endpoint should not require auth - expected 200, got %d", resp.StatusCode) + t.Logf(" ✓ Health endpoint correctly accessible without auth") +} + +func TestAuth_StatusEndpointNoAuth(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Status endpoint at /v1/status should NOT require auth + req, err := http.NewRequestWithContext(ctx, http.MethodGet, GetGatewayURL()+"/v1/status", nil) + require.NoError(t, err, "FAIL: Could not create request") + + client := NewHTTPClient(30 * time.Second) + resp, err := client.Do(req) + require.NoError(t, err, "FAIL: Request failed") + defer resp.Body.Close() + + // Status endpoint should be publicly accessible + require.Equal(t, http.StatusOK, resp.StatusCode, + "FAIL: Status endpoint should not require auth - expected 200, got %d", resp.StatusCode) + t.Logf(" ✓ Status endpoint correctly accessible without auth") +} + +func TestAuth_DeploymentsWithoutAuth(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Request deployments endpoint without auth + req := &HTTPRequest{ + Method: http.MethodGet, + URL: GetGatewayURL() + "/v1/deployments/list", + SkipAuth: true, + } + + _, status, err := req.Do(ctx) + require.NoError(t, err, "FAIL: Request failed") + + // STRICT: Deployments endpoint must require authentication + require.True(t, status == http.StatusUnauthorized || status == http.StatusForbidden, + "FAIL: Deployments endpoint accessible without auth - expected 401/403, got %d", status) + t.Logf(" ✓ Deployments endpoint correctly requires auth (status %d)", status) +} + +func TestAuth_SQLiteWithoutAuth(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Request SQLite endpoint without auth + req := &HTTPRequest{ + Method: http.MethodGet, + URL: GetGatewayURL() + "/v1/db/sqlite/list", + SkipAuth: true, + } + + _, status, err := req.Do(ctx) + require.NoError(t, err, "FAIL: Request failed") + + // STRICT: SQLite endpoint must require authentication + require.True(t, status == http.StatusUnauthorized || status == http.StatusForbidden, + "FAIL: SQLite endpoint accessible without auth - expected 401/403, got %d", status) + t.Logf(" ✓ SQLite endpoint correctly requires auth (status %d)", status) } diff --git a/e2e/data_persistence_test.go b/e2e/data_persistence_test.go new file mode 100644 index 0000000..e185255 --- /dev/null +++ b/e2e/data_persistence_test.go @@ -0,0 +1,461 @@ +//go:build e2e + +package e2e + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// ============================================================================= +// STRICT DATA PERSISTENCE TESTS +// These tests verify that data is properly persisted and survives operations. +// Tests FAIL if data is lost or corrupted. +// ============================================================================= + +// TestRQLite_DataPersistence verifies that RQLite data is persisted through the gateway. +func TestRQLite_DataPersistence(t *testing.T) { + SkipIfMissingGateway(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + tableName := fmt.Sprintf("persist_test_%d", time.Now().UnixNano()) + + // Cleanup + defer func() { + dropReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/drop-table", + Body: map[string]interface{}{"table": tableName}, + } + dropReq.Do(context.Background()) + }() + + // Create table + createReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/create-table", + Body: map[string]interface{}{ + "schema": fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY, value TEXT, version INTEGER)", + tableName, + ), + }, + } + + _, status, err := createReq.Do(ctx) + require.NoError(t, err, "FAIL: Could not create table") + require.True(t, status == http.StatusCreated || status == http.StatusOK, + "FAIL: Create table returned status %d", status) + + t.Run("Data_survives_multiple_writes", func(t *testing.T) { + // Insert initial data + var statements []string + for i := 1; i <= 10; i++ { + statements = append(statements, + fmt.Sprintf("INSERT INTO %s (value, version) VALUES ('item_%d', %d)", tableName, i, i)) + } + + insertReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/transaction", + Body: map[string]interface{}{"statements": statements}, + } + + _, status, err := insertReq.Do(ctx) + require.NoError(t, err, "FAIL: Could not insert rows") + require.Equal(t, http.StatusOK, status, "FAIL: Insert returned status %d", status) + + // Verify all data exists + queryReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/query", + Body: map[string]interface{}{ + "sql": fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName), + }, + } + + body, status, err := queryReq.Do(ctx) + require.NoError(t, err, "FAIL: Could not count rows") + require.Equal(t, http.StatusOK, status, "FAIL: Count query returned status %d", status) + + var queryResp map[string]interface{} + DecodeJSON(body, &queryResp) + + if rows, ok := queryResp["rows"].([]interface{}); ok && len(rows) > 0 { + row := rows[0].([]interface{}) + count := int(row[0].(float64)) + require.Equal(t, 10, count, "FAIL: Expected 10 rows, got %d", count) + } + + // Update data + updateReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/transaction", + Body: map[string]interface{}{ + "statements": []string{ + fmt.Sprintf("UPDATE %s SET version = version + 100 WHERE version <= 5", tableName), + }, + }, + } + + _, status, err = updateReq.Do(ctx) + require.NoError(t, err, "FAIL: Could not update rows") + require.Equal(t, http.StatusOK, status, "FAIL: Update returned status %d", status) + + // Verify updates persisted + queryUpdatedReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/query", + Body: map[string]interface{}{ + "sql": fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE version > 100", tableName), + }, + } + + body, status, err = queryUpdatedReq.Do(ctx) + require.NoError(t, err, "FAIL: Could not count updated rows") + require.Equal(t, http.StatusOK, status, "FAIL: Count updated query returned status %d", status) + + DecodeJSON(body, &queryResp) + if rows, ok := queryResp["rows"].([]interface{}); ok && len(rows) > 0 { + row := rows[0].([]interface{}) + count := int(row[0].(float64)) + require.Equal(t, 5, count, "FAIL: Expected 5 updated rows, got %d", count) + } + + t.Logf(" ✓ Data persists through multiple write operations") + }) + + t.Run("Deletes_are_persisted", func(t *testing.T) { + // Delete some rows + deleteReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/transaction", + Body: map[string]interface{}{ + "statements": []string{ + fmt.Sprintf("DELETE FROM %s WHERE version > 100", tableName), + }, + }, + } + + _, status, err := deleteReq.Do(ctx) + require.NoError(t, err, "FAIL: Could not delete rows") + require.Equal(t, http.StatusOK, status, "FAIL: Delete returned status %d", status) + + // Verify deletes persisted + queryReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/query", + Body: map[string]interface{}{ + "sql": fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName), + }, + } + + body, status, err := queryReq.Do(ctx) + require.NoError(t, err, "FAIL: Could not count remaining rows") + require.Equal(t, http.StatusOK, status, "FAIL: Count query returned status %d", status) + + var queryResp map[string]interface{} + DecodeJSON(body, &queryResp) + + if rows, ok := queryResp["rows"].([]interface{}); ok && len(rows) > 0 { + row := rows[0].([]interface{}) + count := int(row[0].(float64)) + require.Equal(t, 5, count, "FAIL: Expected 5 rows after delete, got %d", count) + } + + t.Logf(" ✓ Deletes are properly persisted") + }) +} + +// TestRQLite_DataFilesExist verifies RQLite data files are created on disk. +func TestRQLite_DataFilesExist(t *testing.T) { + homeDir, err := os.UserHomeDir() + require.NoError(t, err, "FAIL: Could not get home directory") + + // Check for RQLite data directories + dataLocations := []string{ + filepath.Join(homeDir, ".orama", "node-1", "rqlite"), + filepath.Join(homeDir, ".orama", "node-2", "rqlite"), + filepath.Join(homeDir, ".orama", "node-3", "rqlite"), + filepath.Join(homeDir, ".orama", "node-4", "rqlite"), + filepath.Join(homeDir, ".orama", "node-5", "rqlite"), + } + + foundDataDirs := 0 + for _, dataDir := range dataLocations { + if _, err := os.Stat(dataDir); err == nil { + foundDataDirs++ + t.Logf(" ✓ Found RQLite data directory: %s", dataDir) + + // Check for Raft log files + entries, _ := os.ReadDir(dataDir) + for _, entry := range entries { + t.Logf(" - %s", entry.Name()) + } + } + } + + require.Greater(t, foundDataDirs, 0, + "FAIL: No RQLite data directories found - data may not be persisted") + t.Logf(" Found %d RQLite data directories", foundDataDirs) +} + +// TestOlric_DataPersistence verifies Olric cache data persistence. +// Note: Olric is an in-memory cache, so this tests data survival during runtime. +func TestOlric_DataPersistence(t *testing.T) { + env, err := LoadTestEnv() + require.NoError(t, err, "FAIL: Could not load test environment") + + dmap := fmt.Sprintf("persist_cache_%d", time.Now().UnixNano()) + + t.Run("Cache_data_survives_multiple_operations", func(t *testing.T) { + // Put multiple keys + keys := make(map[string]string) + for i := 0; i < 10; i++ { + key := fmt.Sprintf("persist_key_%d", i) + value := fmt.Sprintf("persist_value_%d", i) + keys[key] = value + + err := putToOlric(env.GatewayURL, env.APIKey, dmap, key, value) + require.NoError(t, err, "FAIL: Could not put key %s", key) + } + + // Perform other operations + err := putToOlric(env.GatewayURL, env.APIKey, dmap, "other_key", "other_value") + require.NoError(t, err, "FAIL: Could not put other key") + + // Verify original keys still exist + for key, expectedValue := range keys { + retrieved, err := getFromOlric(env.GatewayURL, env.APIKey, dmap, key) + require.NoError(t, err, "FAIL: Key %s not found after other operations", key) + require.Equal(t, expectedValue, retrieved, "FAIL: Value mismatch for key %s", key) + } + + t.Logf(" ✓ Cache data survives multiple operations") + }) +} + +// TestNamespaceCluster_DataPersistence verifies namespace-specific data is isolated and persisted. +func TestNamespaceCluster_DataPersistence(t *testing.T) { + // Create namespace + namespace := fmt.Sprintf("persist-ns-%d", time.Now().UnixNano()) + env, err := LoadTestEnvWithNamespace(namespace) + require.NoError(t, err, "FAIL: Could not create namespace") + + t.Logf("Created namespace: %s", namespace) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + t.Run("Namespace_data_is_isolated", func(t *testing.T) { + // Create data via gateway API + tableName := fmt.Sprintf("ns_data_%d", time.Now().UnixNano()) + + req := &HTTPRequest{ + Method: http.MethodPost, + URL: env.GatewayURL + "/v1/rqlite/create-table", + Headers: map[string]string{ + "Authorization": "Bearer " + env.APIKey, + }, + Body: map[string]interface{}{ + "schema": fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY, value TEXT)", tableName), + }, + } + + _, status, err := req.Do(ctx) + require.NoError(t, err, "FAIL: Could not create table in namespace") + require.True(t, status == http.StatusOK || status == http.StatusCreated, + "FAIL: Create table returned status %d", status) + + // Insert data + insertReq := &HTTPRequest{ + Method: http.MethodPost, + URL: env.GatewayURL + "/v1/rqlite/transaction", + Headers: map[string]string{ + "Authorization": "Bearer " + env.APIKey, + }, + Body: map[string]interface{}{ + "statements": []string{ + fmt.Sprintf("INSERT INTO %s (value) VALUES ('ns_test_value')", tableName), + }, + }, + } + + _, status, err = insertReq.Do(ctx) + require.NoError(t, err, "FAIL: Could not insert into namespace table") + require.Equal(t, http.StatusOK, status, "FAIL: Insert returned status %d", status) + + // Verify data exists + queryReq := &HTTPRequest{ + Method: http.MethodPost, + URL: env.GatewayURL + "/v1/rqlite/query", + Headers: map[string]string{ + "Authorization": "Bearer " + env.APIKey, + }, + Body: map[string]interface{}{ + "sql": fmt.Sprintf("SELECT value FROM %s", tableName), + }, + } + + body, status, err := queryReq.Do(ctx) + require.NoError(t, err, "FAIL: Could not query namespace table") + require.Equal(t, http.StatusOK, status, "FAIL: Query returned status %d", status) + + var queryResp map[string]interface{} + json.Unmarshal(body, &queryResp) + count, _ := queryResp["count"].(float64) + require.Equal(t, float64(1), count, "FAIL: Expected 1 row in namespace table") + + t.Logf(" ✓ Namespace data is isolated and persisted") + }) +} + +// TestIPFS_DataPersistence verifies IPFS content is persisted and pinned. +// Note: Detailed IPFS tests are in storage_http_test.go. This test uses the helper from env.go. +func TestIPFS_DataPersistence(t *testing.T) { + env, err := LoadTestEnv() + require.NoError(t, err, "FAIL: Could not load test environment") + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + t.Run("Uploaded_content_persists", func(t *testing.T) { + // Use helper function to upload content via multipart form + content := fmt.Sprintf("persistent content %d", time.Now().UnixNano()) + cid := UploadTestFile(t, env, "persist_test.txt", content) + require.NotEmpty(t, cid, "FAIL: No CID returned from upload") + t.Logf(" Uploaded content with CID: %s", cid) + + // Verify content can be retrieved + getReq := &HTTPRequest{ + Method: http.MethodGet, + URL: env.GatewayURL + "/v1/storage/get/" + cid, + Headers: map[string]string{ + "Authorization": "Bearer " + env.APIKey, + }, + } + + respBody, status, err := getReq.Do(ctx) + require.NoError(t, err, "FAIL: Get content failed") + require.Equal(t, http.StatusOK, status, "FAIL: Get returned status %d", status) + require.Contains(t, string(respBody), "persistent content", + "FAIL: Retrieved content doesn't match uploaded content") + + t.Logf(" ✓ IPFS content persists and is retrievable") + }) +} + +// TestSQLite_DataPersistence verifies per-deployment SQLite databases persist. +func TestSQLite_DataPersistence(t *testing.T) { + env, err := LoadTestEnv() + require.NoError(t, err, "FAIL: Could not load test environment") + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + dbName := fmt.Sprintf("persist_db_%d", time.Now().UnixNano()) + + t.Run("SQLite_database_persists", func(t *testing.T) { + // Create database + createReq := &HTTPRequest{ + Method: http.MethodPost, + URL: env.GatewayURL + "/v1/db/sqlite/create", + Headers: map[string]string{ + "Authorization": "Bearer " + env.APIKey, + }, + Body: map[string]interface{}{ + "name": dbName, + }, + } + + _, status, err := createReq.Do(ctx) + require.NoError(t, err, "FAIL: Create database failed") + require.True(t, status == http.StatusOK || status == http.StatusCreated, + "FAIL: Create returned status %d", status) + t.Logf(" Created SQLite database: %s", dbName) + + // Create table and insert data + queryReq := &HTTPRequest{ + Method: http.MethodPost, + URL: env.GatewayURL + "/v1/db/sqlite/query", + Headers: map[string]string{ + "Authorization": "Bearer " + env.APIKey, + }, + Body: map[string]interface{}{ + "database": dbName, + "sql": "CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, data TEXT)", + }, + } + + _, status, err = queryReq.Do(ctx) + require.NoError(t, err, "FAIL: Create table failed") + require.Equal(t, http.StatusOK, status, "FAIL: Create table returned status %d", status) + + // Insert data + insertReq := &HTTPRequest{ + Method: http.MethodPost, + URL: env.GatewayURL + "/v1/db/sqlite/query", + Headers: map[string]string{ + "Authorization": "Bearer " + env.APIKey, + }, + Body: map[string]interface{}{ + "database": dbName, + "sql": "INSERT INTO test_table (data) VALUES ('persistent_data')", + }, + } + + _, status, err = insertReq.Do(ctx) + require.NoError(t, err, "FAIL: Insert failed") + require.Equal(t, http.StatusOK, status, "FAIL: Insert returned status %d", status) + + // Verify data persists + selectReq := &HTTPRequest{ + Method: http.MethodPost, + URL: env.GatewayURL + "/v1/db/sqlite/query", + Headers: map[string]string{ + "Authorization": "Bearer " + env.APIKey, + }, + Body: map[string]interface{}{ + "database": dbName, + "sql": "SELECT data FROM test_table", + }, + } + + body, status, err := selectReq.Do(ctx) + require.NoError(t, err, "FAIL: Select failed") + require.Equal(t, http.StatusOK, status, "FAIL: Select returned status %d", status) + require.Contains(t, string(body), "persistent_data", + "FAIL: Data not found in SQLite database") + + t.Logf(" ✓ SQLite database data persists") + }) + + t.Run("SQLite_database_listed", func(t *testing.T) { + // List databases to verify it was persisted + listReq := &HTTPRequest{ + Method: http.MethodGet, + URL: env.GatewayURL + "/v1/db/sqlite/list", + Headers: map[string]string{ + "Authorization": "Bearer " + env.APIKey, + }, + } + + body, status, err := listReq.Do(ctx) + require.NoError(t, err, "FAIL: List databases failed") + require.Equal(t, http.StatusOK, status, "FAIL: List returned status %d", status) + require.Contains(t, string(body), dbName, + "FAIL: Created database not found in list") + + t.Logf(" ✓ SQLite database appears in list") + }) +} diff --git a/e2e/env.go b/e2e/env.go index 1b3492b..a105ef8 100644 --- a/e2e/env.go +++ b/e2e/env.go @@ -40,6 +40,73 @@ var ( cacheMutex sync.RWMutex ) +// createAPIKeyWithProvisioning creates an API key for a namespace, handling async provisioning +// For non-default namespaces, this may trigger cluster provisioning and wait for it to complete. +func createAPIKeyWithProvisioning(gatewayURL, wallet, namespace string, timeout time.Duration) (string, error) { + httpClient := NewHTTPClient(10 * time.Second) + + makeRequest := func() (*http.Response, []byte, error) { + reqBody := map[string]string{ + "wallet": wallet, + "namespace": namespace, + } + bodyBytes, _ := json.Marshal(reqBody) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "POST", gatewayURL+"/v1/auth/simple-key", bytes.NewReader(bodyBytes)) + if err != nil { + return nil, nil, fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return nil, nil, fmt.Errorf("request failed: %w", err) + } + + respBody, _ := io.ReadAll(resp.Body) + resp.Body.Close() + return resp, respBody, nil + } + + startTime := time.Now() + for { + if time.Since(startTime) > timeout { + return "", fmt.Errorf("timeout waiting for namespace provisioning") + } + + resp, respBody, err := makeRequest() + if err != nil { + return "", err + } + + // If we got 200, extract the API key + if resp.StatusCode == http.StatusOK { + var apiKeyResp map[string]interface{} + if err := json.Unmarshal(respBody, &apiKeyResp); err != nil { + return "", fmt.Errorf("failed to decode API key response: %w", err) + } + apiKey, ok := apiKeyResp["api_key"].(string) + if !ok || apiKey == "" { + return "", fmt.Errorf("API key not found in response") + } + return apiKey, nil + } + + // If we got 202 Accepted, provisioning is in progress + if resp.StatusCode == http.StatusAccepted { + // Wait and retry - the cluster is being provisioned + time.Sleep(5 * time.Second) + continue + } + + // Any other status is an error + return "", fmt.Errorf("API key creation failed with status %d: %s", resp.StatusCode, string(respBody)) + } +} + // loadGatewayConfig loads gateway configuration from ~/.orama/gateway.yaml func loadGatewayConfig() (map[string]interface{}, error) { configPath, err := config.DefaultPath("gateway.yaml") @@ -1098,43 +1165,11 @@ func LoadTestEnv() (*E2ETestEnv, error) { wallet = wallet[:42] } - // Create an API key for this namespace via the simple-key endpoint - reqBody := map[string]string{ - "wallet": wallet, - "namespace": namespace, - } - bodyBytes, _ := json.Marshal(reqBody) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, "POST", gatewayURL+"/v1/auth/simple-key", bytes.NewReader(bodyBytes)) + // Create an API key for this namespace (handles async provisioning for non-default namespaces) + var err error + apiKey, err = createAPIKeyWithProvisioning(gatewayURL, wallet, namespace, 2*time.Minute) if err != nil { - return nil, fmt.Errorf("failed to create API key request: %w", err) - } - req.Header.Set("Content-Type", "application/json") - - client := NewHTTPClient(10 * time.Second) - resp, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to create API key: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("API key creation failed with status %d: %s", resp.StatusCode, string(bodyBytes)) - } - - var apiKeyResp map[string]interface{} - if err := json.NewDecoder(resp.Body).Decode(&apiKeyResp); err != nil { - return nil, fmt.Errorf("failed to decode API key response: %w", err) - } - - var ok bool - apiKey, ok = apiKeyResp["api_key"].(string) - if !ok || apiKey == "" { - return nil, fmt.Errorf("API key not found in response") + return nil, fmt.Errorf("failed to create API key for namespace %s: %w", namespace, err) } } else if namespace == "" { namespace = GetClientNamespace() @@ -1179,42 +1214,10 @@ func LoadTestEnvWithNamespace(namespace string) (*E2ETestEnv, error) { wallet = wallet[:42] } - // Create an API key for this namespace via the simple-key endpoint - reqBody := map[string]string{ - "wallet": wallet, - "namespace": namespace, - } - bodyBytes, _ := json.Marshal(reqBody) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, "POST", gatewayURL+"/v1/auth/simple-key", bytes.NewReader(bodyBytes)) + // Create an API key for this namespace (handles async provisioning for non-default namespaces) + apiKey, err := createAPIKeyWithProvisioning(gatewayURL, wallet, namespace, 2*time.Minute) if err != nil { - return nil, fmt.Errorf("failed to create API key request: %w", err) - } - req.Header.Set("Content-Type", "application/json") - - client := NewHTTPClient(10 * time.Second) - resp, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to create API key: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("API key creation failed with status %d: %s", resp.StatusCode, string(bodyBytes)) - } - - var apiKeyResp map[string]interface{} - if err := json.NewDecoder(resp.Body).Decode(&apiKeyResp); err != nil { - return nil, fmt.Errorf("failed to decode API key response: %w", err) - } - - apiKey, ok := apiKeyResp["api_key"].(string) - if !ok || apiKey == "" { - return nil, fmt.Errorf("API key not found in response") + return nil, fmt.Errorf("failed to create API key for namespace %s: %w", namespace, err) } return &E2ETestEnv{ diff --git a/e2e/olric_cluster_test.go b/e2e/olric_cluster_test.go new file mode 100644 index 0000000..f8d4032 --- /dev/null +++ b/e2e/olric_cluster_test.go @@ -0,0 +1,414 @@ +//go:build e2e + +package e2e + +import ( + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// ============================================================================= +// STRICT OLRIC CACHE DISTRIBUTION TESTS +// These tests verify that Olric cache data is properly distributed across nodes. +// Tests FAIL if distribution doesn't work - no skips, no warnings. +// ============================================================================= + +// getOlricNodeAddresses returns HTTP addresses of Olric nodes +// Note: Olric HTTP port is typically on port 3320 for the main cluster +func getOlricNodeAddresses() []string { + // In dev mode, we have a single Olric instance + // In production, each node runs its own Olric instance + return []string{ + "http://localhost:3320", + } +} + +// putToOlric stores a key-value pair in Olric via HTTP API +func putToOlric(gatewayURL, apiKey, dmap, key, value string) error { + reqBody := map[string]interface{}{ + "dmap": dmap, + "key": key, + "value": value, + } + bodyBytes, _ := json.Marshal(reqBody) + + req, err := http.NewRequest("POST", gatewayURL+"/v1/cache/put", strings.NewReader(string(bodyBytes))) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+apiKey) + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("put failed with status %d: %s", resp.StatusCode, string(body)) + } + return nil +} + +// getFromOlric retrieves a value from Olric via HTTP API +func getFromOlric(gatewayURL, apiKey, dmap, key string) (string, error) { + reqBody := map[string]interface{}{ + "dmap": dmap, + "key": key, + } + bodyBytes, _ := json.Marshal(reqBody) + + req, err := http.NewRequest("POST", gatewayURL+"/v1/cache/get", strings.NewReader(string(bodyBytes))) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+apiKey) + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + return "", fmt.Errorf("key not found") + } + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return "", fmt.Errorf("get failed with status %d: %s", resp.StatusCode, string(body)) + } + + body, _ := io.ReadAll(resp.Body) + var result map[string]interface{} + if err := json.Unmarshal(body, &result); err != nil { + return "", err + } + + if value, ok := result["value"].(string); ok { + return value, nil + } + // Value might be in a different format + if value, ok := result["value"]; ok { + return fmt.Sprintf("%v", value), nil + } + return "", fmt.Errorf("value not found in response") +} + +// TestOlric_BasicDistribution verifies cache operations work across the cluster. +func TestOlric_BasicDistribution(t *testing.T) { + // Note: Not using SkipIfMissingGateway() since LoadTestEnv() creates its own API key + env, err := LoadTestEnv() + require.NoError(t, err, "FAIL: Could not load test environment") + require.NotEmpty(t, env.APIKey, "FAIL: No API key available") + + dmap := fmt.Sprintf("dist_test_%d", time.Now().UnixNano()) + + t.Run("Put_and_get_from_same_gateway", func(t *testing.T) { + key := fmt.Sprintf("key_%d", time.Now().UnixNano()) + value := fmt.Sprintf("value_%d", time.Now().UnixNano()) + + // Put + err := putToOlric(env.GatewayURL, env.APIKey, dmap, key, value) + require.NoError(t, err, "FAIL: Could not put value to cache") + + // Get + retrieved, err := getFromOlric(env.GatewayURL, env.APIKey, dmap, key) + require.NoError(t, err, "FAIL: Could not get value from cache") + require.Equal(t, value, retrieved, "FAIL: Retrieved value doesn't match") + + t.Logf(" ✓ Put/Get works: %s = %s", key, value) + }) + + t.Run("Multiple_keys_distributed", func(t *testing.T) { + // Put multiple keys (should be distributed across partitions) + keys := make(map[string]string) + for i := 0; i < 20; i++ { + key := fmt.Sprintf("dist_key_%d_%d", i, time.Now().UnixNano()) + value := fmt.Sprintf("dist_value_%d", i) + keys[key] = value + + err := putToOlric(env.GatewayURL, env.APIKey, dmap, key, value) + require.NoError(t, err, "FAIL: Could not put key %s", key) + } + + t.Logf(" Put 20 keys to cache") + + // Verify all keys are retrievable + for key, expectedValue := range keys { + retrieved, err := getFromOlric(env.GatewayURL, env.APIKey, dmap, key) + require.NoError(t, err, "FAIL: Could not get key %s", key) + require.Equal(t, expectedValue, retrieved, "FAIL: Value mismatch for key %s", key) + } + + t.Logf(" ✓ All 20 keys are retrievable") + }) +} + +// TestOlric_ConcurrentAccess verifies cache handles concurrent operations correctly. +func TestOlric_ConcurrentAccess(t *testing.T) { + env, err := LoadTestEnv() + require.NoError(t, err, "FAIL: Could not load test environment") + + dmap := fmt.Sprintf("concurrent_test_%d", time.Now().UnixNano()) + + t.Run("Concurrent_writes_to_same_key", func(t *testing.T) { + key := fmt.Sprintf("concurrent_key_%d", time.Now().UnixNano()) + + // Launch multiple goroutines writing to the same key + done := make(chan error, 10) + for i := 0; i < 10; i++ { + go func(idx int) { + value := fmt.Sprintf("concurrent_value_%d", idx) + err := putToOlric(env.GatewayURL, env.APIKey, dmap, key, value) + done <- err + }(i) + } + + // Wait for all writes + var errors []error + for i := 0; i < 10; i++ { + if err := <-done; err != nil { + errors = append(errors, err) + } + } + + require.Empty(t, errors, "FAIL: %d concurrent writes failed: %v", len(errors), errors) + + // The key should have ONE of the values (last write wins) + retrieved, err := getFromOlric(env.GatewayURL, env.APIKey, dmap, key) + require.NoError(t, err, "FAIL: Could not get key after concurrent writes") + require.Contains(t, retrieved, "concurrent_value_", "FAIL: Value doesn't match expected pattern") + + t.Logf(" ✓ Concurrent writes succeeded, final value: %s", retrieved) + }) + + t.Run("Concurrent_reads_and_writes", func(t *testing.T) { + key := fmt.Sprintf("rw_key_%d", time.Now().UnixNano()) + initialValue := "initial_value" + + // Set initial value + err := putToOlric(env.GatewayURL, env.APIKey, dmap, key, initialValue) + require.NoError(t, err, "FAIL: Could not set initial value") + + // Launch concurrent readers and writers + done := make(chan error, 20) + + // 10 readers + for i := 0; i < 10; i++ { + go func() { + _, err := getFromOlric(env.GatewayURL, env.APIKey, dmap, key) + done <- err + }() + } + + // 10 writers + for i := 0; i < 10; i++ { + go func(idx int) { + value := fmt.Sprintf("updated_value_%d", idx) + err := putToOlric(env.GatewayURL, env.APIKey, dmap, key, value) + done <- err + }(i) + } + + // Wait for all operations + var readErrors, writeErrors []error + for i := 0; i < 20; i++ { + if err := <-done; err != nil { + if i < 10 { + readErrors = append(readErrors, err) + } else { + writeErrors = append(writeErrors, err) + } + } + } + + require.Empty(t, readErrors, "FAIL: %d reads failed", len(readErrors)) + require.Empty(t, writeErrors, "FAIL: %d writes failed", len(writeErrors)) + + t.Logf(" ✓ Concurrent read/write operations succeeded") + }) +} + +// TestOlric_NamespaceClusterCache verifies cache works in namespace-specific clusters. +func TestOlric_NamespaceClusterCache(t *testing.T) { + // Create a new namespace + namespace := fmt.Sprintf("cache-test-%d", time.Now().UnixNano()) + + env, err := LoadTestEnvWithNamespace(namespace) + require.NoError(t, err, "FAIL: Could not create namespace for cache test") + require.NotEmpty(t, env.APIKey, "FAIL: No API key") + + t.Logf("Created namespace %s", namespace) + + dmap := fmt.Sprintf("ns_cache_%d", time.Now().UnixNano()) + + t.Run("Cache_operations_work_in_namespace", func(t *testing.T) { + key := fmt.Sprintf("ns_key_%d", time.Now().UnixNano()) + value := fmt.Sprintf("ns_value_%d", time.Now().UnixNano()) + + // Put using namespace API key + err := putToOlric(env.GatewayURL, env.APIKey, dmap, key, value) + require.NoError(t, err, "FAIL: Could not put value in namespace cache") + + // Get + retrieved, err := getFromOlric(env.GatewayURL, env.APIKey, dmap, key) + require.NoError(t, err, "FAIL: Could not get value from namespace cache") + require.Equal(t, value, retrieved, "FAIL: Value mismatch in namespace cache") + + t.Logf(" ✓ Namespace cache operations work: %s = %s", key, value) + }) + + // Check if namespace Olric instances are running (port 10003 offset in port blocks) + var nsOlricPorts []int + for port := 10003; port <= 10098; port += 5 { + conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 1*time.Second) + if err == nil { + conn.Close() + nsOlricPorts = append(nsOlricPorts, port) + } + } + + if len(nsOlricPorts) > 0 { + t.Logf("Found %d namespace Olric memberlist ports: %v", len(nsOlricPorts), nsOlricPorts) + + t.Run("Namespace_Olric_nodes_connected", func(t *testing.T) { + // Verify all namespace Olric nodes can be reached + for _, port := range nsOlricPorts { + conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", port), 2*time.Second) + require.NoError(t, err, "FAIL: Cannot connect to namespace Olric on port %d", port) + conn.Close() + t.Logf(" ✓ Namespace Olric memberlist on port %d is reachable", port) + } + }) + } +} + +// TestOlric_DataConsistency verifies data remains consistent across operations. +func TestOlric_DataConsistency(t *testing.T) { + env, err := LoadTestEnv() + require.NoError(t, err, "FAIL: Could not load test environment") + + dmap := fmt.Sprintf("consistency_test_%d", time.Now().UnixNano()) + + t.Run("Update_preserves_latest_value", func(t *testing.T) { + key := fmt.Sprintf("update_key_%d", time.Now().UnixNano()) + + // Write multiple times + for i := 1; i <= 5; i++ { + value := fmt.Sprintf("version_%d", i) + err := putToOlric(env.GatewayURL, env.APIKey, dmap, key, value) + require.NoError(t, err, "FAIL: Could not update key to version %d", i) + } + + // Final read should return latest version + retrieved, err := getFromOlric(env.GatewayURL, env.APIKey, dmap, key) + require.NoError(t, err, "FAIL: Could not read final value") + require.Equal(t, "version_5", retrieved, "FAIL: Latest version not preserved") + + t.Logf(" ✓ Latest value preserved after 5 updates") + }) + + t.Run("Delete_removes_key", func(t *testing.T) { + key := fmt.Sprintf("delete_key_%d", time.Now().UnixNano()) + value := "to_be_deleted" + + // Put + err := putToOlric(env.GatewayURL, env.APIKey, dmap, key, value) + require.NoError(t, err, "FAIL: Could not put value") + + // Verify it exists + retrieved, err := getFromOlric(env.GatewayURL, env.APIKey, dmap, key) + require.NoError(t, err, "FAIL: Could not get value before delete") + require.Equal(t, value, retrieved) + + // Delete (POST with JSON body) + deleteBody := map[string]interface{}{ + "dmap": dmap, + "key": key, + } + deleteBytes, _ := json.Marshal(deleteBody) + req, _ := http.NewRequest("POST", env.GatewayURL+"/v1/cache/delete", strings.NewReader(string(deleteBytes))) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+env.APIKey) + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + require.NoError(t, err, "FAIL: Delete request failed") + resp.Body.Close() + require.True(t, resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNoContent, + "FAIL: Delete returned unexpected status %d", resp.StatusCode) + + // Verify key is gone + _, err = getFromOlric(env.GatewayURL, env.APIKey, dmap, key) + require.Error(t, err, "FAIL: Key should not exist after delete") + require.Contains(t, err.Error(), "not found", "FAIL: Expected 'not found' error") + + t.Logf(" ✓ Delete properly removes key") + }) +} + +// TestOlric_TTLExpiration verifies TTL expiration works. +// NOTE: TTL is currently parsed but not applied by the cache handler (TODO in set_handler.go). +// This test is skipped until TTL support is fully implemented. +func TestOlric_TTLExpiration(t *testing.T) { + t.Skip("TTL support not yet implemented in cache handler - see set_handler.go lines 88-98") + + env, err := LoadTestEnv() + require.NoError(t, err, "FAIL: Could not load test environment") + + dmap := fmt.Sprintf("ttl_test_%d", time.Now().UnixNano()) + + t.Run("Key_expires_after_TTL", func(t *testing.T) { + key := fmt.Sprintf("ttl_key_%d", time.Now().UnixNano()) + value := "expires_soon" + ttlSeconds := 3 + + // Put with TTL (TTL is a duration string like "3s", "1m", etc.) + reqBody := map[string]interface{}{ + "dmap": dmap, + "key": key, + "value": value, + "ttl": fmt.Sprintf("%ds", ttlSeconds), + } + bodyBytes, _ := json.Marshal(reqBody) + + req, _ := http.NewRequest("POST", env.GatewayURL+"/v1/cache/put", strings.NewReader(string(bodyBytes))) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+env.APIKey) + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + require.NoError(t, err, "FAIL: Put with TTL failed") + resp.Body.Close() + require.True(t, resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusCreated, + "FAIL: Put returned status %d", resp.StatusCode) + + // Verify key exists immediately + retrieved, err := getFromOlric(env.GatewayURL, env.APIKey, dmap, key) + require.NoError(t, err, "FAIL: Could not get key immediately after put") + require.Equal(t, value, retrieved) + t.Logf(" Key exists immediately after put") + + // Wait for TTL to expire (plus buffer) + time.Sleep(time.Duration(ttlSeconds+2) * time.Second) + + // Key should be gone + _, err = getFromOlric(env.GatewayURL, env.APIKey, dmap, key) + require.Error(t, err, "FAIL: Key should have expired after %d seconds", ttlSeconds) + require.Contains(t, err.Error(), "not found", "FAIL: Expected 'not found' error after TTL") + + t.Logf(" ✓ Key expired after %d seconds as expected", ttlSeconds) + }) +} diff --git a/e2e/rqlite_cluster_test.go b/e2e/rqlite_cluster_test.go new file mode 100644 index 0000000..04dd606 --- /dev/null +++ b/e2e/rqlite_cluster_test.go @@ -0,0 +1,478 @@ +//go:build e2e + +package e2e + +import ( + "context" + "fmt" + "net/http" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// ============================================================================= +// STRICT RQLITE CLUSTER TESTS +// These tests verify that RQLite cluster operations work correctly. +// Tests FAIL if operations don't work - no skips, no warnings. +// ============================================================================= + +// TestRQLite_ClusterHealth verifies the RQLite cluster is healthy and operational. +func TestRQLite_ClusterHealth(t *testing.T) { + SkipIfMissingGateway(t) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Check RQLite schema endpoint (proves cluster is reachable) + req := &HTTPRequest{ + Method: http.MethodGet, + URL: GetGatewayURL() + "/v1/rqlite/schema", + } + + body, status, err := req.Do(ctx) + require.NoError(t, err, "FAIL: Could not reach RQLite cluster") + require.Equal(t, http.StatusOK, status, "FAIL: RQLite schema endpoint returned %d: %s", status, string(body)) + + var schemaResp map[string]interface{} + err = DecodeJSON(body, &schemaResp) + require.NoError(t, err, "FAIL: Could not decode RQLite schema response") + + // Schema endpoint should return tables array + _, hasTables := schemaResp["tables"] + require.True(t, hasTables, "FAIL: RQLite schema response missing 'tables' field") + + t.Logf(" ✓ RQLite cluster is healthy and responding") +} + +// TestRQLite_WriteReadConsistency verifies data written can be read back consistently. +func TestRQLite_WriteReadConsistency(t *testing.T) { + SkipIfMissingGateway(t) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + table := GenerateTableName() + + // Cleanup + defer func() { + dropReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/drop-table", + Body: map[string]interface{}{"table": table}, + } + dropReq.Do(context.Background()) + }() + + // Create table + createReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/create-table", + Body: map[string]interface{}{ + "schema": fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY, value TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP)", + table, + ), + }, + } + + _, status, err := createReq.Do(ctx) + require.NoError(t, err, "FAIL: Create table request failed") + require.True(t, status == http.StatusCreated || status == http.StatusOK, + "FAIL: Create table returned status %d", status) + t.Logf("Created table %s", table) + + t.Run("Write_then_read_returns_same_data", func(t *testing.T) { + uniqueValue := fmt.Sprintf("test_value_%d", time.Now().UnixNano()) + + // Insert + insertReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/transaction", + Body: map[string]interface{}{ + "statements": []string{ + fmt.Sprintf("INSERT INTO %s (value) VALUES ('%s')", table, uniqueValue), + }, + }, + } + + _, status, err := insertReq.Do(ctx) + require.NoError(t, err, "FAIL: Insert request failed") + require.Equal(t, http.StatusOK, status, "FAIL: Insert returned status %d", status) + + // Read back + queryReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/query", + Body: map[string]interface{}{ + "sql": fmt.Sprintf("SELECT value FROM %s WHERE value = '%s'", table, uniqueValue), + }, + } + + body, status, err := queryReq.Do(ctx) + require.NoError(t, err, "FAIL: Query request failed") + require.Equal(t, http.StatusOK, status, "FAIL: Query returned status %d", status) + + var queryResp map[string]interface{} + err = DecodeJSON(body, &queryResp) + require.NoError(t, err, "FAIL: Could not decode query response") + + // Verify we got our value back + count, ok := queryResp["count"].(float64) + require.True(t, ok, "FAIL: Response missing 'count' field") + require.Equal(t, float64(1), count, "FAIL: Expected 1 row, got %v", count) + + t.Logf(" ✓ Written value '%s' was read back correctly", uniqueValue) + }) + + t.Run("Multiple_writes_all_readable", func(t *testing.T) { + // Insert multiple values + var statements []string + for i := 0; i < 10; i++ { + statements = append(statements, + fmt.Sprintf("INSERT INTO %s (value) VALUES ('batch_%d')", table, i)) + } + + insertReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/transaction", + Body: map[string]interface{}{ + "statements": statements, + }, + } + + _, status, err := insertReq.Do(ctx) + require.NoError(t, err, "FAIL: Batch insert failed") + require.Equal(t, http.StatusOK, status, "FAIL: Batch insert returned status %d", status) + + // Count all batch rows + queryReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/query", + Body: map[string]interface{}{ + "sql": fmt.Sprintf("SELECT COUNT(*) as cnt FROM %s WHERE value LIKE 'batch_%%'", table), + }, + } + + body, status, err := queryReq.Do(ctx) + require.NoError(t, err, "FAIL: Count query failed") + require.Equal(t, http.StatusOK, status, "FAIL: Count query returned status %d", status) + + var queryResp map[string]interface{} + DecodeJSON(body, &queryResp) + + if rows, ok := queryResp["rows"].([]interface{}); ok && len(rows) > 0 { + row := rows[0].([]interface{}) + count := int(row[0].(float64)) + require.Equal(t, 10, count, "FAIL: Expected 10 batch rows, got %d", count) + } + + t.Logf(" ✓ All 10 batch writes are readable") + }) +} + +// TestRQLite_TransactionAtomicity verifies transactions are atomic. +func TestRQLite_TransactionAtomicity(t *testing.T) { + SkipIfMissingGateway(t) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + table := GenerateTableName() + + // Cleanup + defer func() { + dropReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/drop-table", + Body: map[string]interface{}{"table": table}, + } + dropReq.Do(context.Background()) + }() + + // Create table + createReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/create-table", + Body: map[string]interface{}{ + "schema": fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY, value TEXT UNIQUE)", + table, + ), + }, + } + + _, status, err := createReq.Do(ctx) + require.NoError(t, err, "FAIL: Create table failed") + require.True(t, status == http.StatusCreated || status == http.StatusOK, + "FAIL: Create table returned status %d", status) + + t.Run("Successful_transaction_commits_all", func(t *testing.T) { + txReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/transaction", + Body: map[string]interface{}{ + "statements": []string{ + fmt.Sprintf("INSERT INTO %s (value) VALUES ('tx_val_1')", table), + fmt.Sprintf("INSERT INTO %s (value) VALUES ('tx_val_2')", table), + fmt.Sprintf("INSERT INTO %s (value) VALUES ('tx_val_3')", table), + }, + }, + } + + _, status, err := txReq.Do(ctx) + require.NoError(t, err, "FAIL: Transaction request failed") + require.Equal(t, http.StatusOK, status, "FAIL: Transaction returned status %d", status) + + // Verify all 3 rows exist + queryReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/query", + Body: map[string]interface{}{ + "sql": fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE value LIKE 'tx_val_%%'", table), + }, + } + + body, _, _ := queryReq.Do(ctx) + var queryResp map[string]interface{} + DecodeJSON(body, &queryResp) + + if rows, ok := queryResp["rows"].([]interface{}); ok && len(rows) > 0 { + row := rows[0].([]interface{}) + count := int(row[0].(float64)) + require.Equal(t, 3, count, "FAIL: Transaction didn't commit all 3 rows - got %d", count) + } + + t.Logf(" ✓ Transaction committed all 3 rows atomically") + }) + + t.Run("Updates_preserve_consistency", func(t *testing.T) { + // Update a value + updateReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/transaction", + Body: map[string]interface{}{ + "statements": []string{ + fmt.Sprintf("UPDATE %s SET value = 'tx_val_1_updated' WHERE value = 'tx_val_1'", table), + }, + }, + } + + _, status, err := updateReq.Do(ctx) + require.NoError(t, err, "FAIL: Update request failed") + require.Equal(t, http.StatusOK, status, "FAIL: Update returned status %d", status) + + // Verify update took effect + queryReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/query", + Body: map[string]interface{}{ + "sql": fmt.Sprintf("SELECT value FROM %s WHERE value = 'tx_val_1_updated'", table), + }, + } + + body, _, _ := queryReq.Do(ctx) + var queryResp map[string]interface{} + DecodeJSON(body, &queryResp) + + count, _ := queryResp["count"].(float64) + require.Equal(t, float64(1), count, "FAIL: Update didn't take effect") + + t.Logf(" ✓ Update preserved consistency") + }) +} + +// TestRQLite_ConcurrentWrites verifies the cluster handles concurrent writes correctly. +func TestRQLite_ConcurrentWrites(t *testing.T) { + SkipIfMissingGateway(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + table := GenerateTableName() + + // Cleanup + defer func() { + dropReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/drop-table", + Body: map[string]interface{}{"table": table}, + } + dropReq.Do(context.Background()) + }() + + // Create table + createReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/create-table", + Body: map[string]interface{}{ + "schema": fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY, worker INTEGER, seq INTEGER)", + table, + ), + }, + } + + _, status, err := createReq.Do(ctx) + require.NoError(t, err, "FAIL: Create table failed") + require.True(t, status == http.StatusCreated || status == http.StatusOK, + "FAIL: Create table returned status %d", status) + + t.Run("Concurrent_inserts_all_succeed", func(t *testing.T) { + numWorkers := 5 + insertsPerWorker := 10 + expectedTotal := numWorkers * insertsPerWorker + + var wg sync.WaitGroup + errChan := make(chan error, numWorkers*insertsPerWorker) + + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for i := 0; i < insertsPerWorker; i++ { + insertReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/transaction", + Body: map[string]interface{}{ + "statements": []string{ + fmt.Sprintf("INSERT INTO %s (worker, seq) VALUES (%d, %d)", table, workerID, i), + }, + }, + } + + _, status, err := insertReq.Do(ctx) + if err != nil { + errChan <- fmt.Errorf("worker %d insert %d failed: %w", workerID, i, err) + return + } + if status != http.StatusOK { + errChan <- fmt.Errorf("worker %d insert %d got status %d", workerID, i, status) + return + } + } + }(w) + } + + wg.Wait() + close(errChan) + + // Collect errors + var errors []error + for err := range errChan { + errors = append(errors, err) + } + require.Empty(t, errors, "FAIL: %d concurrent inserts failed: %v", len(errors), errors) + + // Verify total count + queryReq := &HTTPRequest{ + Method: http.MethodPost, + URL: GetGatewayURL() + "/v1/rqlite/query", + Body: map[string]interface{}{ + "sql": fmt.Sprintf("SELECT COUNT(*) FROM %s", table), + }, + } + + body, _, _ := queryReq.Do(ctx) + var queryResp map[string]interface{} + DecodeJSON(body, &queryResp) + + if rows, ok := queryResp["rows"].([]interface{}); ok && len(rows) > 0 { + row := rows[0].([]interface{}) + count := int(row[0].(float64)) + require.Equal(t, expectedTotal, count, + "FAIL: Expected %d total rows from concurrent inserts, got %d", expectedTotal, count) + } + + t.Logf(" ✓ All %d concurrent inserts succeeded", expectedTotal) + }) +} + +// TestRQLite_NamespaceClusterOperations verifies RQLite works in namespace clusters. +func TestRQLite_NamespaceClusterOperations(t *testing.T) { + // Create a new namespace + namespace := fmt.Sprintf("rqlite-test-%d", time.Now().UnixNano()) + + env, err := LoadTestEnvWithNamespace(namespace) + require.NoError(t, err, "FAIL: Could not create namespace for RQLite test") + require.NotEmpty(t, env.APIKey, "FAIL: No API key - namespace provisioning failed") + + t.Logf("Created namespace %s", namespace) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + table := GenerateTableName() + + // Cleanup + defer func() { + dropReq := &HTTPRequest{ + Method: http.MethodPost, + URL: env.GatewayURL + "/v1/rqlite/drop-table", + Body: map[string]interface{}{"table": table}, + Headers: map[string]string{"Authorization": "Bearer " + env.APIKey}, + } + dropReq.Do(context.Background()) + }() + + t.Run("Namespace_RQLite_create_insert_query", func(t *testing.T) { + // Create table in namespace cluster + createReq := &HTTPRequest{ + Method: http.MethodPost, + URL: env.GatewayURL + "/v1/rqlite/create-table", + Headers: map[string]string{"Authorization": "Bearer " + env.APIKey}, + Body: map[string]interface{}{ + "schema": fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY, value TEXT)", + table, + ), + }, + } + + _, status, err := createReq.Do(ctx) + require.NoError(t, err, "FAIL: Create table in namespace failed") + require.True(t, status == http.StatusCreated || status == http.StatusOK, + "FAIL: Create table returned status %d", status) + + // Insert data + uniqueValue := fmt.Sprintf("ns_value_%d", time.Now().UnixNano()) + insertReq := &HTTPRequest{ + Method: http.MethodPost, + URL: env.GatewayURL + "/v1/rqlite/transaction", + Headers: map[string]string{"Authorization": "Bearer " + env.APIKey}, + Body: map[string]interface{}{ + "statements": []string{ + fmt.Sprintf("INSERT INTO %s (value) VALUES ('%s')", table, uniqueValue), + }, + }, + } + + _, status, err = insertReq.Do(ctx) + require.NoError(t, err, "FAIL: Insert in namespace failed") + require.Equal(t, http.StatusOK, status, "FAIL: Insert returned status %d", status) + + // Query data + queryReq := &HTTPRequest{ + Method: http.MethodPost, + URL: env.GatewayURL + "/v1/rqlite/query", + Headers: map[string]string{"Authorization": "Bearer " + env.APIKey}, + Body: map[string]interface{}{ + "sql": fmt.Sprintf("SELECT value FROM %s WHERE value = '%s'", table, uniqueValue), + }, + } + + body, status, err := queryReq.Do(ctx) + require.NoError(t, err, "FAIL: Query in namespace failed") + require.Equal(t, http.StatusOK, status, "FAIL: Query returned status %d", status) + + var queryResp map[string]interface{} + DecodeJSON(body, &queryResp) + + count, _ := queryResp["count"].(float64) + require.Equal(t, float64(1), count, "FAIL: Data not found in namespace cluster") + + t.Logf(" ✓ Namespace RQLite operations work correctly") + }) +}