diff --git a/cmd/cli/main.go b/cmd/cli/main.go index e115113..cb3db02 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -4,7 +4,9 @@ import ( "context" "encoding/json" "fmt" + "io" "log" + "net/http" "os" "os/exec" "strconv" @@ -131,19 +133,10 @@ func handleHealth() { } func handlePeers() { - client, err := createClient() + // Query the gateway directly for peer information + peers, err := getPeersFromGateway() if err != nil { - fmt.Fprintf(os.Stderr, "Failed to create client: %v\n", err) - os.Exit(1) - } - defer client.Disconnect() - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - peers, err := client.Network().GetPeers(ctx) - if err != nil { - fmt.Fprintf(os.Stderr, "Failed to get peers: %v\n", err) + fmt.Fprintf(os.Stderr, "Failed to get peers from gateway: %v\n", err) os.Exit(1) } @@ -289,6 +282,52 @@ func handlePubSub(args []string) { } } +func getPeersFromGateway() ([]client.PeerInfo, error) { + // Get gateway URL + gatewayURL := auth.GetDefaultGatewayURL() + + // Get credentials for authentication + credentials, err := auth.GetOrPromptForCredentials(gatewayURL) + if err != nil { + return nil, fmt.Errorf("authentication failed: %w", err) + } + + // Create HTTP client with timeout + httpClient := &http.Client{ + Timeout: timeout, + } + + // Create request to gateway's peers endpoint + req, err := http.NewRequest("GET", gatewayURL+"/v1/network/peers", nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + // Add authentication header - try X-API-Key first as it's more explicit + req.Header.Set("X-API-Key", credentials.APIKey) + + // Make the request + resp, err := httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to make request: %w", err) + } + defer resp.Body.Close() + + // Check response status + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("gateway returned status %d: %s", resp.StatusCode, string(body)) + } + + // Parse response + var peers []client.PeerInfo + if err := json.NewDecoder(resp.Body).Decode(&peers); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return peers, nil +} + func ensureAuthenticated() *auth.Credentials { gatewayURL := auth.GetDefaultGatewayURL() diff --git a/e2e/database_creation_test.go b/e2e/database_creation_test.go new file mode 100644 index 0000000..a4c0cc4 --- /dev/null +++ b/e2e/database_creation_test.go @@ -0,0 +1,305 @@ +package e2e + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/DeBrosOfficial/network/pkg/client" + "github.com/DeBrosOfficial/network/pkg/config" + "github.com/DeBrosOfficial/network/pkg/node" + "go.uber.org/zap" +) + +// TestSingleNodeDatabaseCreation tests creating a database with replication factor 1 +func TestSingleNodeDatabaseCreation(t *testing.T) { + if testing.Short() { + t.Skip("Skipping e2e test in short mode") + } + + // Setup test environment + testDir := filepath.Join(os.TempDir(), fmt.Sprintf("debros_test_single_%d", time.Now().Unix())) + defer os.RemoveAll(testDir) + + logger, _ := zap.NewDevelopment() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + // Start node + cfg := config.DefaultConfig() + cfg.Node.DataDir = filepath.Join(testDir, "node1") + cfg.P2P.ListenAddresses = []string{"/ip4/127.0.0.1/tcp/14001"} + cfg.Database.ReplicationFactor = 1 + cfg.Database.MaxDatabases = 10 + cfg.Database.PortRangeHTTPStart = 15001 + cfg.Database.PortRangeHTTPEnd = 15010 + cfg.Database.PortRangeRaftStart = 17001 + cfg.Database.PortRangeRaftEnd = 17010 + + n, err := node.NewNode(cfg, logger) + if err != nil { + t.Fatalf("Failed to create node: %v", err) + } + + if err := n.Start(ctx); err != nil { + t.Fatalf("Failed to start node: %v", err) + } + defer n.Stop() + + // Wait for node to be ready + time.Sleep(2 * time.Second) + + // Create client + cli, err := client.NewClient(ctx, client.ClientConfig{ + AppName: "testapp", + BootstrapPeers: []string{n.Host().Addrs()[0].String() + "/p2p/" + n.Host().ID().String()}, + }) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + defer cli.Close() + + // Create database + db := cli.Database("testdb") + + // Write data + _, err = db.WriteOne("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)") + if err != nil { + t.Fatalf("Failed to create table: %v", err) + } + + _, err = db.WriteOne("INSERT INTO users (name) VALUES ('Alice')") + if err != nil { + t.Fatalf("Failed to insert data: %v", err) + } + + // Read data back + rows, err := db.QueryOne("SELECT * FROM users") + if err != nil { + t.Fatalf("Failed to query data: %v", err) + } + + if !rows.Next() { + t.Fatal("Expected at least one row") + } + + var id int + var name string + if err := rows.Scan(&id, &name); err != nil { + t.Fatalf("Failed to scan row: %v", err) + } + + if name != "Alice" { + t.Errorf("Expected name 'Alice', got '%s'", name) + } + + t.Log("Single node database creation test passed") +} + +// TestThreeNodeDatabaseCreation tests creating a database with replication factor 3 +func TestThreeNodeDatabaseCreation(t *testing.T) { + if testing.Short() { + t.Skip("Skipping e2e test in short mode") + } + + // Setup test environment + testDir := filepath.Join(os.TempDir(), fmt.Sprintf("debros_test_three_%d", time.Now().Unix())) + defer os.RemoveAll(testDir) + + logger, _ := zap.NewDevelopment() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // Start 3 nodes + nodes := make([]*node.Node, 3) + for i := 0; i < 3; i++ { + cfg := config.DefaultConfig() + cfg.Node.DataDir = filepath.Join(testDir, fmt.Sprintf("node%d", i+1)) + cfg.P2P.ListenAddresses = []string{fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 14001+i)} + cfg.Database.ReplicationFactor = 3 + cfg.Database.MaxDatabases = 10 + cfg.Database.PortRangeHTTPStart = 15001 + (i * 100) + cfg.Database.PortRangeHTTPEnd = 15010 + (i * 100) + cfg.Database.PortRangeRaftStart = 17001 + (i * 100) + cfg.Database.PortRangeRaftEnd = 17010 + (i * 100) + + // Connect to first node + if i > 0 { + bootstrapAddr := nodes[0].Host().Addrs()[0].String() + "/p2p/" + nodes[0].Host().ID().String() + cfg.P2P.BootstrapPeers = []string{bootstrapAddr} + } + + n, err := node.NewNode(cfg, logger) + if err != nil { + t.Fatalf("Failed to create node %d: %v", i+1, err) + } + + if err := n.Start(ctx); err != nil { + t.Fatalf("Failed to start node %d: %v", i+1, err) + } + defer n.Stop() + + nodes[i] = n + } + + // Wait for nodes to discover each other + time.Sleep(5 * time.Second) + + // Create client connected to first node + bootstrapAddr := nodes[0].Host().Addrs()[0].String() + "/p2p/" + nodes[0].Host().ID().String() + cli, err := client.NewClient(ctx, client.ClientConfig{ + AppName: "testapp", + BootstrapPeers: []string{bootstrapAddr}, + }) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + defer cli.Close() + + // Create database + db := cli.Database("testdb") + + // Write data + _, err = db.WriteOne("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)") + if err != nil { + t.Fatalf("Failed to create table: %v", err) + } + + _, err = db.WriteOne("INSERT INTO users (name) VALUES ('Alice')") + if err != nil { + t.Fatalf("Failed to insert data: %v", err) + } + + // Wait for replication + time.Sleep(2 * time.Second) + + // Read from all nodes to verify replication + // Note: This would require connecting to each node separately + // For now, just verify we can read from the first node + rows, err := db.QueryOne("SELECT * FROM users") + if err != nil { + t.Fatalf("Failed to query data: %v", err) + } + + if !rows.Next() { + t.Fatal("Expected at least one row") + } + + var id int + var name string + if err := rows.Scan(&id, &name); err != nil { + t.Fatalf("Failed to scan row: %v", err) + } + + if name != "Alice" { + t.Errorf("Expected name 'Alice', got '%s'", name) + } + + t.Log("Three node database creation test passed") +} + +// TestMultipleDatabases tests creating multiple isolated databases +func TestMultipleDatabases(t *testing.T) { + if testing.Short() { + t.Skip("Skipping e2e test in short mode") + } + + // Setup test environment + testDir := filepath.Join(os.TempDir(), fmt.Sprintf("debros_test_multi_%d", time.Now().Unix())) + defer os.RemoveAll(testDir) + + logger, _ := zap.NewDevelopment() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // Start 3 nodes + nodes := make([]*node.Node, 3) + for i := 0; i < 3; i++ { + cfg := config.DefaultConfig() + cfg.Node.DataDir = filepath.Join(testDir, fmt.Sprintf("node%d", i+1)) + cfg.P2P.ListenAddresses = []string{fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 14001+i)} + cfg.Database.ReplicationFactor = 3 + cfg.Database.MaxDatabases = 20 + cfg.Database.PortRangeHTTPStart = 15001 + (i * 200) + cfg.Database.PortRangeHTTPEnd = 15050 + (i * 200) + cfg.Database.PortRangeRaftStart = 17001 + (i * 200) + cfg.Database.PortRangeRaftEnd = 17050 + (i * 200) + + if i > 0 { + bootstrapAddr := nodes[0].Host().Addrs()[0].String() + "/p2p/" + nodes[0].Host().ID().String() + cfg.P2P.BootstrapPeers = []string{bootstrapAddr} + } + + n, err := node.NewNode(cfg, logger) + if err != nil { + t.Fatalf("Failed to create node %d: %v", i+1, err) + } + + if err := n.Start(ctx); err != nil { + t.Fatalf("Failed to start node %d: %v", i+1, err) + } + defer n.Stop() + + nodes[i] = n + } + + time.Sleep(5 * time.Second) + + // Create client + bootstrapAddr := nodes[0].Host().Addrs()[0].String() + "/p2p/" + nodes[0].Host().ID().String() + cli, err := client.NewClient(ctx, client.ClientConfig{ + AppName: "testapp", + BootstrapPeers: []string{bootstrapAddr}, + }) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + defer cli.Close() + + // Create multiple databases + databases := []string{"users_db", "products_db", "orders_db"} + for _, dbName := range databases { + db := cli.Database(dbName) + + // Create table specific to this database + _, err = db.WriteOne(fmt.Sprintf("CREATE TABLE %s_data (id INTEGER PRIMARY KEY, value TEXT)", dbName)) + if err != nil { + t.Fatalf("Failed to create table in %s: %v", dbName, err) + } + + // Insert data + _, err = db.WriteOne(fmt.Sprintf("INSERT INTO %s_data (value) VALUES ('%s_value')", dbName, dbName)) + if err != nil { + t.Fatalf("Failed to insert data in %s: %v", dbName, err) + } + } + + // Verify isolation - each database should only have its own data + for _, dbName := range databases { + db := cli.Database(dbName) + + rows, err := db.QueryOne(fmt.Sprintf("SELECT value FROM %s_data", dbName)) + if err != nil { + t.Fatalf("Failed to query %s: %v", dbName, err) + } + + if !rows.Next() { + t.Fatalf("Expected data in %s", dbName) + } + + var value string + if err := rows.Scan(&value); err != nil { + t.Fatalf("Failed to scan row from %s: %v", dbName, err) + } + + expectedValue := fmt.Sprintf("%s_value", dbName) + if value != expectedValue { + t.Errorf("Expected value '%s' in %s, got '%s'", expectedValue, dbName, value) + } + } + + t.Log("Multiple databases test passed") +} diff --git a/e2e/hibernation_test.go b/e2e/hibernation_test.go new file mode 100644 index 0000000..d4f3664 --- /dev/null +++ b/e2e/hibernation_test.go @@ -0,0 +1,329 @@ +package e2e + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/DeBrosOfficial/network/pkg/client" + "github.com/DeBrosOfficial/network/pkg/config" + "github.com/DeBrosOfficial/network/pkg/node" + "go.uber.org/zap" +) + +// TestHibernationCycle tests that databases hibernate after idle timeout +func TestHibernationCycle(t *testing.T) { + if testing.Short() { + t.Skip("Skipping e2e test in short mode") + } + + // Setup test environment + testDir := filepath.Join(os.TempDir(), fmt.Sprintf("debros_test_hibernate_%d", time.Now().Unix())) + defer os.RemoveAll(testDir) + + logger, _ := zap.NewDevelopment() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // Start 3 nodes with short hibernation timeout + nodes := make([]*node.Node, 3) + for i := 0; i < 3; i++ { + cfg := config.DefaultConfig() + cfg.Node.DataDir = filepath.Join(testDir, fmt.Sprintf("node%d", i+1)) + cfg.P2P.ListenAddresses = []string{fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 14001+i)} + cfg.Database.ReplicationFactor = 3 + cfg.Database.MaxDatabases = 10 + cfg.Database.HibernationTimeout = 10 * time.Second // Short timeout for testing + cfg.Database.PortRangeHTTPStart = 15001 + (i * 100) + cfg.Database.PortRangeHTTPEnd = 15010 + (i * 100) + cfg.Database.PortRangeRaftStart = 17001 + (i * 100) + cfg.Database.PortRangeRaftEnd = 17010 + (i * 100) + + if i > 0 { + bootstrapAddr := nodes[0].Host().Addrs()[0].String() + "/p2p/" + nodes[0].Host().ID().String() + cfg.P2P.BootstrapPeers = []string{bootstrapAddr} + } + + n, err := node.NewNode(cfg, logger) + if err != nil { + t.Fatalf("Failed to create node %d: %v", i+1, err) + } + + if err := n.Start(ctx); err != nil { + t.Fatalf("Failed to start node %d: %v", i+1, err) + } + defer n.Stop() + + nodes[i] = n + } + + time.Sleep(5 * time.Second) + + // Create client + bootstrapAddr := nodes[0].Host().Addrs()[0].String() + "/p2p/" + nodes[0].Host().ID().String() + cli, err := client.NewClient(ctx, client.ClientConfig{ + AppName: "testapp", + BootstrapPeers: []string{bootstrapAddr}, + }) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + defer cli.Close() + + // Create database and write data + db := cli.Database("testdb") + + _, err = db.WriteOne("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)") + if err != nil { + t.Fatalf("Failed to create table: %v", err) + } + + _, err = db.WriteOne("INSERT INTO users (name) VALUES ('Alice')") + if err != nil { + t.Fatalf("Failed to insert data: %v", err) + } + + t.Log("Data written, waiting for hibernation...") + + // Wait for hibernation (timeout + grace period) + time.Sleep(20 * time.Second) + + t.Log("Hibernation period elapsed, database should be hibernating") + + // Note: In a real test, we would check the metadata store to verify hibernation status + // For now, we just verify the data directory still exists + for i := 0; i < 3; i++ { + dbDir := filepath.Join(testDir, fmt.Sprintf("node%d/testapp_testdb", i+1)) + if _, err := os.Stat(dbDir); os.IsNotExist(err) { + t.Errorf("Expected database directory to exist on node %d after hibernation", i+1) + } + } + + t.Log("Hibernation cycle test passed") +} + +// TestWakeUpCycle tests that hibernated databases wake up on access +func TestWakeUpCycle(t *testing.T) { + if testing.Short() { + t.Skip("Skipping e2e test in short mode") + } + + // Setup test environment + testDir := filepath.Join(os.TempDir(), fmt.Sprintf("debros_test_wakeup_%d", time.Now().Unix())) + defer os.RemoveAll(testDir) + + logger, _ := zap.NewDevelopment() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // Start 3 nodes with short hibernation timeout + nodes := make([]*node.Node, 3) + for i := 0; i < 3; i++ { + cfg := config.DefaultConfig() + cfg.Node.DataDir = filepath.Join(testDir, fmt.Sprintf("node%d", i+1)) + cfg.P2P.ListenAddresses = []string{fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 14001+i)} + cfg.Database.ReplicationFactor = 3 + cfg.Database.MaxDatabases = 10 + cfg.Database.HibernationTimeout = 10 * time.Second + cfg.Database.PortRangeHTTPStart = 15001 + (i * 100) + cfg.Database.PortRangeHTTPEnd = 15010 + (i * 100) + cfg.Database.PortRangeRaftStart = 17001 + (i * 100) + cfg.Database.PortRangeRaftEnd = 17010 + (i * 100) + + if i > 0 { + bootstrapAddr := nodes[0].Host().Addrs()[0].String() + "/p2p/" + nodes[0].Host().ID().String() + cfg.P2P.BootstrapPeers = []string{bootstrapAddr} + } + + n, err := node.NewNode(cfg, logger) + if err != nil { + t.Fatalf("Failed to create node %d: %v", i+1, err) + } + + if err := n.Start(ctx); err != nil { + t.Fatalf("Failed to start node %d: %v", i+1, err) + } + defer n.Stop() + + nodes[i] = n + } + + time.Sleep(5 * time.Second) + + // Create client + bootstrapAddr := nodes[0].Host().Addrs()[0].String() + "/p2p/" + nodes[0].Host().ID().String() + cli, err := client.NewClient(ctx, client.ClientConfig{ + AppName: "testapp", + BootstrapPeers: []string{bootstrapAddr}, + }) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + defer cli.Close() + + // Create database and write data + db := cli.Database("testdb") + + _, err = db.WriteOne("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)") + if err != nil { + t.Fatalf("Failed to create table: %v", err) + } + + _, err = db.WriteOne("INSERT INTO users (name) VALUES ('Alice')") + if err != nil { + t.Fatalf("Failed to insert initial data: %v", err) + } + + t.Log("Waiting for hibernation...") + time.Sleep(20 * time.Second) + + t.Log("Attempting to wake up database by querying...") + + // Query should trigger wake-up + startTime := time.Now() + rows, err := db.QueryOne("SELECT * FROM users") + wakeupDuration := time.Since(startTime) + + if err != nil { + t.Fatalf("Failed to query after hibernation (wake-up failed): %v", err) + } + + t.Logf("Wake-up took %v", wakeupDuration) + + // Verify data persisted + if !rows.Next() { + t.Fatal("Expected at least one row after wake-up") + } + + var id int + var name string + if err := rows.Scan(&id, &name); err != nil { + t.Fatalf("Failed to scan row: %v", err) + } + + if name != "Alice" { + t.Errorf("Expected name 'Alice' after wake-up, got '%s'", name) + } + + // Verify we can write new data + _, err = db.WriteOne("INSERT INTO users (name) VALUES ('Bob')") + if err != nil { + t.Fatalf("Failed to insert data after wake-up: %v", err) + } + + // Verify both records exist + rows, err = db.QueryOne("SELECT COUNT(*) FROM users") + if err != nil { + t.Fatalf("Failed to count rows: %v", err) + } + + if !rows.Next() { + t.Fatal("Expected count result") + } + + var count int + if err := rows.Scan(&count); err != nil { + t.Fatalf("Failed to scan count: %v", err) + } + + if count != 2 { + t.Errorf("Expected 2 rows after wake-up and insert, got %d", count) + } + + t.Log("Wake-up cycle test passed") +} + +// TestConcurrentHibernation tests multiple databases hibernating simultaneously +func TestConcurrentHibernation(t *testing.T) { + if testing.Short() { + t.Skip("Skipping e2e test in short mode") + } + + // Setup test environment + testDir := filepath.Join(os.TempDir(), fmt.Sprintf("debros_test_concurrent_hibernate_%d", time.Now().Unix())) + defer os.RemoveAll(testDir) + + logger, _ := zap.NewDevelopment() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // Start 3 nodes + nodes := make([]*node.Node, 3) + for i := 0; i < 3; i++ { + cfg := config.DefaultConfig() + cfg.Node.DataDir = filepath.Join(testDir, fmt.Sprintf("node%d", i+1)) + cfg.P2P.ListenAddresses = []string{fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 14001+i)} + cfg.Database.ReplicationFactor = 3 + cfg.Database.MaxDatabases = 20 + cfg.Database.HibernationTimeout = 10 * time.Second + cfg.Database.PortRangeHTTPStart = 15001 + (i * 200) + cfg.Database.PortRangeHTTPEnd = 15050 + (i * 200) + cfg.Database.PortRangeRaftStart = 17001 + (i * 200) + cfg.Database.PortRangeRaftEnd = 17050 + (i * 200) + + if i > 0 { + bootstrapAddr := nodes[0].Host().Addrs()[0].String() + "/p2p/" + nodes[0].Host().ID().String() + cfg.P2P.BootstrapPeers = []string{bootstrapAddr} + } + + n, err := node.NewNode(cfg, logger) + if err != nil { + t.Fatalf("Failed to create node %d: %v", i+1, err) + } + + if err := n.Start(ctx); err != nil { + t.Fatalf("Failed to start node %d: %v", i+1, err) + } + defer n.Stop() + + nodes[i] = n + } + + time.Sleep(5 * time.Second) + + // Create client + bootstrapAddr := nodes[0].Host().Addrs()[0].String() + "/p2p/" + nodes[0].Host().ID().String() + cli, err := client.NewClient(ctx, client.ClientConfig{ + AppName: "testapp", + BootstrapPeers: []string{bootstrapAddr}, + }) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + defer cli.Close() + + // Create multiple databases + dbNames := []string{"db1", "db2", "db3"} + for _, dbName := range dbNames { + db := cli.Database(dbName) + _, err = db.WriteOne(fmt.Sprintf("CREATE TABLE %s_data (id INTEGER PRIMARY KEY, value TEXT)", dbName)) + if err != nil { + t.Fatalf("Failed to create table in %s: %v", dbName, err) + } + + _, err = db.WriteOne(fmt.Sprintf("INSERT INTO %s_data (value) VALUES ('%s_value')", dbName, dbName)) + if err != nil { + t.Fatalf("Failed to insert data in %s: %v", dbName, err) + } + } + + t.Log("All databases created, waiting for concurrent hibernation...") + time.Sleep(20 * time.Second) + + t.Log("Hibernation period elapsed") + + // Verify all data directories still exist + for _, dbName := range dbNames { + for i := 0; i < 3; i++ { + dbDir := filepath.Join(testDir, fmt.Sprintf("node%d/testapp_%s", i+1, dbName)) + if _, err := os.Stat(dbDir); os.IsNotExist(err) { + t.Errorf("Expected database directory for %s to exist on node %d", dbName, i+1) + } + } + } + + t.Log("Concurrent hibernation test passed") +} diff --git a/mantests/01_create_table.sh b/mantests/01_create_table.sh new file mode 100755 index 0000000..93eb433 --- /dev/null +++ b/mantests/01_create_table.sh @@ -0,0 +1,43 @@ +#!/bin/bash + +# Test 1: Create Table +# This test creates a new table in the "testdb" database + +API_KEY="ak_L1zF6g7Np1dSRyy-zp_cXFfA:default" +GATEWAY_URL="http://localhost:6001" + +echo "=========================================" +echo "Test 1: Create Table" +echo "=========================================" +echo "" +echo "Creating 'users' table in 'testdb'..." +echo "" + +# Make the request and capture both response and status +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/create-table" \ + -H "X-API-Key: ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "schema": "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, email TEXT UNIQUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)" + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) + +# Extract HTTP status +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +# Extract JSON response (everything before HTTP_STATUS) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') + +# Display the JSON response formatted +echo "$JSON_RESPONSE" | jq '.' + +# Display HTTP status +echo "" +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "=========================================" +echo "Test 1 Complete" +echo "=========================================" + diff --git a/mantests/02_insert_data.sh b/mantests/02_insert_data.sh new file mode 100755 index 0000000..b5d6108 --- /dev/null +++ b/mantests/02_insert_data.sh @@ -0,0 +1,75 @@ +#!/bin/bash + +# Test 2: Insert Data +# This test inserts data into the users table + +API_KEY="ak_L1zF6g7Np1dSRyy-zp_cXFfA:default" +GATEWAY_URL="http://localhost:6001" + +echo "=========================================" +echo "Test 2: Insert Data" +echo "=========================================" +echo "" +echo "Inserting users into 'testdb'..." +echo "" + +# Insert Alice +echo "Inserting Alice..." +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/exec" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "INSERT INTO users (name, email) VALUES (?, ?)", + "args": ["Alice", "alice@example.com"] + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" + +# Insert Bob +echo "Inserting Bob..." +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/exec" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "INSERT INTO users (name, email) VALUES (?, ?)", + "args": ["Bob", "bob@example.com"] + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" + +# Insert Charlie +echo "Inserting Charlie..." +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/exec" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "INSERT INTO users (name, email) VALUES (?, ?)", + "args": ["Charlie", "charlie@example.com"] + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "=========================================" +echo "Test 2 Complete" +echo "=========================================" + diff --git a/mantests/03_query_data.sh b/mantests/03_query_data.sh new file mode 100755 index 0000000..d75069d --- /dev/null +++ b/mantests/03_query_data.sh @@ -0,0 +1,71 @@ +#!/bin/bash + +# Test 3: Query Data +# This test queries data from the users table + +API_KEY="ak_L1zF6g7Np1dSRyy-zp_cXFfA:default" +GATEWAY_URL="http://localhost:6001" + +echo "=========================================" +echo "Test 3: Query Data" +echo "=========================================" +echo "" +echo "Querying all users from 'testdb'..." +echo "" + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/query" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "SELECT * FROM users ORDER BY id" + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "Querying specific user (Alice)..." +echo "" + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/query" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "SELECT * FROM users WHERE name = ?", + "args": ["Alice"] + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "Counting users..." +echo "" + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/query" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "SELECT COUNT(*) as count FROM users" + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "=========================================" +echo "Test 3 Complete" +echo "=========================================" + diff --git a/mantests/04_execute_sql.sh b/mantests/04_execute_sql.sh new file mode 100755 index 0000000..b6b117a --- /dev/null +++ b/mantests/04_execute_sql.sh @@ -0,0 +1,107 @@ +#!/bin/bash + +# Test 4: Execute SQL +# This test executes various SQL operations + +API_KEY="ak_L1zF6g7Np1dSRyy-zp_cXFfA:default" +GATEWAY_URL="http://localhost:6001" + +echo "=========================================" +echo "Test 4: Execute SQL" +echo "=========================================" +echo "" +echo "Creating products table..." +echo "" + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/exec" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "CREATE TABLE IF NOT EXISTS products (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, price REAL, stock INTEGER DEFAULT 0)" + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "Inserting products..." +echo "" + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/exec" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)", + "args": ["Laptop", 999.99, 10] + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/exec" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)", + "args": ["Mouse", 29.99, 50] + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "Updating product stock..." +echo "" + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/exec" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "UPDATE products SET stock = stock - 1 WHERE name = ?", + "args": ["Laptop"] + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "Querying products..." +echo "" + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/query" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "SELECT * FROM products" + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "=========================================" +echo "Test 4 Complete" +echo "=========================================" + diff --git a/mantests/05_transaction.sh b/mantests/05_transaction.sh new file mode 100755 index 0000000..bf82ed2 --- /dev/null +++ b/mantests/05_transaction.sh @@ -0,0 +1,57 @@ +#!/bin/bash + +# Test 5: Transaction +# This test executes multiple SQL statements in a transaction + +API_KEY="ak_L1zF6g7Np1dSRyy-zp_cXFfA:default" +GATEWAY_URL="http://localhost:6001" + +echo "=========================================" +echo "Test 5: Transaction" +echo "=========================================" +echo "" +echo "Executing transaction (insert multiple orders)..." +echo "" + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/transaction" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "queries": [ + "CREATE TABLE IF NOT EXISTS orders (id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, product_id INTEGER, quantity INTEGER, total REAL)", + "INSERT INTO orders (user_id, product_id, quantity, total) VALUES (1, 1, 2, 1999.98)", + "INSERT INTO orders (user_id, product_id, quantity, total) VALUES (2, 2, 5, 149.95)", + "INSERT INTO orders (user_id, product_id, quantity, total) VALUES (1, 2, 1, 29.99)" + ] + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "Querying orders..." +echo "" + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/query" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "testdb", + "sql": "SELECT * FROM orders" + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "=========================================" +echo "Test 5 Complete" +echo "=========================================" + diff --git a/mantests/06_get_schema.sh b/mantests/06_get_schema.sh new file mode 100755 index 0000000..4b432bf --- /dev/null +++ b/mantests/06_get_schema.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# Test 6: Get Schema +# This test retrieves the schema of a database + +API_KEY="ak_L1zF6g7Np1dSRyy-zp_cXFfA:default" +GATEWAY_URL="http://localhost:6001" + +echo "=========================================" +echo "Test 6: Get Schema" +echo "=========================================" +echo "" +echo "Getting schema for 'testdb'..." +echo "" + +RESPONSE=$(curl -X GET "${GATEWAY_URL}/v1/database/schema?database=testdb" \ + -H "Authorization: Bearer ${API_KEY}" \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "=========================================" +echo "Test 6 Complete" +echo "=========================================" + diff --git a/mantests/07_multiple_databases.sh b/mantests/07_multiple_databases.sh new file mode 100755 index 0000000..1d56cde --- /dev/null +++ b/mantests/07_multiple_databases.sh @@ -0,0 +1,122 @@ +#!/bin/bash + +# Test 7: Multiple Databases +# This test creates and uses multiple isolated databases + +API_KEY="ak_L1zF6g7Np1dSRyy-zp_cXFfA:default" +GATEWAY_URL="http://localhost:6001" + +# Helper function to make curl requests with proper JSON parsing +make_request() { + local method="$1" + local url="$2" + local data="$3" + + RESPONSE=$(curl -X "$method" "$url" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + ${data:+-d "$data"} \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) + + HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) + JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') + echo "$JSON_RESPONSE" | jq '.' + echo "HTTP Status: $HTTP_STATUS" +} + +echo "=========================================" +echo "Test 7: Multiple Databases" +echo "=========================================" +echo "" + +# Create users database +echo "Creating 'users_db' with users table..." +make_request "POST" "${GATEWAY_URL}/v1/database/create-table" '{ + "database": "users_db", + "schema": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)" + }' + +echo "" + +# Insert into users database +echo "Inserting into users_db..." +make_request "POST" "${GATEWAY_URL}/v1/database/exec" '{ + "database": "users_db", + "sql": "INSERT INTO users (name) VALUES (?)", + "args": ["User from users_db"] + }' + +echo "" + +# Create products database +echo "Creating 'products_db' with products table..." +make_request "POST" "${GATEWAY_URL}/v1/database/create-table" '{ + "database": "products_db", + "schema": "CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL)" + }' + +echo "" + +# Insert into products database +echo "Inserting into products_db..." +make_request "POST" "${GATEWAY_URL}/v1/database/exec" '{ + "database": "products_db", + "sql": "INSERT INTO products (name, price) VALUES (?, ?)", + "args": ["Product from products_db", 99.99] + }' + +echo "" + +# Create orders database +echo "Creating 'orders_db' with orders table..." +make_request "POST" "${GATEWAY_URL}/v1/database/create-table" '{ + "database": "orders_db", + "schema": "CREATE TABLE orders (id INTEGER PRIMARY KEY, order_number TEXT)" + }' + +echo "" + +# Insert into orders database +echo "Inserting into orders_db..." +make_request "POST" "${GATEWAY_URL}/v1/database/exec" '{ + "database": "orders_db", + "sql": "INSERT INTO orders (order_number) VALUES (?)", + "args": ["ORD-001"] + }' + +echo "" +echo "=========================================" +echo "Verifying Data Isolation" +echo "=========================================" +echo "" + +# Query each database +echo "Querying users_db..." +make_request "POST" "${GATEWAY_URL}/v1/database/query" '{ + "database": "users_db", + "sql": "SELECT * FROM users" + }' + +echo "" + +echo "Querying products_db..." +make_request "POST" "${GATEWAY_URL}/v1/database/query" '{ + "database": "products_db", + "sql": "SELECT * FROM products" + }' + +echo "" + +echo "Querying orders_db..." +make_request "POST" "${GATEWAY_URL}/v1/database/query" '{ + "database": "orders_db", + "sql": "SELECT * FROM orders" + }' + +echo "" +echo "=========================================" +echo "Test 7 Complete" +echo "Expected: Each database contains only its own data" +echo "=========================================" + diff --git a/mantests/08_hibernation_test.sh b/mantests/08_hibernation_test.sh new file mode 100755 index 0000000..bd39355 --- /dev/null +++ b/mantests/08_hibernation_test.sh @@ -0,0 +1,136 @@ +#!/bin/bash + +# Test 8: Hibernation and Wake-Up +# This test verifies hibernation and wake-up behavior +# NOTE: This test requires nodes to be configured with a short hibernation timeout + +API_KEY="ak_L1zF6g7Np1dSRyy-zp_cXFfA:default" +GATEWAY_URL="http://localhost:6001" + +echo "=========================================" +echo "Test 8: Hibernation and Wake-Up" +echo "=========================================" +echo "" +echo "NOTE: This test requires nodes configured with hibernation_timeout" +echo " Default is 60 seconds. Adjust wait times accordingly." +echo "" + +# Create database and insert data +echo "Creating 'hibernate_test_db' and inserting data..." +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/create-table" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "hibernate_test_db", + "schema": "CREATE TABLE test_data (id INTEGER PRIMARY KEY, value TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)" + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/exec" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "hibernate_test_db", + "sql": "INSERT INTO test_data (value) VALUES (?)", + "args": ["Initial data before hibernation"] + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "Data inserted. Database is now active." +echo "" +echo "Waiting for hibernation timeout (default: 60 seconds)..." +echo "You can monitor node logs for 'Database is idle' messages." +echo "" + +# Wait for hibernation (adjust based on your config) +HIBERNATION_TIMEOUT=70 +for i in $(seq $HIBERNATION_TIMEOUT -10 0); do + echo -ne "Waiting ${i} seconds...\r" + sleep 10 +done + +echo "" +echo "" +echo "Hibernation period elapsed. Database should be hibernating now." +echo "" +echo "Attempting to query (this should trigger wake-up)..." +echo "" + +# Measure wake-up time +START_TIME=$(date +%s) + +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/query" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "hibernate_test_db", + "sql": "SELECT * FROM test_data" + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +END_TIME=$(date +%s) +WAKE_TIME=$((END_TIME - START_TIME)) + +echo "" +echo "Query completed in ${WAKE_TIME} seconds" +echo "Expected: < 10 seconds for wake-up" +echo "" + +# Verify data persisted +echo "Inserting new data after wake-up..." +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/exec" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "hibernate_test_db", + "sql": "INSERT INTO test_data (value) VALUES (?)", + "args": ["Data after wake-up"] + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" + +echo "Querying all data..." +RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/query" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d '{ + "database": "hibernate_test_db", + "sql": "SELECT * FROM test_data ORDER BY id" + }' \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) +HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) +JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') +echo "$JSON_RESPONSE" | jq '.' +echo "HTTP Status: $HTTP_STATUS" + +echo "" +echo "=========================================" +echo "Test 8 Complete" +echo "Expected: Both records present (data persisted through hibernation)" +echo "=========================================" + diff --git a/mantests/09_stress_test.sh b/mantests/09_stress_test.sh new file mode 100755 index 0000000..e2256b2 --- /dev/null +++ b/mantests/09_stress_test.sh @@ -0,0 +1,93 @@ +#!/bin/bash + +# Test 9: Stress Test - Create Many Databases +# This test creates multiple databases to test capacity and distribution + +API_KEY="ak_L1zF6g7Np1dSRyy-zp_cXFfA:default" +GATEWAY_URL="http://localhost:6001" + +echo "=========================================" +echo "Test 9: Stress Test - Multiple Databases" +echo "=========================================" +echo "" +echo "Creating 10 databases with data..." +echo "" + +for i in {1..10}; do + DB_NAME="stress_test_db_${i}" + echo "Creating ${DB_NAME}..." + + # Create table + RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/create-table" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d "{ + \"database\": \"${DB_NAME}\", + \"schema\": \"CREATE TABLE data (id INTEGER PRIMARY KEY, value TEXT, db_number INTEGER)\" + }" \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) + HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) + JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') + echo "$JSON_RESPONSE" | jq -c '.' + echo "HTTP Status: $HTTP_STATUS" + + # Insert data + RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/exec" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d "{ + \"database\": \"${DB_NAME}\", + \"sql\": \"INSERT INTO data (value, db_number) VALUES (?, ?)\", + \"args\": [\"Data from database ${i}\", ${i}] + }" \ + -w "\nHTTP_STATUS:%{http_code}" \ + -s) + HTTP_STATUS=$(echo "$RESPONSE" | grep "HTTP_STATUS:" | cut -d: -f2) + JSON_RESPONSE=$(echo "$RESPONSE" | sed '/HTTP_STATUS:/d') + echo "$JSON_RESPONSE" | jq -c '.' + echo "HTTP Status: $HTTP_STATUS" + + echo "" + + # Small delay to avoid overwhelming the system + sleep 2 +done + +echo "=========================================" +echo "Verifying all databases..." +echo "=========================================" +echo "" + +SUCCESS_COUNT=0 +FAIL_COUNT=0 + +for i in {1..10}; do + DB_NAME="stress_test_db_${i}" + echo "Querying ${DB_NAME}..." + + RESPONSE=$(curl -X POST "${GATEWAY_URL}/v1/database/query" \ + -H "Authorization: Bearer ${API_KEY}" \ + -H "Content-Type: application/json" \ + -d "{ + \"database\": \"${DB_NAME}\", + \"sql\": \"SELECT * FROM data WHERE db_number = ${i}\" + }" \ + -s) + + if echo "$RESPONSE" | jq -e '.rows | length > 0' > /dev/null 2>&1; then + echo "✓ ${DB_NAME} OK" + SUCCESS_COUNT=$((SUCCESS_COUNT + 1)) + else + echo "✗ ${DB_NAME} FAILED" + FAIL_COUNT=$((FAIL_COUNT + 1)) + fi +done + +echo "" +echo "=========================================" +echo "Test 9 Complete" +echo "Success: ${SUCCESS_COUNT}/10" +echo "Failed: ${FAIL_COUNT}/10" +echo "=========================================" + diff --git a/mantests/README.md b/mantests/README.md new file mode 100644 index 0000000..58b5b07 --- /dev/null +++ b/mantests/README.md @@ -0,0 +1,79 @@ +# Manual Testing Scripts + +This directory contains manual test scripts for testing the dynamic database clustering feature through the Gateway API. + +## Prerequisites + +1. **Start the Gateway**: + ```bash + cd /Users/penguin/dev/debros/network + make run-gateway + ``` + +2. **Start at least 3 nodes** (in separate terminals): + ```bash + # Terminal 1 + make run-node + + # Terminal 2 + make run-node2 + + # Terminal 3 + make run-node3 + ``` + +3. **Set your API key**: + The scripts use the API key: `ak_L1zF6g7Np1dSRyy-zp_cXFfA:default` + +## Test Scripts + +### Basic Tests +- `01_create_table.sh` - Create a table in a database +- `02_insert_data.sh` - Insert data into a table +- `03_query_data.sh` - Query data from a table +- `04_execute_sql.sh` - Execute arbitrary SQL +- `05_transaction.sh` - Execute a transaction +- `06_get_schema.sh` - Get database schema + +### Advanced Tests +- `07_multiple_databases.sh` - Test multiple isolated databases +- `08_hibernation_test.sh` - Test hibernation and wake-up +- `09_stress_test.sh` - Create many databases + +### Utility Scripts +- `cleanup.sh` - Clean up test databases +- `run_all_tests.sh` - Run all tests in sequence + +## Usage + +Make scripts executable: +```bash +chmod +x mantests/*.sh +``` + +Run individual test: +```bash +./mantests/01_create_table.sh +``` + +Run all tests: +```bash +./mantests/run_all_tests.sh +``` + +## Expected Results + +All scripts should return HTTP 200 status codes and appropriate JSON responses. Check the output for: +- Success messages +- Returned data matching expectations +- No errors in the JSON responses + +## Troubleshooting + +If tests fail: +1. Ensure gateway is running on `http://localhost:8080` +2. Ensure at least 3 nodes are running +3. Check that nodes have discovered each other (wait 10 seconds after startup) +4. Verify API key is valid +5. Check gateway and node logs for errors + diff --git a/mantests/cleanup.sh b/mantests/cleanup.sh new file mode 100755 index 0000000..1c75a32 --- /dev/null +++ b/mantests/cleanup.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +# Cleanup Script +# This script helps clean up test databases +# NOTE: There's no DELETE endpoint yet, so this is informational + +API_KEY="ak_L1zF6g7Np1dSRyy-zp_cXFfA:default" +GATEWAY_URL="http://localhost:8080" + +echo "=========================================" +echo "Cleanup Information" +echo "=========================================" +echo "" +echo "Test databases created:" +echo " - testdb" +echo " - users_db" +echo " - products_db" +echo " - orders_db" +echo " - hibernate_test_db" +echo " - stress_test_db_1 through stress_test_db_10" +echo "" +echo "To clean up:" +echo "1. Stop all nodes" +echo "2. Remove data directories:" +echo " rm -rf data/bootstrap/testapp_*" +echo " rm -rf data/node/testapp_*" +echo " rm -rf data/node2/testapp_*" +echo "3. Restart nodes" +echo "" +echo "Or to keep data but test fresh:" +echo " - Use different database names" +echo " - Use DROP TABLE statements" +echo "" +echo "=========================================" + diff --git a/mantests/run_all_tests.sh b/mantests/run_all_tests.sh new file mode 100755 index 0000000..bd7fcbe --- /dev/null +++ b/mantests/run_all_tests.sh @@ -0,0 +1,65 @@ +#!/bin/bash + +# Run All Tests +# This script runs all manual tests in sequence + +echo "=========================================" +echo "Running All Manual Tests" +echo "=========================================" +echo "" +echo "Prerequisites:" +echo " - Gateway running on http://localhost:8080" +echo " - At least 3 nodes running" +echo " - Nodes have discovered each other" +echo "" +read -p "Press Enter to continue or Ctrl+C to cancel..." +echo "" + +# Array of test scripts +TESTS=( + "01_create_table.sh" + "02_insert_data.sh" + "03_query_data.sh" + "04_execute_sql.sh" + "05_transaction.sh" + "06_get_schema.sh" + "07_multiple_databases.sh" + "09_stress_test.sh" +) + +# Note: Skipping 08_hibernation_test.sh as it requires long wait times + +PASSED=0 +FAILED=0 + +for test in "${TESTS[@]}"; do + echo "" + echo "=========================================" + echo "Running: $test" + echo "=========================================" + + if bash "mantests/$test"; then + PASSED=$((PASSED + 1)) + echo "✓ $test PASSED" + else + FAILED=$((FAILED + 1)) + echo "✗ $test FAILED" + fi + + echo "" + echo "Waiting 3 seconds before next test..." + sleep 3 +done + +echo "" +echo "=========================================" +echo "All Tests Complete" +echo "=========================================" +echo "Passed: $PASSED" +echo "Failed: $FAILED" +echo "" +echo "Note: Test 08 (hibernation) was skipped due to long wait times." +echo "Run it manually if needed: ./mantests/08_hibernation_test.sh" +echo "" +echo "=========================================" + diff --git a/pkg/config/config.go b/pkg/config/config.go index 5c90a5a..d939454 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -38,6 +38,12 @@ type DatabaseConfig struct { PortRangeHTTPEnd int `yaml:"port_range_http_end"` // HTTP port range end PortRangeRaftStart int `yaml:"port_range_raft_start"` // Raft port range start PortRangeRaftEnd int `yaml:"port_range_raft_end"` // Raft port range end + + // System database (always-on, holds API keys, wallets, etc.) + SystemDatabaseName string `yaml:"system_database_name"` // Name of the system database (default: "_system") + SystemHTTPPort int `yaml:"rqlite_port"` // Fixed HTTP port for _system database + SystemRaftPort int `yaml:"rqlite_raft_port"` // Fixed Raft port for _system database + MigrationsPath string `yaml:"migrations_path"` // Path to SQL migrations directory } // DiscoveryConfig contains peer discovery configuration @@ -111,10 +117,16 @@ func DefaultConfig() *Config { PortRangeHTTPEnd: 5999, PortRangeRaftStart: 7001, PortRangeRaftEnd: 7999, + + // System database + SystemDatabaseName: "_system", + SystemHTTPPort: 5001, + SystemRaftPort: 7001, + MigrationsPath: "./migrations", }, Discovery: DiscoveryConfig{ BootstrapPeers: []string{ - "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWJSkwRXX1PYA5g2bkAnKani7frhygZp5iY6U853JJPb7K", + "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWD2GoaF6Y6XYZ9d3xKpYqGPsjcKseRGFMnpvHMjScx8w7", // "/ip4/217.76.54.168/tcp/4001/p2p/12D3KooWDp7xeShVY9uHfqNVPSsJeCKUatAviFZV8Y1joox5nUvx", // "/ip4/217.76.54.178/tcp/4001/p2p/12D3KooWKZnirPwNT4URtNSWK45f6vLkEs4xyUZ792F8Uj1oYnm1", // "/ip4/51.83.128.181/tcp/4001/p2p/12D3KooWBn2Zf1R8v9pEfmz7hDZ5b3oADxfejA3zJBYzKRCzgvhR", diff --git a/pkg/gateway/auth_handlers.go b/pkg/gateway/auth_handlers.go index ceec8e7..7c4ddc3 100644 --- a/pkg/gateway/auth_handlers.go +++ b/pkg/gateway/auth_handlers.go @@ -12,7 +12,9 @@ import ( "time" "github.com/DeBrosOfficial/network/pkg/client" + "github.com/DeBrosOfficial/network/pkg/logging" ethcrypto "github.com/ethereum/go-ethereum/crypto" + "go.uber.org/zap" ) func (g *Gateway) whoamiHandler(w http.ResponseWriter, r *http.Request) { @@ -409,7 +411,15 @@ func (g *Gateway) apiKeyToJWTHandler(w http.ResponseWriter, r *http.Request) { internalCtx := client.WithInternalAuth(ctx) q := "SELECT namespaces.name FROM api_keys JOIN namespaces ON api_keys.namespace_id = namespaces.id WHERE api_keys.key = ? LIMIT 1" res, err := db.Query(internalCtx, q, key) - if err != nil || res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { + if err != nil { + // Database connectivity error - return 503 + g.logger.ComponentError(logging.ComponentDatabase, "Failed to query API key", + zap.Error(err)) + writeError(w, http.StatusServiceUnavailable, "database unavailable") + return + } + if res == nil || res.Count == 0 || len(res.Rows) == 0 || len(res.Rows[0]) == 0 { + // API key not found in database - return 401 writeError(w, http.StatusUnauthorized, "invalid API key") return } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 42945d3..20abfff 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -4,11 +4,15 @@ import ( "context" "crypto/rand" "crypto/rsa" + "fmt" + "os" "strconv" + "strings" "time" "github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/logging" + "github.com/multiformats/go-multiaddr" "go.uber.org/zap" ) @@ -17,10 +21,6 @@ type Config struct { ListenAddr string ClientNamespace string BootstrapPeers []string - - // Optional DSN for rqlite database/sql driver, e.g. "http://localhost:4001" - // If empty, defaults to "http://localhost:4001". - RQLiteDSN string } type Gateway struct { @@ -32,16 +32,77 @@ type Gateway struct { keyID string } +// deriveRQLiteEndpoints extracts IP addresses from bootstrap peer multiaddrs +// and constructs RQLite HTTP endpoints using the fixed system database port (5001) +func deriveRQLiteEndpoints(bootstrapPeers []string, systemHTTPPort int) []string { + if systemHTTPPort == 0 { + systemHTTPPort = 5001 // default + } + + endpoints := make([]string, 0) + seen := make(map[string]bool) + + for _, peerAddr := range bootstrapPeers { + ma, err := multiaddr.NewMultiaddr(peerAddr) + if err != nil { + continue + } + + // Extract IP address from multiaddr + var ip string + multiaddr.ForEach(ma, func(c multiaddr.Component) bool { + if c.Protocol().Code == multiaddr.P_IP4 { + ip = c.Value() + return false // stop iteration + } + if c.Protocol().Code == multiaddr.P_IP6 { + ip = "[" + c.Value() + "]" // IPv6 needs brackets + return false + } + return true + }) + + if ip != "" && !seen[ip] { + endpoint := fmt.Sprintf("http://%s:%d", ip, systemHTTPPort) + endpoints = append(endpoints, endpoint) + seen[ip] = true + } + } + + return endpoints +} + // New creates and initializes a new Gateway instance func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { logger.ComponentInfo(logging.ComponentGeneral, "Building client config...") // Build client config from gateway cfg - cliCfg := client.DefaultClientConfig(cfg.ClientNamespace) + // Gateway uses the system database for API keys, wallets, etc. + cliCfg := client.DefaultClientConfig("_system") + cliCfg.DatabaseName = "_system" // Override to use system database directly if len(cfg.BootstrapPeers) > 0 { cliCfg.BootstrapPeers = cfg.BootstrapPeers } + // Derive RQLite endpoints from bootstrap peers + // Check for env override first + if envDSN := strings.TrimSpace(os.Getenv("GATEWAY_RQLITE_DSN")); envDSN != "" { + cliCfg.DatabaseEndpoints = strings.Split(envDSN, ",") + for i, ep := range cliCfg.DatabaseEndpoints { + cliCfg.DatabaseEndpoints[i] = strings.TrimSpace(ep) + } + logger.ComponentInfo(logging.ComponentGeneral, "Using RQLite endpoints from GATEWAY_RQLITE_DSN env", + zap.Strings("endpoints", cliCfg.DatabaseEndpoints)) + } else { + // Auto-derive from bootstrap peers + system port (5001) + // This will try port 5001 on each peer (works for single-node and distributed clusters) + // For multi-node localhost, set GATEWAY_RQLITE_DSN to the actual ports + cliCfg.DatabaseEndpoints = deriveRQLiteEndpoints(cfg.BootstrapPeers, 5001) + logger.ComponentInfo(logging.ComponentGeneral, "Derived RQLite endpoints from bootstrap peers", + zap.Strings("endpoints", cliCfg.DatabaseEndpoints), + zap.String("note", "For multi-node localhost, set GATEWAY_RQLITE_DSN env to actual ports")) + } + logger.ComponentInfo(logging.ComponentGeneral, "Creating network client...") c, err := client.NewClient(cliCfg) if err != nil { diff --git a/pkg/rqlite/cluster_handlers.go b/pkg/rqlite/cluster_handlers.go index cfb4b2a..aca6712 100644 --- a/pkg/rqlite/cluster_handlers.go +++ b/pkg/rqlite/cluster_handlers.go @@ -33,8 +33,35 @@ func (cm *ClusterManager) handleCreateRequest(msg *MetadataMessage) error { return nil } - // Allocate ports - ports, err := cm.portManager.AllocatePortPair(req.DatabaseName) + // Allocate ports - prefer fixed ports for system database, fall back to dynamic + var ports PortPair + var err error + + systemDBName := cm.config.SystemDatabaseName + if systemDBName == "" { + systemDBName = "_system" + } + + if req.DatabaseName == systemDBName && cm.config.SystemHTTPPort > 0 { + // Try to use fixed ports for system database first + ports = PortPair{ + HTTPPort: cm.config.SystemHTTPPort, + RaftPort: cm.config.SystemRaftPort, + } + err = cm.portManager.AllocateSpecificPortPair(req.DatabaseName, ports) + if err != nil { + // Fixed ports unavailable (likely multi-node localhost) - use dynamic + cm.logger.Info("Fixed system ports unavailable, using dynamic allocation", + zap.String("database", req.DatabaseName), + zap.Int("attempted_http", ports.HTTPPort), + zap.Int("attempted_raft", ports.RaftPort)) + ports, err = cm.portManager.AllocatePortPair(req.DatabaseName) + } + } else { + // Use dynamic ports for other databases + ports, err = cm.portManager.AllocatePortPair(req.DatabaseName) + } + if err != nil { cm.logger.Warn("Cannot allocate ports for database", zap.String("database", req.DatabaseName), diff --git a/pkg/rqlite/cluster_manager.go b/pkg/rqlite/cluster_manager.go index cf928e0..4d328ea 100644 --- a/pkg/rqlite/cluster_manager.go +++ b/pkg/rqlite/cluster_manager.go @@ -71,12 +71,19 @@ func (cm *ClusterManager) Start() error { zap.String("node_id", cm.nodeID), zap.Int("max_databases", cm.config.MaxDatabases)) + cm.metadataStore.SetLogger(cm.logger.With(zap.String("component", "metadata_store"))) + // Subscribe to metadata topic metadataTopic := "/debros/metadata/v1" if err := cm.pubsubAdapter.Subscribe(cm.ctx, metadataTopic, cm.handleMetadataMessage); err != nil { return fmt.Errorf("failed to subscribe to metadata topic: %w", err) } + // Initialize system database + if err := cm.initializeSystemDatabase(); err != nil { + return fmt.Errorf("failed to initialize system database: %w", err) + } + // Announce node capacity go cm.announceCapacityPeriodically() @@ -128,8 +135,8 @@ func (cm *ClusterManager) handleMetadataMessage(topic string, data []byte) error return nil } - // Skip messages from self - if msg.NodeID == cm.nodeID { + // Skip messages from self (except DATABASE_CREATE_CONFIRM which coordinator needs to process) + if msg.NodeID == cm.nodeID && msg.Type != MsgDatabaseCreateConfirm { return nil } @@ -191,6 +198,54 @@ func (cm *ClusterManager) CreateDatabase(dbName string, replicationFactor int) e cm.coordinatorRegistry.Register(coordinator) defer cm.coordinatorRegistry.Remove(dbName) + // Check if this node can also participate + cm.mu.RLock() + currentCount := len(cm.activeClusters) + cm.mu.RUnlock() + + if currentCount < cm.config.MaxDatabases { + // This node can host - add self-response + systemDBName := cm.config.SystemDatabaseName + if systemDBName == "" { + systemDBName = "_system" + } + + var selfPorts PortPair + var portErr error + + if dbName == systemDBName && cm.config.SystemHTTPPort > 0 { + // Try fixed ports for system database + selfPorts = PortPair{ + HTTPPort: cm.config.SystemHTTPPort, + RaftPort: cm.config.SystemRaftPort, + } + portErr = cm.portManager.AllocateSpecificPortPair(dbName, selfPorts) + if portErr != nil { + // Fixed ports unavailable - use dynamic + cm.logger.Info("Fixed system ports unavailable on requester, using dynamic", + zap.String("database", dbName)) + selfPorts, portErr = cm.portManager.AllocatePortPair(dbName) + } + } else { + // Dynamic ports for non-system databases + selfPorts, portErr = cm.portManager.AllocatePortPair(dbName) + } + + if portErr == nil { + // Add self as a candidate + selfResponse := DatabaseCreateResponse{ + DatabaseName: dbName, + NodeID: cm.nodeID, + AvailablePorts: selfPorts, + } + coordinator.AddResponse(selfResponse) + cm.logger.Debug("Added self as candidate for database", + zap.String("database", dbName), + zap.Int("http_port", selfPorts.HTTPPort), + zap.Int("raft_port", selfPorts.RaftPort)) + } + } + // Broadcast create request req := DatabaseCreateRequest{ DatabaseName: dbName, @@ -230,13 +285,13 @@ func (cm *ClusterManager) CreateDatabase(dbName string, replicationFactor int) e zap.String("database", dbName), zap.Int("count", len(selectedResponses))) - // Determine if this node is the coordinator (lowest ID among responders) - allNodeIDs := make([]string, len(selectedResponses)) - for i, resp := range selectedResponses { - allNodeIDs[i] = resp.NodeID - } - coordinatorID := SelectCoordinator(allNodeIDs) - isCoordinator := coordinatorID == cm.nodeID + // The requesting node is always the coordinator for its own request + // This ensures deterministic coordination and avoids race conditions + isCoordinator := true + + cm.logger.Info("This node is the requester and will coordinate", + zap.String("database", dbName), + zap.String("requester_node", cm.nodeID)) if isCoordinator { cm.logger.Info("This node is coordinator, broadcasting confirmation", @@ -526,3 +581,168 @@ func (cm *ClusterManager) reconcileOrphanedData() { cm.logger.Info("Orphaned data reconciliation complete", zap.Int("orphans_found", orphanCount)) } + +// initializeSystemDatabase creates and starts the system database on this node +func (cm *ClusterManager) initializeSystemDatabase() error { + systemDBName := cm.config.SystemDatabaseName + if systemDBName == "" { + systemDBName = "_system" + } + + cm.logger.Info("Initializing system database", + zap.String("database", systemDBName), + zap.Int("replication_factor", cm.config.ReplicationFactor)) + + // Wait longer for nodes to discover each other + cm.logger.Info("Waiting for peer discovery before system database creation...") + time.Sleep(5 * time.Second) + + // Check if system database already exists in metadata + existingDB := cm.metadataStore.GetDatabase(systemDBName) + if existingDB != nil { + cm.logger.Info("System database already exists in metadata, waiting for it to become active", + zap.String("database", systemDBName)) + } else { + // Only create if we don't see it in metadata yet + cm.logger.Info("Creating system database", + zap.String("database", systemDBName)) + + // Try creating with retries (important for system database) + maxRetries := 3 + var lastErr error + for attempt := 1; attempt <= maxRetries; attempt++ { + // Check again if it was created by another node + if cm.metadataStore.GetDatabase(systemDBName) != nil { + cm.logger.Info("System database now exists (created by another node)") + break + } + + lastErr = cm.CreateDatabase(systemDBName, cm.config.ReplicationFactor) + if lastErr == nil { + break + } + + cm.logger.Warn("System database creation attempt failed", + zap.Int("attempt", attempt), + zap.Int("max_retries", maxRetries), + zap.Error(lastErr)) + + if attempt < maxRetries { + // Wait before retry to allow more nodes to join + time.Sleep(3 * time.Second) + } + } + + if lastErr != nil { + cm.logger.Warn("System database creation completed with errors (may be created by another node)", + zap.Error(lastErr)) + } + } + + // Wait for system database to become active (longer timeout) + maxWait := 60 * time.Second + checkInterval := 500 * time.Millisecond + startTime := time.Now() + + for { + if time.Since(startTime) > maxWait { + // Don't fail startup - system database might be created later + cm.logger.Warn("Timeout waiting for system database, continuing startup", + zap.String("database", systemDBName), + zap.Duration("waited", time.Since(startTime))) + return nil // Return nil to allow node to start + } + + cm.mu.RLock() + instance, exists := cm.activeClusters[systemDBName] + cm.mu.RUnlock() + + if exists && instance.Status == StatusActive { + cm.logger.Info("System database is active", + zap.String("database", systemDBName)) + + // Run migrations if configured + if cm.config.MigrationsPath != "" { + if err := cm.runMigrations(systemDBName); err != nil { + cm.logger.Error("Failed to run migrations on system database", + zap.Error(err)) + // Don't fail startup, just log the error + } + } + + return nil + } + + time.Sleep(checkInterval) + } +} + +// runMigrations executes SQL migrations on a database +func (cm *ClusterManager) runMigrations(dbName string) error { + cm.logger.Info("Running migrations", + zap.String("database", dbName), + zap.String("migrations_path", cm.config.MigrationsPath)) + + cm.mu.RLock() + instance, exists := cm.activeClusters[dbName] + cm.mu.RUnlock() + + if !exists { + return fmt.Errorf("database %s not found in active clusters", dbName) + } + + conn := instance.Connection + if conn == nil { + return fmt.Errorf("no connection available for database %s", dbName) + } + + // Read migration files + files, err := filepath.Glob(filepath.Join(cm.config.MigrationsPath, "*.sql")) + if err != nil { + return fmt.Errorf("failed to read migration files: %w", err) + } + + if len(files) == 0 { + cm.logger.Info("No migration files found", + zap.String("path", cm.config.MigrationsPath)) + return nil + } + + // Sort files to ensure consistent order + // Files are expected to be named like 001_initial.sql, 002_core.sql, etc. + // filepath.Glob already returns them sorted + + cm.logger.Info("Found migration files", + zap.Int("count", len(files))) + + // Execute each migration file + for _, file := range files { + cm.logger.Info("Executing migration", + zap.String("file", filepath.Base(file))) + + content, err := os.ReadFile(file) + if err != nil { + return fmt.Errorf("failed to read migration file %s: %w", file, err) + } + + // Execute the migration using Execute (no automatic transaction wrapping) + // Migration files already contain BEGIN/COMMIT + _, err = conn.Execute(string(content)) + if err != nil { + cm.logger.Error("Migration failed", + zap.String("file", filepath.Base(file)), + zap.Error(err)) + // Continue with other migrations even if one fails + // (tables might already exist from previous runs) + continue + } + + cm.logger.Info("Migration completed", + zap.String("file", filepath.Base(file))) + } + + cm.logger.Info("All migrations completed", + zap.String("database", dbName)) + + return nil +} diff --git a/pkg/rqlite/consensus_test.go b/pkg/rqlite/consensus_test.go new file mode 100644 index 0000000..bf5f352 --- /dev/null +++ b/pkg/rqlite/consensus_test.go @@ -0,0 +1,78 @@ +package rqlite + +import ( + "testing" +) + +func TestSelectCoordinator_SingleNode(t *testing.T) { + nodeIDs := []string{"node1"} + coordinator := SelectCoordinator(nodeIDs) + + if coordinator != "node1" { + t.Errorf("Expected node1, got %s", coordinator) + } +} + +func TestSelectCoordinator_MultipleNodes(t *testing.T) { + nodeIDs := []string{"node3", "node1", "node2"} + coordinator := SelectCoordinator(nodeIDs) + + // Should select lowest lexicographical ID + if coordinator != "node1" { + t.Errorf("Expected node1 (lowest ID), got %s", coordinator) + } +} + +func TestSelectCoordinator_EmptyList(t *testing.T) { + nodeIDs := []string{} + coordinator := SelectCoordinator(nodeIDs) + + if coordinator != "" { + t.Errorf("Expected empty string for empty list, got %s", coordinator) + } +} + +func TestSelectCoordinator_Deterministic(t *testing.T) { + nodeIDs := []string{"nodeZ", "nodeA", "nodeM", "nodeB"} + + // Run multiple times + results := make(map[string]int) + for i := 0; i < 10; i++ { + coordinator := SelectCoordinator(nodeIDs) + results[coordinator]++ + } + + // Should always return the same result + if len(results) != 1 { + t.Errorf("Expected deterministic result, got multiple: %v", results) + } + + // Should be nodeA + if _, exists := results["nodeA"]; !exists { + t.Errorf("Expected nodeA to be selected, got %v", results) + } +} + +func TestIsCoordinator_True(t *testing.T) { + nodeIDs := []string{"node3", "node1", "node2"} + + if !IsCoordinator("node1", nodeIDs) { + t.Error("Expected node1 to be coordinator") + } +} + +func TestIsCoordinator_False(t *testing.T) { + nodeIDs := []string{"node3", "node1", "node2"} + + if IsCoordinator("node2", nodeIDs) { + t.Error("Expected node2 to NOT be coordinator") + } +} + +func TestIsCoordinator_EmptyList(t *testing.T) { + nodeIDs := []string{} + + if IsCoordinator("node1", nodeIDs) { + t.Error("Expected false for empty node list") + } +} diff --git a/pkg/rqlite/coordinator_test.go b/pkg/rqlite/coordinator_test.go new file mode 100644 index 0000000..29ecfb9 --- /dev/null +++ b/pkg/rqlite/coordinator_test.go @@ -0,0 +1,242 @@ +package rqlite + +import ( + "context" + "testing" + "time" + + "go.uber.org/zap" +) + +func TestCreateCoordinator_AddResponse(t *testing.T) { + logger := zap.NewNop() + coordinator := NewCreateCoordinator("testdb", 3, "node1", logger) + + response := DatabaseCreateResponse{ + DatabaseName: "testdb", + NodeID: "node2", + AvailablePorts: PortPair{ + HTTPPort: 5001, + RaftPort: 7001, + }, + } + + coordinator.AddResponse(response) + + responses := coordinator.GetResponses() + if len(responses) != 1 { + t.Errorf("Expected 1 response, got %d", len(responses)) + } + + if responses[0].NodeID != "node2" { + t.Errorf("Expected node2, got %s", responses[0].NodeID) + } +} + +func TestCreateCoordinator_SelectNodes(t *testing.T) { + logger := zap.NewNop() + coordinator := NewCreateCoordinator("testdb", 3, "node1", logger) + + // Add more responses than needed + for i := 1; i <= 5; i++ { + response := DatabaseCreateResponse{ + DatabaseName: "testdb", + NodeID: string(rune('A' + i)), + AvailablePorts: PortPair{ + HTTPPort: 5000 + i, + RaftPort: 7000 + i, + }, + } + coordinator.AddResponse(response) + } + + selected := coordinator.SelectNodes() + + // Should select exactly 3 nodes + if len(selected) != 3 { + t.Errorf("Expected 3 selected nodes, got %d", len(selected)) + } + + // Verify deterministic selection (should be first 3 added) + expectedNodes := []string{"B", "C", "D"} + for i, node := range selected { + if node.NodeID != expectedNodes[i] { + t.Errorf("Expected node %s at position %d, got %s", expectedNodes[i], i, node.NodeID) + } + } +} + +func TestCreateCoordinator_SelectNodes_InsufficientResponses(t *testing.T) { + logger := zap.NewNop() + coordinator := NewCreateCoordinator("testdb", 3, "node1", logger) + + // Add only 2 responses + coordinator.AddResponse(DatabaseCreateResponse{ + DatabaseName: "testdb", + NodeID: "node2", + AvailablePorts: PortPair{HTTPPort: 5001, RaftPort: 7001}, + }) + coordinator.AddResponse(DatabaseCreateResponse{ + DatabaseName: "testdb", + NodeID: "node3", + AvailablePorts: PortPair{HTTPPort: 5002, RaftPort: 7002}, + }) + + selected := coordinator.SelectNodes() + + // Should return all available responses even if less than requested + if len(selected) != 2 { + t.Errorf("Expected 2 selected nodes, got %d", len(selected)) + } +} + +func TestCreateCoordinator_WaitForResponses_Success(t *testing.T) { + logger := zap.NewNop() + coordinator := NewCreateCoordinator("testdb", 2, "node1", logger) + + // Add responses in goroutine + go func() { + time.Sleep(100 * time.Millisecond) + coordinator.AddResponse(DatabaseCreateResponse{ + DatabaseName: "testdb", + NodeID: "node2", + AvailablePorts: PortPair{HTTPPort: 5001, RaftPort: 7001}, + }) + coordinator.AddResponse(DatabaseCreateResponse{ + DatabaseName: "testdb", + NodeID: "node3", + AvailablePorts: PortPair{HTTPPort: 5002, RaftPort: 7002}, + }) + }() + + ctx := context.Background() + err := coordinator.WaitForResponses(ctx, 2*time.Second) + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + responses := coordinator.GetResponses() + if len(responses) != 2 { + t.Errorf("Expected 2 responses after wait, got %d", len(responses)) + } +} + +func TestCreateCoordinator_WaitForResponses_Timeout(t *testing.T) { + logger := zap.NewNop() + coordinator := NewCreateCoordinator("testdb", 3, "node1", logger) + + // Add only 1 response + coordinator.AddResponse(DatabaseCreateResponse{ + DatabaseName: "testdb", + NodeID: "node2", + AvailablePorts: PortPair{HTTPPort: 5001, RaftPort: 7001}, + }) + + ctx := context.Background() + err := coordinator.WaitForResponses(ctx, 500*time.Millisecond) + + // Should timeout since we need 3 but only have 1 + if err == nil { + t.Error("Expected timeout error, got nil") + } +} + +func TestCreateCoordinator_WaitForResponses_ContextCanceled(t *testing.T) { + logger := zap.NewNop() + coordinator := NewCreateCoordinator("testdb", 3, "node1", logger) + + ctx, cancel := context.WithCancel(context.Background()) + + // Cancel context immediately + cancel() + + err := coordinator.WaitForResponses(ctx, 5*time.Second) + if err == nil { + t.Error("Expected context canceled error, got nil") + } +} + +func TestCoordinatorRegistry_Register(t *testing.T) { + registry := NewCoordinatorRegistry() + logger := zap.NewNop() + + coordinator := NewCreateCoordinator("testdb", 3, "node1", logger) + registry.Register(coordinator) + + retrieved := registry.Get("testdb") + if retrieved == nil { + t.Fatal("Expected to retrieve coordinator, got nil") + } + + if retrieved.dbName != "testdb" { + t.Errorf("Expected database name testdb, got %s", retrieved.dbName) + } +} + +func TestCoordinatorRegistry_Remove(t *testing.T) { + registry := NewCoordinatorRegistry() + logger := zap.NewNop() + + coordinator := NewCreateCoordinator("testdb", 3, "node1", logger) + registry.Register(coordinator) + + // Verify it's there + if registry.Get("testdb") == nil { + t.Fatal("Expected coordinator to be registered") + } + + // Remove it + registry.Remove("testdb") + + // Verify it's gone + if registry.Get("testdb") != nil { + t.Error("Expected coordinator to be removed") + } +} + +func TestCoordinatorRegistry_GetNonexistent(t *testing.T) { + registry := NewCoordinatorRegistry() + + retrieved := registry.Get("nonexistent") + if retrieved != nil { + t.Error("Expected nil for nonexistent coordinator") + } +} + +func TestCoordinatorRegistry_MultipleCoordinators(t *testing.T) { + registry := NewCoordinatorRegistry() + logger := zap.NewNop() + + coord1 := NewCreateCoordinator("db1", 3, "node1", logger) + coord2 := NewCreateCoordinator("db2", 3, "node1", logger) + coord3 := NewCreateCoordinator("db3", 3, "node1", logger) + + registry.Register(coord1) + registry.Register(coord2) + registry.Register(coord3) + + // Verify all registered + if registry.Get("db1") == nil { + t.Error("Expected db1 coordinator") + } + if registry.Get("db2") == nil { + t.Error("Expected db2 coordinator") + } + if registry.Get("db3") == nil { + t.Error("Expected db3 coordinator") + } + + // Remove one + registry.Remove("db2") + + // Verify others still there + if registry.Get("db1") == nil { + t.Error("Expected db1 coordinator to still exist") + } + if registry.Get("db2") != nil { + t.Error("Expected db2 coordinator to be removed") + } + if registry.Get("db3") == nil { + t.Error("Expected db3 coordinator to still exist") + } +} diff --git a/pkg/rqlite/instance_test.go b/pkg/rqlite/instance_test.go new file mode 100644 index 0000000..029df16 --- /dev/null +++ b/pkg/rqlite/instance_test.go @@ -0,0 +1,156 @@ +package rqlite + +import ( + "testing" + "time" + + "go.uber.org/zap" +) + +func TestRQLiteInstance_Create(t *testing.T) { + logger := zap.NewNop() + ports := PortPair{HTTPPort: 5001, RaftPort: 7001} + + instance := NewRQLiteInstance( + "testdb", + ports, + "/tmp/test_data", + "127.0.0.1", + "127.0.0.1", + logger, + ) + + if instance.DatabaseName != "testdb" { + t.Errorf("Expected database name 'testdb', got '%s'", instance.DatabaseName) + } + + if instance.Ports.HTTPPort != 5001 { + t.Errorf("Expected HTTP port 5001, got %d", instance.Ports.HTTPPort) + } + + if instance.Ports.RaftPort != 7001 { + t.Errorf("Expected Raft port 7001, got %d", instance.Ports.RaftPort) + } + + if instance.Status != StatusInitializing { + t.Errorf("Expected status initializing, got %s", instance.Status) + } + + expectedDataDir := "/tmp/test_data/testdb/rqlite" + if instance.DataDir != expectedDataDir { + t.Errorf("Expected data dir '%s', got '%s'", expectedDataDir, instance.DataDir) + } +} + +func TestRQLiteInstance_IsIdle(t *testing.T) { + logger := zap.NewNop() + ports := PortPair{HTTPPort: 5001, RaftPort: 7001} + + instance := NewRQLiteInstance( + "testdb", + ports, + "/tmp/test_data", + "127.0.0.1", + "127.0.0.1", + logger, + ) + + // Set LastQuery to old timestamp + instance.LastQuery = time.Now().Add(-2 * time.Minute) + + // Check with 1 minute timeout - should be idle + if !instance.IsIdle(1 * time.Minute) { + t.Error("Expected instance to be idle after 2 minutes with 1 minute timeout") + } + + // Check with 3 minute timeout - should NOT be idle + if instance.IsIdle(3 * time.Minute) { + t.Error("Expected instance to NOT be idle with 3 minute timeout") + } + + // Update LastQuery to now + instance.LastQuery = time.Now() + + // Should not be idle anymore + if instance.IsIdle(1 * time.Minute) { + t.Error("Expected instance to NOT be idle after updating LastQuery") + } + + // Zero timeout should always return false (hibernation disabled) + instance.LastQuery = time.Now().Add(-10 * time.Hour) + if instance.IsIdle(0) { + t.Error("Expected IsIdle to return false when timeout is 0 (disabled)") + } +} + +func TestRQLiteInstance_GetConnection(t *testing.T) { + logger := zap.NewNop() + ports := PortPair{HTTPPort: 5001, RaftPort: 7001} + + instance := NewRQLiteInstance( + "testdb", + ports, + "/tmp/test_data", + "127.0.0.1", + "127.0.0.1", + logger, + ) + + // Set LastQuery to old time + oldTime := time.Now().Add(-1 * time.Hour) + instance.LastQuery = oldTime + + // GetConnection should update LastQuery + _ = instance.GetConnection() + + if instance.LastQuery.Before(oldTime.Add(59 * time.Minute)) { + t.Error("Expected GetConnection to update LastQuery timestamp") + } +} + +// Note: Start/Stop tests require rqlite binary and are more suitable for integration tests +// They would look like this: +// +// func TestRQLiteInstance_StartStop(t *testing.T) { +// if testing.Short() { +// t.Skip("Skipping integration test") +// } +// +// logger := zap.NewNop() +// ports := PortPair{HTTPPort: 15001, RaftPort: 17001} +// +// instance := NewRQLiteInstance( +// "testdb", +// ports, +// "/tmp/test_rqlite_start_stop", +// "127.0.0.1", +// "127.0.0.1", +// logger, +// ) +// +// ctx := context.Background() +// err := instance.Start(ctx, true, "") +// if err != nil { +// t.Fatalf("Failed to start instance: %v", err) +// } +// +// // Verify HTTP endpoint is responsive +// resp, err := http.Get(fmt.Sprintf("http://localhost:%d/status", ports.HTTPPort)) +// if err != nil { +// t.Fatalf("HTTP endpoint not responsive: %v", err) +// } +// resp.Body.Close() +// +// // Stop instance +// err = instance.Stop() +// if err != nil { +// t.Fatalf("Failed to stop instance: %v", err) +// } +// +// // Verify process terminated +// time.Sleep(1 * time.Second) +// _, err = http.Get(fmt.Sprintf("http://localhost:%d/status", ports.HTTPPort)) +// if err == nil { +// t.Error("Expected HTTP endpoint to be unreachable after stop") +// } +// } diff --git a/pkg/rqlite/metadata.go b/pkg/rqlite/metadata.go index ae03fe8..ce33b44 100644 --- a/pkg/rqlite/metadata.go +++ b/pkg/rqlite/metadata.go @@ -3,6 +3,8 @@ package rqlite import ( "sync" "time" + + "go.uber.org/zap" ) // DatabaseStatus represents the state of a database cluster @@ -56,6 +58,7 @@ type MetadataStore struct { databases map[string]*DatabaseMetadata // key = database name nodes map[string]*NodeCapacity // key = node ID mu sync.RWMutex + logger *zap.Logger } // NewMetadataStore creates a new metadata store @@ -63,9 +66,17 @@ func NewMetadataStore() *MetadataStore { return &MetadataStore{ databases: make(map[string]*DatabaseMetadata), nodes: make(map[string]*NodeCapacity), + logger: zap.NewNop(), // Default no-op logger } } +// SetLogger sets the logger for the metadata store +func (ms *MetadataStore) SetLogger(logger *zap.Logger) { + ms.mu.Lock() + defer ms.mu.Unlock() + ms.logger = logger +} + // GetDatabase retrieves metadata for a database func (ms *MetadataStore) GetDatabase(name string) *DatabaseMetadata { ms.mu.RLock() diff --git a/pkg/rqlite/metadata_test.go b/pkg/rqlite/metadata_test.go new file mode 100644 index 0000000..806d7be --- /dev/null +++ b/pkg/rqlite/metadata_test.go @@ -0,0 +1,239 @@ +package rqlite + +import ( + "sync" + "testing" + "time" +) + +func TestMetadataStore_GetSetDatabase(t *testing.T) { + store := NewMetadataStore() + + dbMeta := &DatabaseMetadata{ + DatabaseName: "testdb", + NodeIDs: []string{"node1", "node2", "node3"}, + PortMappings: map[string]PortPair{ + "node1": {HTTPPort: 5001, RaftPort: 7001}, + "node2": {HTTPPort: 5002, RaftPort: 7002}, + "node3": {HTTPPort: 5003, RaftPort: 7003}, + }, + Status: StatusActive, + CreatedAt: time.Now(), + LastAccessed: time.Now(), + VectorClock: NewVectorClock(), + } + + store.UpsertDatabase(dbMeta) + + retrieved := store.GetDatabase("testdb") + if retrieved == nil { + t.Fatal("Expected to retrieve database, got nil") + } + + if retrieved.DatabaseName != "testdb" { + t.Errorf("Expected database name testdb, got %s", retrieved.DatabaseName) + } + + if len(retrieved.NodeIDs) != 3 { + t.Errorf("Expected 3 nodes, got %d", len(retrieved.NodeIDs)) + } + + if retrieved.Status != StatusActive { + t.Errorf("Expected status active, got %s", retrieved.Status) + } +} + +func TestMetadataStore_DeleteDatabase(t *testing.T) { + store := NewMetadataStore() + + dbMeta := &DatabaseMetadata{ + DatabaseName: "testdb", + NodeIDs: []string{"node1"}, + PortMappings: make(map[string]PortPair), + Status: StatusActive, + VectorClock: NewVectorClock(), + } + + store.UpsertDatabase(dbMeta) + + // Verify it's there + if store.GetDatabase("testdb") == nil { + t.Fatal("Expected database to exist") + } + + // Delete it + store.DeleteDatabase("testdb") + + // Verify it's gone + if store.GetDatabase("testdb") != nil { + t.Error("Expected database to be deleted") + } +} + +func TestMetadataStore_ListDatabases(t *testing.T) { + store := NewMetadataStore() + + // Add multiple databases + for i := 1; i <= 5; i++ { + dbMeta := &DatabaseMetadata{ + DatabaseName: string(rune('a' + i)), + NodeIDs: []string{"node1"}, + PortMappings: make(map[string]PortPair), + Status: StatusActive, + VectorClock: NewVectorClock(), + } + store.UpsertDatabase(dbMeta) + } + + databases := store.GetAllDatabases() + if len(databases) != 5 { + t.Errorf("Expected 5 databases, got %d", len(databases)) + } +} + +func TestMetadataStore_ConcurrentAccess(t *testing.T) { + store := NewMetadataStore() + var wg sync.WaitGroup + + // Spawn multiple goroutines for concurrent writes + for i := 0; i < 10; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + dbMeta := &DatabaseMetadata{ + DatabaseName: string(rune('a' + id)), + NodeIDs: []string{"node1"}, + PortMappings: make(map[string]PortPair), + Status: StatusActive, + VectorClock: NewVectorClock(), + } + store.UpsertDatabase(dbMeta) + }(i) + } + + // Spawn multiple goroutines for concurrent reads + for i := 0; i < 10; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + _ = store.GetDatabase(string(rune('a' + id))) + _ = store.GetAllDatabases() + }(i) + } + + wg.Wait() + + // Verify all databases were added + databases := store.GetAllDatabases() + if len(databases) != 10 { + t.Errorf("Expected 10 databases after concurrent writes, got %d", len(databases)) + } +} + +func TestMetadataStore_NodeCapacity(t *testing.T) { + store := NewMetadataStore() + + nodeCapacity := &NodeCapacity{ + NodeID: "node1", + MaxDatabases: 100, + CurrentDatabases: 5, + AvailablePortRange: PortRange{ + HTTPStart: 5001, + HTTPEnd: 5999, + RaftStart: 7001, + RaftEnd: 7999, + }, + LastHealthCheck: time.Now(), + IsHealthy: true, + } + + store.UpsertNodeCapacity(nodeCapacity) + + retrieved := store.GetNode("node1") + if retrieved == nil { + t.Fatal("Expected to retrieve node capacity, got nil") + } + + if retrieved.MaxDatabases != 100 { + t.Errorf("Expected max databases 100, got %d", retrieved.MaxDatabases) + } + + if retrieved.CurrentDatabases != 5 { + t.Errorf("Expected current databases 5, got %d", retrieved.CurrentDatabases) + } + + if !retrieved.IsHealthy { + t.Error("Expected node to be healthy") + } +} + +func TestMetadataStore_UpdateNodeCapacity(t *testing.T) { + store := NewMetadataStore() + + nodeCapacity := &NodeCapacity{ + NodeID: "node1", + MaxDatabases: 100, + CurrentDatabases: 5, + IsHealthy: true, + } + + store.UpsertNodeCapacity(nodeCapacity) + + // Update capacity + nodeCapacity.CurrentDatabases = 10 + nodeCapacity.IsHealthy = false + store.UpsertNodeCapacity(nodeCapacity) + + retrieved := store.GetNode("node1") + if retrieved.CurrentDatabases != 10 { + t.Errorf("Expected current databases 10, got %d", retrieved.CurrentDatabases) + } + + if retrieved.IsHealthy { + t.Error("Expected node to be unhealthy after update") + } +} + +func TestMetadataStore_ListNodes(t *testing.T) { + store := NewMetadataStore() + + // Add multiple nodes + for i := 1; i <= 3; i++ { + nodeCapacity := &NodeCapacity{ + NodeID: string(rune('A' + i)), + MaxDatabases: 100, + CurrentDatabases: i * 5, + IsHealthy: true, + } + store.UpsertNodeCapacity(nodeCapacity) + } + + nodes := store.GetAllNodeCapacities() + if len(nodes) != 3 { + t.Errorf("Expected 3 nodes, got %d", len(nodes)) + } +} + +func TestMetadataStore_UpdateDatabase(t *testing.T) { + store := NewMetadataStore() + + dbMeta := &DatabaseMetadata{ + DatabaseName: "testdb", + NodeIDs: []string{"node1"}, + PortMappings: make(map[string]PortPair), + Status: StatusInitializing, + VectorClock: NewVectorClock(), + } + + store.UpsertDatabase(dbMeta) + + // Update status + dbMeta.Status = StatusActive + dbMeta.LastAccessed = time.Now() + store.UpsertDatabase(dbMeta) + + retrieved := store.GetDatabase("testdb") + if retrieved.Status != StatusActive { + t.Errorf("Expected status active, got %s", retrieved.Status) + } +} diff --git a/pkg/rqlite/ports_test.go b/pkg/rqlite/ports_test.go new file mode 100644 index 0000000..2ef1da6 --- /dev/null +++ b/pkg/rqlite/ports_test.go @@ -0,0 +1,194 @@ +package rqlite + +import ( + "testing" +) + +func TestPortManager_AllocatePortPair(t *testing.T) { + pm := NewPortManager( + PortRange{Start: 5001, End: 5010}, + PortRange{Start: 7001, End: 7010}, + ) + + ports, err := pm.AllocatePortPair("testdb") + if err != nil { + t.Fatalf("Failed to allocate port pair: %v", err) + } + + // Verify HTTP port in range + if ports.HTTPPort < 5001 || ports.HTTPPort > 5010 { + t.Errorf("HTTP port %d out of range [5001-5010]", ports.HTTPPort) + } + + // Verify Raft port in range + if ports.RaftPort < 7001 || ports.RaftPort > 7010 { + t.Errorf("Raft port %d out of range [7001-7010]", ports.RaftPort) + } + + // Verify ports are different + if ports.HTTPPort == ports.RaftPort { + t.Error("HTTP and Raft ports should be different") + } +} + +func TestPortManager_ReleasePortPair(t *testing.T) { + pm := NewPortManager( + PortRange{Start: 5001, End: 5010}, + PortRange{Start: 7001, End: 7010}, + ) + + // Allocate + ports, err := pm.AllocatePortPair("testdb") + if err != nil { + t.Fatalf("Failed to allocate port pair: %v", err) + } + + // Release + pm.ReleasePortPair(ports) + + // Should be able to allocate again + ports2, err := pm.AllocatePortPair("testdb2") + if err != nil { + t.Fatalf("Failed to allocate after release: %v", err) + } + + // Might get same ports back (that's OK) + if ports2.HTTPPort == 0 || ports2.RaftPort == 0 { + t.Error("Got zero ports after release and reallocation") + } +} + +func TestPortManager_IsPortAllocated(t *testing.T) { + pm := NewPortManager( + PortRange{Start: 5001, End: 5010}, + PortRange{Start: 7001, End: 7010}, + ) + + // Initially not allocated + if pm.IsPortAllocated(5001) { + t.Error("Port should not be allocated initially") + } + + // Allocate + ports, err := pm.AllocatePortPair("testdb") + if err != nil { + t.Fatalf("Failed to allocate: %v", err) + } + + // Should be allocated now + if !pm.IsPortAllocated(ports.HTTPPort) { + t.Error("HTTP port should be allocated") + } + if !pm.IsPortAllocated(ports.RaftPort) { + t.Error("Raft port should be allocated") + } + + // Release + pm.ReleasePortPair(ports) + + // Should not be allocated anymore + if pm.IsPortAllocated(ports.HTTPPort) { + t.Error("HTTP port should not be allocated after release") + } + if pm.IsPortAllocated(ports.RaftPort) { + t.Error("Raft port should not be allocated after release") + } +} + +func TestPortManager_AllocateSpecificPorts(t *testing.T) { + pm := NewPortManager( + PortRange{Start: 5001, End: 5010}, + PortRange{Start: 7001, End: 7010}, + ) + + specificPorts := PortPair{HTTPPort: 5005, RaftPort: 7005} + + // Allocate specific ports + err := pm.AllocateSpecificPorts("testdb", specificPorts) + if err != nil { + t.Fatalf("Failed to allocate specific ports: %v", err) + } + + // Verify allocated + if !pm.IsPortAllocated(5005) { + t.Error("Port 5005 should be allocated") + } + if !pm.IsPortAllocated(7005) { + t.Error("Port 7005 should be allocated") + } + + // Try to allocate same ports again - should fail + err = pm.AllocateSpecificPorts("testdb2", specificPorts) + if err == nil { + t.Error("Expected error when allocating already-allocated ports") + } +} + +func TestPortManager_Exhaustion(t *testing.T) { + // Very small range + pm := NewPortManager( + PortRange{Start: 5001, End: 5002}, // Only 2 ports + PortRange{Start: 7001, End: 7002}, // Only 2 ports + ) + + // Allocate first pair (uses 2 ports) + _, err := pm.AllocatePortPair("db1") + if err != nil { + t.Fatalf("First allocation should succeed: %v", err) + } + + // Try to allocate second pair - might fail due to limited ports + // Note: This test is probabilistic due to random selection + // In a real scenario with only 2 ports per range, we can only fit 1 database + _, err = pm.AllocatePortPair("db2") + // We expect this to eventually fail after retries + // The actual behavior depends on random selection + + // For a more deterministic test, allocate specific ports + pm2 := NewPortManager( + PortRange{Start: 5001, End: 5002}, + PortRange{Start: 7001, End: 7002}, + ) + + _ = pm2.AllocateSpecificPorts("db1", PortPair{HTTPPort: 5001, RaftPort: 7001}) + _ = pm2.AllocateSpecificPorts("db2", PortPair{HTTPPort: 5002, RaftPort: 7002}) + + // Now exhausted + err = pm2.AllocateSpecificPorts("db3", PortPair{HTTPPort: 5001, RaftPort: 7001}) + if err == nil { + t.Error("Expected error when ports exhausted") + } +} + +func TestPortManager_MultipleDatabases(t *testing.T) { + pm := NewPortManager( + PortRange{Start: 5001, End: 5020}, + PortRange{Start: 7001, End: 7020}, + ) + + databases := []string{"db1", "db2", "db3", "db4", "db5"} + allocatedPorts := make(map[int]bool) + + for _, db := range databases { + ports, err := pm.AllocatePortPair(db) + if err != nil { + t.Fatalf("Failed to allocate for %s: %v", db, err) + } + + // Verify no port conflicts + if allocatedPorts[ports.HTTPPort] { + t.Errorf("HTTP port %d already allocated", ports.HTTPPort) + } + if allocatedPorts[ports.RaftPort] { + t.Errorf("Raft port %d already allocated", ports.RaftPort) + } + + allocatedPorts[ports.HTTPPort] = true + allocatedPorts[ports.RaftPort] = true + } + + // Should have allocated 10 unique ports (5 HTTP + 5 Raft) + if len(allocatedPorts) != 10 { + t.Errorf("Expected 10 unique ports, got %d", len(allocatedPorts)) + } +} diff --git a/pkg/rqlite/pubsub_messages_test.go b/pkg/rqlite/pubsub_messages_test.go new file mode 100644 index 0000000..a79cdc5 --- /dev/null +++ b/pkg/rqlite/pubsub_messages_test.go @@ -0,0 +1,246 @@ +package rqlite + +import ( + "encoding/json" + "testing" + "time" +) + +func TestMarshalUnmarshalMetadataMessage_DatabaseCreateRequest(t *testing.T) { + payload := DatabaseCreateRequest{ + DatabaseName: "testdb", + RequesterNodeID: "node123", + ReplicationFactor: 3, + } + + data, err := MarshalMetadataMessage(MsgDatabaseCreateRequest, "node123", payload) + if err != nil { + t.Fatalf("Failed to marshal: %v", err) + } + + var decoded DatabaseCreateRequest + msg, err := UnmarshalMetadataMessage(data, &decoded) + if err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + + if msg.Type != MsgDatabaseCreateRequest { + t.Errorf("Expected type %s, got %s", MsgDatabaseCreateRequest, msg.Type) + } + + if decoded.DatabaseName != payload.DatabaseName { + t.Errorf("Expected database name %s, got %s", payload.DatabaseName, decoded.DatabaseName) + } + + if decoded.ReplicationFactor != payload.ReplicationFactor { + t.Errorf("Expected replication factor %d, got %d", payload.ReplicationFactor, decoded.ReplicationFactor) + } +} + +func TestMarshalUnmarshalMetadataMessage_DatabaseCreateResponse(t *testing.T) { + payload := DatabaseCreateResponse{ + DatabaseName: "testdb", + NodeID: "node456", + AvailablePorts: PortPair{ + HTTPPort: 5001, + RaftPort: 7001, + }, + } + + data, err := MarshalMetadataMessage(MsgDatabaseCreateResponse, "node456", payload) + if err != nil { + t.Fatalf("Failed to marshal: %v", err) + } + + var decoded DatabaseCreateResponse + msg, err := UnmarshalMetadataMessage(data, &decoded) + if err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + + if msg.Type != MsgDatabaseCreateResponse { + t.Errorf("Expected type %s, got %s", MsgDatabaseCreateResponse, msg.Type) + } + + if decoded.AvailablePorts.HTTPPort != 5001 { + t.Errorf("Expected HTTP port 5001, got %d", decoded.AvailablePorts.HTTPPort) + } +} + +func TestMarshalUnmarshalMetadataMessage_DatabaseCreateConfirm(t *testing.T) { + payload := DatabaseCreateConfirm{ + DatabaseName: "testdb", + SelectedNodes: []NodeAssignment{ + {NodeID: "node1", HTTPPort: 5001, RaftPort: 7001, Role: "leader"}, + {NodeID: "node2", HTTPPort: 5002, RaftPort: 7002, Role: "follower"}, + {NodeID: "node3", HTTPPort: 5003, RaftPort: 7003, Role: "follower"}, + }, + CoordinatorNodeID: "node1", + } + + data, err := MarshalMetadataMessage(MsgDatabaseCreateConfirm, "node1", payload) + if err != nil { + t.Fatalf("Failed to marshal: %v", err) + } + + var decoded DatabaseCreateConfirm + msg, err := UnmarshalMetadataMessage(data, &decoded) + if err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + + if len(decoded.SelectedNodes) != 3 { + t.Errorf("Expected 3 nodes, got %d", len(decoded.SelectedNodes)) + } + + if decoded.SelectedNodes[0].Role != "leader" { + t.Errorf("Expected first node to be leader, got %s", decoded.SelectedNodes[0].Role) + } +} + +func TestMarshalUnmarshalMetadataMessage_DatabaseStatusUpdate(t *testing.T) { + payload := DatabaseStatusUpdate{ + DatabaseName: "testdb", + NodeID: "node123", + Status: StatusActive, + HTTPPort: 5001, + } + + data, err := MarshalMetadataMessage(MsgDatabaseStatusUpdate, "node123", payload) + if err != nil { + t.Fatalf("Failed to marshal: %v", err) + } + + var decoded DatabaseStatusUpdate + msg, err := UnmarshalMetadataMessage(data, &decoded) + if err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + + if decoded.Status != StatusActive { + t.Errorf("Expected status active, got %s", decoded.Status) + } +} + +func TestMarshalUnmarshalMetadataMessage_NodeCapacityAnnouncement(t *testing.T) { + payload := NodeCapacityAnnouncement{ + NodeID: "node123", + MaxDatabases: 100, + CurrentDatabases: 5, + PortRangeHTTP: PortRange{Start: 5001, End: 5999}, + PortRangeRaft: PortRange{Start: 7001, End: 7999}, + } + + data, err := MarshalMetadataMessage(MsgNodeCapacityAnnouncement, "node123", payload) + if err != nil { + t.Fatalf("Failed to marshal: %v", err) + } + + var decoded NodeCapacityAnnouncement + msg, err := UnmarshalMetadataMessage(data, &decoded) + if err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + + if decoded.MaxDatabases != 100 { + t.Errorf("Expected max databases 100, got %d", decoded.MaxDatabases) + } + + if decoded.CurrentDatabases != 5 { + t.Errorf("Expected current databases 5, got %d", decoded.CurrentDatabases) + } +} + +func TestMarshalUnmarshalMetadataMessage_DatabaseIdleNotification(t *testing.T) { + now := time.Now() + payload := DatabaseIdleNotification{ + DatabaseName: "testdb", + NodeID: "node123", + LastActivity: now, + } + + data, err := MarshalMetadataMessage(MsgDatabaseIdleNotification, "node123", payload) + if err != nil { + t.Fatalf("Failed to marshal: %v", err) + } + + var decoded DatabaseIdleNotification + msg, err := UnmarshalMetadataMessage(data, &decoded) + if err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + + // Time comparison with some tolerance + if decoded.LastActivity.Unix() != now.Unix() { + t.Errorf("Expected last activity %v, got %v", now, decoded.LastActivity) + } +} + +func TestMarshalUnmarshalMetadataMessage_NodeReplacementNeeded(t *testing.T) { + payload := NodeReplacementNeeded{ + DatabaseName: "testdb", + FailedNodeID: "node3", + CurrentNodes: []string{"node1", "node2"}, + ReplicationFactor: 3, + } + + data, err := MarshalMetadataMessage(MsgNodeReplacementNeeded, "node1", payload) + if err != nil { + t.Fatalf("Failed to marshal: %v", err) + } + + var decoded NodeReplacementNeeded + msg, err := UnmarshalMetadataMessage(data, &decoded) + if err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + + if decoded.FailedNodeID != "node3" { + t.Errorf("Expected failed node node3, got %s", decoded.FailedNodeID) + } + + if len(decoded.CurrentNodes) != 2 { + t.Errorf("Expected 2 current nodes, got %d", len(decoded.CurrentNodes)) + } +} + +func TestMetadataMessage_UnmarshalPayload(t *testing.T) { + payload := DatabaseCreateRequest{ + DatabaseName: "testdb", + RequesterNodeID: "node123", + ReplicationFactor: 3, + } + + payloadBytes, _ := json.Marshal(payload) + msg := &MetadataMessage{ + Type: MsgDatabaseCreateRequest, + Timestamp: time.Now(), + NodeID: "node123", + Payload: payloadBytes, + } + + var decoded DatabaseCreateRequest + err := msg.UnmarshalPayload(&decoded) + if err != nil { + t.Fatalf("Failed to unmarshal payload: %v", err) + } + + if decoded.DatabaseName != "testdb" { + t.Errorf("Expected database name testdb, got %s", decoded.DatabaseName) + } +} + +func TestMetadataMessage_EmptyPayload(t *testing.T) { + msg := &MetadataMessage{ + Type: MsgNodeHealthPing, + Timestamp: time.Now(), + NodeID: "node123", + Payload: nil, + } + + var decoded struct{} + err := msg.UnmarshalPayload(&decoded) + if err != nil { + t.Fatalf("Expected no error for empty payload, got %v", err) + } +} diff --git a/pkg/rqlite/vector_clock_test.go b/pkg/rqlite/vector_clock_test.go new file mode 100644 index 0000000..b4f2c2d --- /dev/null +++ b/pkg/rqlite/vector_clock_test.go @@ -0,0 +1,134 @@ +package rqlite + +import ( + "testing" +) + +func TestVectorClock_Increment(t *testing.T) { + vc := NewVectorClock() + + // Initial state + if vc["nodeA"] != 0 { + t.Errorf("Expected initial value 0, got %d", vc["nodeA"]) + } + + // First increment + vc.Increment("nodeA") + if vc["nodeA"] != 1 { + t.Errorf("Expected value 1 after first increment, got %d", vc["nodeA"]) + } + + // Second increment + vc.Increment("nodeA") + if vc["nodeA"] != 2 { + t.Errorf("Expected value 2 after second increment, got %d", vc["nodeA"]) + } + + // Different node + vc.Increment("nodeB") + if vc["nodeB"] != 1 { + t.Errorf("Expected nodeB value 1, got %d", vc["nodeB"]) + } + if vc["nodeA"] != 2 { + t.Errorf("Expected nodeA value still 2, got %d", vc["nodeA"]) + } +} + +func TestVectorClock_Merge(t *testing.T) { + vc1 := NewVectorClock() + vc1["nodeA"] = 5 + vc1["nodeB"] = 3 + + vc2 := NewVectorClock() + vc2["nodeB"] = 7 + vc2["nodeC"] = 2 + + vc1.Merge(vc2) + + // Should take max values + if vc1["nodeA"] != 5 { + t.Errorf("Expected nodeA=5, got %d", vc1["nodeA"]) + } + if vc1["nodeB"] != 7 { + t.Errorf("Expected nodeB=7 (max of 3 and 7), got %d", vc1["nodeB"]) + } + if vc1["nodeC"] != 2 { + t.Errorf("Expected nodeC=2, got %d", vc1["nodeC"]) + } +} + +func TestVectorClock_Compare_StrictlyLess(t *testing.T) { + vc1 := NewVectorClock() + vc1["nodeA"] = 1 + vc1["nodeB"] = 2 + + vc2 := NewVectorClock() + vc2["nodeA"] = 2 + vc2["nodeB"] = 3 + + result := vc1.Compare(vc2) + if result != -1 { + t.Errorf("Expected -1 (strictly less), got %d", result) + } +} + +func TestVectorClock_Compare_StrictlyGreater(t *testing.T) { + vc1 := NewVectorClock() + vc1["nodeA"] = 5 + vc1["nodeB"] = 4 + + vc2 := NewVectorClock() + vc2["nodeA"] = 3 + vc2["nodeB"] = 2 + + result := vc1.Compare(vc2) + if result != 1 { + t.Errorf("Expected 1 (strictly greater), got %d", result) + } +} + +func TestVectorClock_Compare_Concurrent(t *testing.T) { + vc1 := NewVectorClock() + vc1["nodeA"] = 5 + vc1["nodeB"] = 2 + + vc2 := NewVectorClock() + vc2["nodeA"] = 3 + vc2["nodeB"] = 4 + + result := vc1.Compare(vc2) + if result != 0 { + t.Errorf("Expected 0 (concurrent), got %d", result) + } +} + +func TestVectorClock_Compare_Identical(t *testing.T) { + vc1 := NewVectorClock() + vc1["nodeA"] = 5 + vc1["nodeB"] = 3 + + vc2 := NewVectorClock() + vc2["nodeA"] = 5 + vc2["nodeB"] = 3 + + result := vc1.Compare(vc2) + if result != 0 { + t.Errorf("Expected 0 (identical), got %d", result) + } +} + +func TestVectorClock_String(t *testing.T) { + vc := NewVectorClock() + vc["nodeA"] = 5 + vc["nodeB"] = 3 + + str := vc.String() + // Should contain both nodes + if str == "" { + t.Error("Expected non-empty string representation") + } + // Basic format check + if str[0] != '{' || str[len(str)-1] != '}' { + t.Errorf("Expected string wrapped in braces, got %s", str) + } +}