From 4d05ae696b81d4a4cf098335a2b77a212cd2559d Mon Sep 17 00:00:00 2001 From: anonpenguin23 Date: Thu, 16 Oct 2025 10:56:59 +0300 Subject: [PATCH] Add admin handlers for database creation and metadata management - Introduced `admin_handlers.go` to handle database creation requests via HTTP, including validation and response handling. - Implemented `db_metadata.go` to manage database metadata caching and synchronization with a pubsub subscriber. - Updated `gateway.go` to initialize the metadata cache and start the metadata subscriber in the background. - Added new route for database creation in `routes.go` to expose the new functionality. - Enhanced cluster management to support system database auto-joining and improved metadata handling for database operations. --- pkg/gateway/admin_handlers.go | 136 +++++++++++++++++++++++++++++++ pkg/gateway/db_metadata.go | 125 ++++++++++++++++++++++++++++ pkg/gateway/gateway.go | 31 ++++--- pkg/gateway/routes.go | 4 +- pkg/rqlite/cluster_handlers.go | 14 ++-- pkg/rqlite/cluster_manager.go | 144 ++++++++++++++++++++++++++++++++- 6 files changed, 433 insertions(+), 21 deletions(-) create mode 100644 pkg/gateway/admin_handlers.go create mode 100644 pkg/gateway/db_metadata.go diff --git a/pkg/gateway/admin_handlers.go b/pkg/gateway/admin_handlers.go new file mode 100644 index 0000000..b4f16e5 --- /dev/null +++ b/pkg/gateway/admin_handlers.go @@ -0,0 +1,136 @@ +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/DeBrosOfficial/network/pkg/logging" + "github.com/DeBrosOfficial/network/pkg/rqlite" + "go.uber.org/zap" +) + +// CreateDatabaseRequest is the request body for database creation +type CreateDatabaseRequest struct { + Database string `json:"database"` + ReplicationFactor int `json:"replication_factor,omitempty"` // defaults to 3 +} + +// CreateDatabaseResponse is the response for database creation +type CreateDatabaseResponse struct { + Status string `json:"status"` + Database string `json:"database"` + Message string `json:"message,omitempty"` + Error string `json:"error,omitempty"` +} + +// databaseCreateHandler handles database creation requests via pubsub +func (g *Gateway) databaseCreateHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var req CreateDatabaseRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + g.respondJSON(w, http.StatusBadRequest, CreateDatabaseResponse{ + Status: "error", + Error: "Invalid request body", + }) + return + } + + if req.Database == "" { + g.respondJSON(w, http.StatusBadRequest, CreateDatabaseResponse{ + Status: "error", + Error: "database field is required", + }) + return + } + + // Default replication factor + if req.ReplicationFactor == 0 { + req.ReplicationFactor = 3 + } + + // Check if database already exists in metadata + if existing := g.dbMetaCache.Get(req.Database); existing != nil { + g.respondJSON(w, http.StatusConflict, CreateDatabaseResponse{ + Status: "exists", + Database: req.Database, + Message: "Database already exists", + }) + return + } + + g.logger.ComponentInfo(logging.ComponentGeneral, "Creating database via gateway", + zap.String("database", req.Database), + zap.Int("replication_factor", req.ReplicationFactor)) + + // Publish DATABASE_CREATE_REQUEST via pubsub + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + // We need to get the node ID to act as requester + // For 
now, use a placeholder - the actual node will coordinate + createReq := rqlite.DatabaseCreateRequest{ + DatabaseName: req.Database, + RequesterNodeID: "gateway", // Gateway is requesting on behalf of client + ReplicationFactor: req.ReplicationFactor, + } + + msgData, err := rqlite.MarshalMetadataMessage(rqlite.MsgDatabaseCreateRequest, "gateway", createReq) + if err != nil { + g.logger.ComponentError(logging.ComponentGeneral, "Failed to marshal create request", + zap.Error(err)) + g.respondJSON(w, http.StatusInternalServerError, CreateDatabaseResponse{ + Status: "error", + Error: fmt.Sprintf("Failed to create database: %v", err), + }) + return + } + + // Publish to metadata topic + metadataTopic := "/debros/metadata/v1" + if err := g.client.PubSub().Publish(ctx, metadataTopic, msgData); err != nil { + g.logger.ComponentError(logging.ComponentGeneral, "Failed to publish create request", + zap.Error(err)) + g.respondJSON(w, http.StatusInternalServerError, CreateDatabaseResponse{ + Status: "error", + Error: fmt.Sprintf("Failed to publish create request: %v", err), + }) + return + } + + // Wait briefly for metadata sync (3 seconds) + waitCtx, waitCancel := context.WithTimeout(ctx, 3*time.Second) + defer waitCancel() + + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-waitCtx.Done(): + // Timeout - database creation is async, return accepted status + g.respondJSON(w, http.StatusAccepted, CreateDatabaseResponse{ + Status: "accepted", + Database: req.Database, + Message: "Database creation initiated, it may take a few seconds to become available", + }) + return + case <-ticker.C: + // Check if metadata arrived + if metadata := g.dbMetaCache.Get(req.Database); metadata != nil { + g.respondJSON(w, http.StatusOK, CreateDatabaseResponse{ + Status: "created", + Database: req.Database, + Message: "Database created successfully", + }) + return + } + } + } +} diff --git a/pkg/gateway/db_metadata.go b/pkg/gateway/db_metadata.go new file mode 100644 index 0000000..ab43c83 --- /dev/null +++ b/pkg/gateway/db_metadata.go @@ -0,0 +1,125 @@ +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/DeBrosOfficial/network/pkg/logging" + "github.com/DeBrosOfficial/network/pkg/rqlite" + "go.uber.org/zap" +) + +// DatabaseMetadataCache manages per-database metadata for routing +type DatabaseMetadataCache struct { + cache map[string]*rqlite.DatabaseMetadata + mu sync.RWMutex + logger *logging.ColoredLogger +} + +// NewDatabaseMetadataCache creates a new metadata cache +func NewDatabaseMetadataCache(logger *logging.ColoredLogger) *DatabaseMetadataCache { + return &DatabaseMetadataCache{ + cache: make(map[string]*rqlite.DatabaseMetadata), + logger: logger, + } +} + +// Update updates metadata for a database (vector clock aware) +func (dmc *DatabaseMetadataCache) Update(metadata *rqlite.DatabaseMetadata) { + if metadata == nil { + return + } + + dmc.mu.Lock() + defer dmc.mu.Unlock() + + existing, exists := dmc.cache[metadata.DatabaseName] + if !exists || metadata.Version > existing.Version { + dmc.cache[metadata.DatabaseName] = metadata + dmc.logger.ComponentDebug(logging.ComponentGeneral, "Updated database metadata", + zap.String("database", metadata.DatabaseName), + zap.Uint64("version", metadata.Version)) + } +} + +// Get retrieves metadata for a database +func (dmc *DatabaseMetadataCache) Get(dbName string) *rqlite.DatabaseMetadata { + dmc.mu.RLock() + defer dmc.mu.RUnlock() + return dmc.cache[dbName] +} + +// ResolveEndpoints 
returns RQLite HTTP endpoints for a database (leader first) +func (dmc *DatabaseMetadataCache) ResolveEndpoints(dbName string) []string { + dmc.mu.RLock() + defer dmc.mu.RUnlock() + + metadata, exists := dmc.cache[dbName] + if !exists { + return nil + } + + endpoints := make([]string, 0, len(metadata.NodeIDs)) + + // Add leader first + if metadata.LeaderNodeID != "" { + if ports, ok := metadata.PortMappings[metadata.LeaderNodeID]; ok { + endpoint := fmt.Sprintf("http://127.0.0.1:%d", ports.HTTPPort) + endpoints = append(endpoints, endpoint) + } + } + + // Add followers + for _, nodeID := range metadata.NodeIDs { + if nodeID == metadata.LeaderNodeID { + continue // Already added + } + if ports, ok := metadata.PortMappings[nodeID]; ok { + endpoint := fmt.Sprintf("http://127.0.0.1:%d", ports.HTTPPort) + endpoints = append(endpoints, endpoint) + } + } + + return endpoints +} + +// StartMetadataSubscriber subscribes to the metadata topic and updates the cache +func (g *Gateway) StartMetadataSubscriber(ctx context.Context) error { + metadataTopic := "/debros/metadata/v1" + + g.logger.ComponentInfo(logging.ComponentGeneral, "Subscribing to metadata topic", + zap.String("topic", metadataTopic)) + + handler := func(topic string, data []byte) error { + // Parse metadata message + var msg rqlite.MetadataMessage + if err := json.Unmarshal(data, &msg); err != nil { + g.logger.ComponentDebug(logging.ComponentGeneral, "Failed to parse metadata message", + zap.Error(err)) + return nil // Don't fail on parse errors + } + + // Only process METADATA_SYNC messages + if msg.Type != rqlite.MsgMetadataSync { + return nil + } + + // Extract database metadata + var syncMsg rqlite.MetadataSync + if err := msg.UnmarshalPayload(&syncMsg); err != nil { + g.logger.ComponentDebug(logging.ComponentGeneral, "Failed to unmarshal metadata sync", + zap.Error(err)) + return nil + } + + if syncMsg.Metadata != nil { + g.dbMetaCache.Update(syncMsg.Metadata) + } + + return nil + } + + return g.client.PubSub().Subscribe(ctx, metadataTopic, handler) +} diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 20abfff..e3f8dd1 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -24,12 +24,13 @@ type Config struct { } type Gateway struct { - logger *logging.ColoredLogger - cfg *Config - client client.NetworkClient - startedAt time.Time - signingKey *rsa.PrivateKey - keyID string + logger *logging.ColoredLogger + cfg *Config + client client.NetworkClient + startedAt time.Time + signingKey *rsa.PrivateKey + keyID string + dbMetaCache *DatabaseMetadataCache } // deriveRQLiteEndpoints extracts IP addresses from bootstrap peer multiaddrs @@ -123,12 +124,22 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) { logger.ComponentInfo(logging.ComponentGeneral, "Creating gateway instance...") gw := &Gateway{ - logger: logger, - cfg: cfg, - client: c, - startedAt: time.Now(), + logger: logger, + cfg: cfg, + client: c, + startedAt: time.Now(), + dbMetaCache: NewDatabaseMetadataCache(logger), } + logger.ComponentInfo(logging.ComponentGeneral, "Starting metadata subscriber...") + // Start metadata subscriber in background + go func() { + ctx := context.Background() + if err := gw.StartMetadataSubscriber(ctx); err != nil { + logger.ComponentWarn(logging.ComponentGeneral, "failed to start metadata subscriber", zap.Error(err)) + } + }() + logger.ComponentInfo(logging.ComponentGeneral, "Generating RSA signing key...") // Generate local RSA signing key for JWKS/JWT (ephemeral for now) if key, err := 
rsa.GenerateKey(rand.Reader, 2048); err == nil { diff --git a/pkg/gateway/routes.go b/pkg/gateway/routes.go index 89f1b57..d4aadf1 100644 --- a/pkg/gateway/routes.go +++ b/pkg/gateway/routes.go @@ -45,7 +45,9 @@ func (g *Gateway) Routes() http.Handler { mux.HandleFunc("/v1/database/schema", g.databaseSchemaHandler) mux.HandleFunc("/v1/database/create-table", g.databaseCreateTableHandler) mux.HandleFunc("/v1/database/drop-table", g.databaseDropTableHandler) - mux.HandleFunc("/v1/database/list", g.databaseListHandler) + + // admin endpoints + mux.HandleFunc("/v1/admin/databases/create", g.databaseCreateHandler) return g.withMiddleware(mux) } diff --git a/pkg/rqlite/cluster_handlers.go b/pkg/rqlite/cluster_handlers.go index cf429b9..8ef2a32 100644 --- a/pkg/rqlite/cluster_handlers.go +++ b/pkg/rqlite/cluster_handlers.go @@ -25,7 +25,14 @@ func (cm *ClusterManager) handleCreateRequest(msg *MetadataMessage) error { currentCount := len(cm.activeClusters) cm.mu.RUnlock() - if currentCount >= cm.config.MaxDatabases { + // Get system DB name for capacity check + systemDBName := cm.config.SystemDatabaseName + if systemDBName == "" { + systemDBName = "_system" + } + + // Bypass capacity check for system database (it replicates to all nodes) + if req.DatabaseName != systemDBName && currentCount >= cm.config.MaxDatabases { cm.logger.Debug("Cannot host database: at capacity", zap.String("database", req.DatabaseName), zap.Int("current", currentCount), @@ -37,11 +44,6 @@ func (cm *ClusterManager) handleCreateRequest(msg *MetadataMessage) error { var ports PortPair var err error - systemDBName := cm.config.SystemDatabaseName - if systemDBName == "" { - systemDBName = "_system" - } - if req.DatabaseName == systemDBName && cm.config.SystemHTTPPort > 0 { // Try to use fixed ports for system database first ports = PortPair{ diff --git a/pkg/rqlite/cluster_manager.go b/pkg/rqlite/cluster_manager.go index aa92fea..2cfc203 100644 --- a/pkg/rqlite/cluster_manager.go +++ b/pkg/rqlite/cluster_manager.go @@ -276,8 +276,28 @@ func (cm *ClusterManager) CreateDatabase(dbName string, replicationFactor int) e // Select nodes responses := coordinator.GetResponses() - if len(responses) < replicationFactor { - return fmt.Errorf("insufficient nodes responded: got %d, need %d", len(responses), replicationFactor) + + // For system database, always select all responders (replicate to all nodes) + systemDBName := cm.config.SystemDatabaseName + if systemDBName == "" { + systemDBName = "_system" + } + + effectiveReplicationFactor := replicationFactor + if dbName == systemDBName { + effectiveReplicationFactor = len(responses) + cm.logger.Info("System database: selecting all responders", + zap.String("database", dbName), + zap.Int("responders", len(responses))) + } + + if len(responses) < effectiveReplicationFactor { + return fmt.Errorf("insufficient nodes responded: got %d, need %d", len(responses), effectiveReplicationFactor) + } + + // Update coordinator replication factor if needed for system DB + if dbName == systemDBName { + coordinator.replicationFactor = effectiveReplicationFactor } selectedResponses := coordinator.SelectNodes() @@ -852,6 +872,85 @@ func (cm *ClusterManager) getActiveMembers(metadata *DatabaseMetadata) []string return activeMembers } +// autoJoinSystemDatabase handles joining a new node to an existing system database cluster +func (cm *ClusterManager) autoJoinSystemDatabase(metadata *DatabaseMetadata) error { + systemDBName := cm.config.SystemDatabaseName + if systemDBName == "" { + systemDBName = "_system" + 
} + + // Find leader node + leaderNodeID := metadata.LeaderNodeID + if leaderNodeID == "" { + cm.logger.Warn("No leader found in system database metadata") + return fmt.Errorf("no leader for system database") + } + + // Get leader's Raft port + leaderPorts, exists := metadata.PortMappings[leaderNodeID] + if !exists { + cm.logger.Warn("Leader ports not found in metadata", + zap.String("leader", leaderNodeID)) + return fmt.Errorf("leader ports not available") + } + + joinAddr := fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), leaderPorts.RaftPort) + cm.logger.Info("Auto-joining system database as follower", + zap.String("database", systemDBName), + zap.String("leader", leaderNodeID), + zap.String("join_address", joinAddr)) + + // Allocate ports for this node + var ports PortPair + var err error + if cm.config.SystemHTTPPort > 0 { + ports = PortPair{ + HTTPPort: cm.config.SystemHTTPPort, + RaftPort: cm.config.SystemRaftPort, + } + err = cm.portManager.AllocateSpecificPortPair(systemDBName, ports) + if err != nil { + ports, err = cm.portManager.AllocatePortPair(systemDBName) + } + } else { + ports, err = cm.portManager.AllocatePortPair(systemDBName) + } + + if err != nil { + return fmt.Errorf("failed to allocate ports: %w", err) + } + + // Create RQLite instance for system DB as a follower + advHTTPAddr := fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), ports.HTTPPort) + advRaftAddr := fmt.Sprintf("%s:%d", cm.getAdvertiseAddress(), ports.RaftPort) + + instance := NewRQLiteInstance( + systemDBName, + ports, + cm.dataDir, + advHTTPAddr, + advRaftAddr, + cm.logger, + ) + + // Start as follower with join address + ctx, cancel := context.WithTimeout(cm.ctx, 30*time.Second) + defer cancel() + + if err := instance.Start(ctx, false, joinAddr); err != nil { + cm.logger.Error("Failed to start system database instance", + zap.Error(err)) + return err + } + + // Store the instance + cm.mu.Lock() + cm.activeClusters[systemDBName] = instance + cm.mu.Unlock() + + return nil +} + // initializeSystemDatabase creates and starts the system database on this node func (cm *ClusterManager) initializeSystemDatabase() error { systemDBName := cm.config.SystemDatabaseName @@ -892,9 +991,46 @@ func (cm *ClusterManager) initializeSystemDatabase() error { } if !isMember { - cm.logger.Info("This node is not a member of existing system database, skipping creation", + cm.logger.Info("This node is not a member of existing system database, auto-joining", + zap.String("database", systemDBName)) + + // Auto-join as a follower to the existing cluster + if err := cm.autoJoinSystemDatabase(existingDB); err != nil { + cm.logger.Warn("Failed to auto-join system database", + zap.String("database", systemDBName), + zap.Error(err)) + // Don't fail - the node can still operate without the system database + return nil + } + + // Update metadata to add this node + existingDB.NodeIDs = append(existingDB.NodeIDs, cm.nodeID) + if existingDB.PortMappings == nil { + existingDB.PortMappings = make(map[string]PortPair) + } + + // Get ports from the active cluster + cm.mu.RLock() + instance := cm.activeClusters[systemDBName] + cm.mu.RUnlock() + + if instance != nil { + existingDB.PortMappings[cm.nodeID] = PortPair{ + HTTPPort: instance.HTTPPort, + RaftPort: instance.RaftPort, + } + } + + UpdateDatabaseMetadata(existingDB, cm.nodeID) + cm.metadataStore.SetDatabase(existingDB) + + // Broadcast metadata sync + syncMsg := MetadataSync{Metadata: existingDB} + msgData, _ := MarshalMetadataMessage(MsgMetadataSync, cm.nodeID, syncMsg) + _ = 
cm.pubsubAdapter.Publish(cm.ctx, "/debros/metadata/v1", msgData) + + cm.logger.Info("Node joined system database cluster", zap.String("database", systemDBName)) - return nil } // Fall through to wait for activation
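
For reviewers, a minimal client sketch of the new admin endpoint follows. It is an illustration only, not part of the patch: the gateway base URL (http://localhost:6001) is an assumption, and any auth header the gateway middleware enforces is omitted; adjust both for your deployment. The request/response shapes mirror CreateDatabaseRequest and CreateDatabaseResponse from admin_handlers.go, and the status handling follows the handler's 200/202/409 paths.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// Local mirrors of the gateway's CreateDatabaseRequest / CreateDatabaseResponse types.
type createDatabaseRequest struct {
	Database          string `json:"database"`
	ReplicationFactor int    `json:"replication_factor,omitempty"`
}

type createDatabaseResponse struct {
	Status   string `json:"status"`
	Database string `json:"database"`
	Message  string `json:"message,omitempty"`
	Error    string `json:"error,omitempty"`
}

func main() {
	// Assumed gateway address; substitute whatever your gateway actually listens on.
	const gatewayURL = "http://localhost:6001"

	body, _ := json.Marshal(createDatabaseRequest{
		Database:          "example_app",
		ReplicationFactor: 3,
	})

	req, err := http.NewRequest(http.MethodPost, gatewayURL+"/v1/admin/databases/create", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	// If the gateway middleware requires auth, set the appropriate header here.

	client := &http.Client{Timeout: 15 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var out createDatabaseResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		panic(err)
	}

	switch resp.StatusCode {
	case http.StatusOK:
		// Metadata arrived within the handler's 3-second wait window.
		fmt.Printf("created: %s\n", out.Database)
	case http.StatusAccepted:
		// Creation is asynchronous; the database may take a few seconds to appear.
		fmt.Printf("accepted: %s (%s)\n", out.Database, out.Message)
	case http.StatusConflict:
		fmt.Printf("already exists: %s\n", out.Database)
	default:
		fmt.Printf("error (%d): %s\n", resp.StatusCode, out.Error)
	}
}

A 202 response is the expected outcome when the metadata sync takes longer than the handler's short wait; callers should treat it as success and retry reads against the new database after a brief delay.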