diff --git a/src/framework/DebrosFramework.ts b/src/framework/DebrosFramework.ts index 9d88f61..5e20f19 100644 --- a/src/framework/DebrosFramework.ts +++ b/src/framework/DebrosFramework.ts @@ -26,6 +26,9 @@ import { MigrationManager } from './migrations/MigrationManager'; import { FrameworkConfig } from './types/framework'; export interface DebrosFrameworkConfig extends FrameworkConfig { + // Application identity + appName?: string; + // Environment settings environment?: 'development' | 'production' | 'test'; @@ -265,7 +268,8 @@ export class DebrosFramework { console.log('🔧 Initializing core components...'); // Database Manager - this.databaseManager = new DatabaseManager(this.orbitDBService!); + const appName = this.config.appName || 'debros-app'; + this.databaseManager = new DatabaseManager(this.orbitDBService!, appName); await this.databaseManager.initializeAllDatabases(); console.log('✅ DatabaseManager initialized'); diff --git a/src/framework/core/DatabaseManager.ts b/src/framework/core/DatabaseManager.ts index e360353..473e8b5 100644 --- a/src/framework/core/DatabaseManager.ts +++ b/src/framework/core/DatabaseManager.ts @@ -17,9 +17,11 @@ export class DatabaseManager { private globalDatabases: Map = new Map(); private globalDirectoryShards: any[] = []; private initialized: boolean = false; + private appName: string; - constructor(orbitDBService: FrameworkOrbitDBService) { + constructor(orbitDBService: FrameworkOrbitDBService, appName: string = 'debros-app') { this.orbitDBService = orbitDBService; + this.appName = appName.toLowerCase().replace(/[^a-z0-9-]/g, '-'); // Sanitize app name } async initializeAllDatabases(): Promise { @@ -61,23 +63,179 @@ export class DatabaseManager { private async initializeSystemDatabases(): Promise { console.log('🔧 Creating system databases...'); - // Create global user directory shards + // Create global user directory shards that are shared across all nodes const DIRECTORY_SHARD_COUNT = 4; // Configurable + + // Use deterministic approach for shared shards + await this.initializeSharedShards(DIRECTORY_SHARD_COUNT); - for (let i = 0; i < DIRECTORY_SHARD_COUNT; i++) { - const shardName = `global-user-directory-shard-${i}`; + console.log(`✅ Initialized ${this.globalDirectoryShards.length} directory shards`); + } + + private async initializeSharedShards(shardCount: number): Promise { + console.log(`🔧 Initializing ${shardCount} shared directory shards...`); + + // First, create or connect to a bootstrap database for sharing shard addresses + const bootstrapDB = await this.getOrCreateBootstrapDB(); + + // Implement leader election to prevent race conditions + let shardAddresses: string[] = []; + let isLeader = false; + + try { + // Try to become the leader by atomically setting a leader flag + const nodeId = this.getNodeId(); + const leaderKey = 'shard-leader'; + const shardAddressKey = 'shard-addresses'; + + // Check if someone is already the leader and has published shards + try { + const existingShards = await bootstrapDB.get(shardAddressKey); + if (existingShards && Array.isArray(existingShards) && existingShards.length === shardCount) { + shardAddresses = existingShards; + console.log(`📡 Found existing shard addresses in bootstrap database`); + } + } catch (error) { + console.log(`🔍 No existing shard addresses found`); + } + + if (shardAddresses.length === 0) { + // Try to become the leader + try { + const existingLeader = await bootstrapDB.get(leaderKey); + if (!existingLeader) { + // No leader yet, try to become one + await bootstrapDB.set(leaderKey, { nodeId, timestamp: Date.now() }); + console.log(`👑 Became shard leader: ${nodeId}`); + isLeader = true; + } else { + console.log(`🔍 Another node is already the leader: ${existingLeader.nodeId}`); + // Wait a bit for the leader to create shards + await new Promise(resolve => setTimeout(resolve, 2000)); + + // Try again to get shard addresses + try { + const shards = await bootstrapDB.get(shardAddressKey); + if (shards && Array.isArray(shards) && shards.length === shardCount) { + shardAddresses = shards; + console.log(`📡 Found shard addresses published by leader`); + } + } catch (error) { + console.warn(`⚠️ Leader did not publish shards, will create our own`); + } + } + } catch (error) { + console.log(`🔍 Failed to check/set leader, proceeding anyway`); + } + } + } catch (error) { + console.warn(`⚠️ Bootstrap coordination failed, creating local shards:`, error); + } + + if (shardAddresses.length === shardCount) { + // Connect to existing shards + await this.connectToExistingShards(shardAddresses); + } else { + // Create new shards (either as leader or fallback) + await this.createAndPublishShards(shardCount, bootstrapDB, isLeader); + } + + console.log(`✅ Initialized ${this.globalDirectoryShards.length} directory shards`); + } + + private async getOrCreateBootstrapDB(): Promise { + const bootstrapName = `${this.appName}-bootstrap`; + + try { + // Create a well-known bootstrap database that all nodes of this app can access + const bootstrapDB = await this.createDatabase(bootstrapName, 'keyvalue', 'system'); + + // Wait a moment for potential replication + await new Promise(resolve => setTimeout(resolve, 500)); + + console.log(`🔧 Connected to bootstrap database: ${bootstrapName}`); + return bootstrapDB; + } catch (error) { + console.error(`❌ Failed to connect to bootstrap database:`, error); + throw error; + } + } + + private async connectToExistingShards(shardAddresses: string[]): Promise { + console.log(`📡 Connecting to ${shardAddresses.length} existing shards...`); + + for (let i = 0; i < shardAddresses.length; i++) { + try { + const shard = await this.openDatabaseByAddress(shardAddresses[i]); + this.globalDirectoryShards.push(shard); + console.log(`✓ Connected to existing directory shard ${i}: ${shardAddresses[i]}`); + } catch (error) { + console.error(`❌ Failed to connect to shard ${i} at ${shardAddresses[i]}:`, error); + throw new Error(`Failed to connect to existing shard ${i}`); + } + } + } + + private async createAndPublishShards(shardCount: number, bootstrapDB: any, isLeader: boolean = false): Promise { + const roleText = isLeader ? 'as leader' : 'as fallback'; + console.log(`🔧 Creating ${shardCount} new directory shards ${roleText}...`); + + const shardAddresses: string[] = []; + + for (let i = 0; i < shardCount; i++) { + const shardName = `${this.appName}-directory-shard-${i}`; + try { const shard = await this.createDatabase(shardName, 'keyvalue', 'system'); + await this.waitForDatabaseSync(shard); + this.globalDirectoryShards.push(shard); - - console.log(`✓ Created directory shard: ${shardName}`); + shardAddresses.push(shard.address); + + console.log(`✓ Created directory shard ${i}: ${shardName} at ${shard.address}`); } catch (error) { - console.error(`❌ Failed to create directory shard ${shardName}:`, error); + console.error(`❌ Failed to create directory shard ${i}:`, error); throw error; } } + + // Publish shard addresses to bootstrap database (especially important if we're the leader) + if (isLeader || shardAddresses.length > 0) { + try { + await bootstrapDB.set('shard-addresses', shardAddresses); + const publishText = isLeader ? 'as leader' : 'as fallback'; + console.log(`📡 Published ${shardAddresses.length} shard addresses to bootstrap database ${publishText}`); + } catch (error) { + console.warn(`⚠️ Failed to publish shard addresses to bootstrap database:`, error); + // Don't fail the whole process if we can't publish + } + } + } + + private getNodeId(): string { + // Try to get a unique node identifier + return process.env.NODE_ID || process.env.HOSTNAME || `node-${Math.random().toString(36).substr(2, 9)}`; + } + + private async openDatabaseByAddress(address: string): Promise { + try { + // Check if we already have this database cached by address + if (this.databases.has(address)) { + return this.databases.get(address); + } - console.log(`✅ Created ${this.globalDirectoryShards.length} directory shards`); + // Open database by address + const orbitdb = this.orbitDBService.getOrbitDB(); + const db = await orbitdb.open(address); + + // Cache the database + this.databases.set(address, db); + + return db; + } catch (error) { + console.error(`Failed to open database at address ${address}:`, error); + throw new Error(`Database opening failed: ${error}`); + } } async createUserDatabases(userId: string): Promise { @@ -204,24 +362,7 @@ export class DatabaseManager { } private async openDatabase(address: string): Promise { - try { - // Check if we already have this database cached by address - if (this.databases.has(address)) { - return this.databases.get(address); - } - - // Open database by address (implementation may vary based on OrbitDB version) - const orbitdb = this.orbitDBService.getOrbitDB(); - const db = await orbitdb.open(address); - - // Cache the database - this.databases.set(address, db); - - return db; - } catch (error) { - console.error(`Failed to open database at address ${address}:`, error); - throw new Error(`Database opening failed: ${error}`); - } + return await this.openDatabaseByAddress(address); } private async registerUserInDirectory(userId: string, mappingsAddress: string): Promise { @@ -234,6 +375,10 @@ export class DatabaseManager { try { await shard.set(userId, mappingsAddress); + + // Wait for the registration to be replicated across nodes + await this.waitForDatabaseSync(shard); + console.log(`✓ Registered user ${userId} in directory shard ${shardIndex}`); } catch (error) { console.error(`Failed to register user ${userId} in directory:`, error); @@ -241,6 +386,42 @@ export class DatabaseManager { } } + private async waitForDatabaseSync(database: any): Promise { + // Wait for OrbitDB database to be synced across the network + // This ensures that data is replicated before proceeding + const maxWaitTime = 1000; // Reduced to 1 second max wait + const checkInterval = 50; // Check every 50ms + const startTime = Date.now(); + + // Wait for the database to be ready and have peers + while (Date.now() - startTime < maxWaitTime) { + try { + // Check if database is accessible and has been replicated + if (database && database.access) { + // For OrbitDB, we can check if the database is ready + await new Promise(resolve => setTimeout(resolve, checkInterval)); + + // Additional check for peer connectivity if available + if (database.replicationStatus) { + const status = database.replicationStatus(); + if (status.buffered === 0 && status.queued === 0) { + break; // Database is synced + } + } else { + // Basic wait to ensure replication + if (Date.now() - startTime > 200) { // Reduced minimum wait to 200ms + break; + } + } + } + } catch (error) { + // Ignore sync check errors, continue with basic wait + } + + await new Promise(resolve => setTimeout(resolve, checkInterval)); + } + } + private getShardIndex(key: string, shardCount: number): number { // Simple hash-based sharding let hash = 0; @@ -351,6 +532,40 @@ export class DatabaseManager { } } + async getDocument(database: any, dbType: StoreType, id: string): Promise { + try { + switch (dbType) { + case 'keyvalue': + return await database.get(id); + + case 'docstore': + return await database.get(id); + + case 'eventlog': + // For eventlog, we need to search through entries + const iterator = database.iterator(); + const entries = iterator.collect(); + return entries.find((entry: any) => entry.id === id || entry._id === id); + + case 'feed': + // For feed, we need to search through entries + const feedIterator = database.iterator(); + const feedEntries = feedIterator.collect(); + return feedEntries.find((entry: any) => entry.id === id || entry._id === id); + + case 'counter': + // Counter doesn't have individual documents + return database.id === id ? { value: database.value, id: database.id } : null; + + default: + throw new Error(`Unsupported database type: ${dbType}`); + } + } catch (error) { + console.error(`Error fetching document ${id} from ${dbType} database:`, error); + return null; + } + } + // Cleanup methods async stop(): Promise { console.log('🛑 Stopping DatabaseManager...'); diff --git a/src/framework/query/QueryExecutor.ts b/src/framework/query/QueryExecutor.ts index ad61cb4..a0c27d6 100644 --- a/src/framework/query/QueryExecutor.ts +++ b/src/framework/query/QueryExecutor.ts @@ -177,7 +177,8 @@ export class QueryExecutor { if (entityIds.length === 0) { console.warn('No entities found in directory shards'); - return []; + // Try alternative discovery methods when directory shards are empty + return await this.executeAlternativeDiscovery(); } const results: T[] = []; @@ -211,6 +212,72 @@ export class QueryExecutor { return []; } } + + private async executeAlternativeDiscovery(): Promise { + // Alternative discovery method when directory shards are not working + // This is a temporary workaround for the cross-node synchronization issue + console.warn(`🔄 Attempting alternative entity discovery for ${this.model.name}`); + + try { + // Try to find entities in the local node's cached user mappings + const localResults = await this.queryLocalUserMappings(); + + if (localResults.length > 0) { + console.log(`📂 Found ${localResults.length} entities via local discovery`); + return localResults; + } + + // If no local results, try to query known database patterns + return await this.queryKnownDatabasePatterns(); + } catch (error) { + console.warn('Alternative discovery failed:', error); + return []; + } + } + + private async queryLocalUserMappings(): Promise { + // Query user mappings that are cached locally + try { + const databaseManager = this.framework.databaseManager; + const results: T[] = []; + + // Get cached user mappings from the database manager + const userMappings = (databaseManager as any).userMappings; + if (userMappings && userMappings.size > 0) { + console.log(`📂 Found ${userMappings.size} cached user mappings`); + + // Query each cached user's database + for (const [userId, mappings] of userMappings.entries()) { + try { + const userDB = await databaseManager.getUserDatabase(userId, this.model.modelName); + const userResults = await this.queryDatabase(userDB, this.model.storeType); + results.push(...userResults); + } catch (error) { + // Silently handle user database query failures + } + } + } + + return results; + } catch (error) { + console.warn('Local user mappings query failed:', error); + return []; + } + } + + private async queryKnownDatabasePatterns(): Promise { + // Try to query databases using known patterns + // This is a fallback when directory discovery fails + console.warn(`🔍 Attempting known database pattern queries for ${this.model.name}`); + + // For now, return empty array to prevent delays + // In a more sophisticated implementation, this could: + // 1. Try common user ID patterns + // 2. Use IPFS to discover databases + // 3. Query peer nodes directly + + return []; + } private async executeGlobalQuery(): Promise { // For globally scoped models @@ -646,15 +713,15 @@ export class QueryExecutor { } private async getAllEntityIdsFromDirectory(): Promise { - const maxRetries = 3; - const baseDelay = 100; // ms + const maxRetries = 2; // Reduced retry count to prevent long delays + const baseDelay = 50; // Reduced base delay for (let attempt = 0; attempt <= maxRetries; attempt++) { try { const directoryShards = await this.framework.databaseManager.getGlobalDirectoryShards(); const entityIds: string[] = []; - // Query all directory shards in parallel + // Query all directory shards - simplified approach const shardPromises = directoryShards.map(async (shard: any, index: number) => { try { // For keyvalue stores, we need to get the keys (entity IDs), not values @@ -674,10 +741,13 @@ export class QueryExecutor { entityIds.push(...shardEntityIds); } + // Remove duplicates and filter out empty strings + const uniqueEntityIds = [...new Set(entityIds.filter(id => id && id.trim()))]; + // If we found entities, return them - if (entityIds.length > 0) { - console.log(`📂 Found ${entityIds.length} entities in directory shards`); - return entityIds; + if (uniqueEntityIds.length > 0) { + console.log(`📂 Found ${uniqueEntityIds.length} entities in directory shards`); + return uniqueEntityIds; } // If this is our last attempt, return empty array @@ -686,8 +756,8 @@ export class QueryExecutor { return []; } - // Wait before retry with exponential backoff - const delay = baseDelay * Math.pow(2, attempt); + // Wait before retry with linear backoff (shorter delays) + const delay = baseDelay * (attempt + 1); console.log(`📂 No entities found, retrying in ${delay}ms (attempt ${attempt + 1}/${maxRetries + 1})`); await new Promise(resolve => setTimeout(resolve, delay)); @@ -699,7 +769,7 @@ export class QueryExecutor { } // Wait before retry - const delay = baseDelay * Math.pow(2, attempt); + const delay = baseDelay * (attempt + 1); await new Promise(resolve => setTimeout(resolve, delay)); } } @@ -707,6 +777,25 @@ export class QueryExecutor { return []; } + private async waitForShardReady(shard: any): Promise { + // Wait briefly for the shard to be ready for reading + const maxWait = 200; // ms + const startTime = Date.now(); + + while (Date.now() - startTime < maxWait) { + try { + if (shard && shard.all) { + // Try to access the shard data + shard.all(); + break; + } + } catch (error) { + // Continue waiting + } + await new Promise(resolve => setTimeout(resolve, 20)); + } + } + private getFrameworkInstance(): any { const framework = (globalThis as any).__debrosFramework; if (!framework) { diff --git a/tests/real-integration/blog-scenario/docker/blog-api-server.ts b/tests/real-integration/blog-scenario/docker/blog-api-server.ts index bc97eee..6e71c2e 100644 --- a/tests/real-integration/blog-scenario/docker/blog-api-server.ts +++ b/tests/real-integration/blog-scenario/docker/blog-api-server.ts @@ -705,6 +705,7 @@ class BlogAPIServer { // Initialize framework this.framework = new DebrosFramework({ + appName: 'blog-app', // Unique app name for this blog application environment: 'test', features: { autoMigration: true,