feat: Add appName configuration to DebrosFramework and enhance DatabaseManager with app-specific handling

This commit is contained in:
anonpenguin 2025-07-11 06:32:43 +03:00
parent 6e1cc2cbf0
commit dfc39bd975
4 changed files with 346 additions and 37 deletions

View File

@ -26,6 +26,9 @@ import { MigrationManager } from './migrations/MigrationManager';
import { FrameworkConfig } from './types/framework';
export interface DebrosFrameworkConfig extends FrameworkConfig {
// Application identity
appName?: string;
// Environment settings
environment?: 'development' | 'production' | 'test';
@ -265,7 +268,8 @@ export class DebrosFramework {
console.log('🔧 Initializing core components...');
// Database Manager
this.databaseManager = new DatabaseManager(this.orbitDBService!);
const appName = this.config.appName || 'debros-app';
this.databaseManager = new DatabaseManager(this.orbitDBService!, appName);
await this.databaseManager.initializeAllDatabases();
console.log('✅ DatabaseManager initialized');

View File

@ -17,9 +17,11 @@ export class DatabaseManager {
private globalDatabases: Map<string, any> = new Map();
private globalDirectoryShards: any[] = [];
private initialized: boolean = false;
private appName: string;
constructor(orbitDBService: FrameworkOrbitDBService) {
constructor(orbitDBService: FrameworkOrbitDBService, appName: string = 'debros-app') {
this.orbitDBService = orbitDBService;
this.appName = appName.toLowerCase().replace(/[^a-z0-9-]/g, '-'); // Sanitize app name
}
async initializeAllDatabases(): Promise<void> {
@ -61,23 +63,179 @@ export class DatabaseManager {
private async initializeSystemDatabases(): Promise<void> {
console.log('🔧 Creating system databases...');
// Create global user directory shards
// Create global user directory shards that are shared across all nodes
const DIRECTORY_SHARD_COUNT = 4; // Configurable
// Use deterministic approach for shared shards
await this.initializeSharedShards(DIRECTORY_SHARD_COUNT);
for (let i = 0; i < DIRECTORY_SHARD_COUNT; i++) {
const shardName = `global-user-directory-shard-${i}`;
console.log(`✅ Initialized ${this.globalDirectoryShards.length} directory shards`);
}
private async initializeSharedShards(shardCount: number): Promise<void> {
console.log(`🔧 Initializing ${shardCount} shared directory shards...`);
// First, create or connect to a bootstrap database for sharing shard addresses
const bootstrapDB = await this.getOrCreateBootstrapDB();
// Implement leader election to prevent race conditions
let shardAddresses: string[] = [];
let isLeader = false;
try {
// Try to become the leader by atomically setting a leader flag
const nodeId = this.getNodeId();
const leaderKey = 'shard-leader';
const shardAddressKey = 'shard-addresses';
// Check if someone is already the leader and has published shards
try {
const existingShards = await bootstrapDB.get(shardAddressKey);
if (existingShards && Array.isArray(existingShards) && existingShards.length === shardCount) {
shardAddresses = existingShards;
console.log(`📡 Found existing shard addresses in bootstrap database`);
}
} catch (error) {
console.log(`🔍 No existing shard addresses found`);
}
if (shardAddresses.length === 0) {
// Try to become the leader
try {
const existingLeader = await bootstrapDB.get(leaderKey);
if (!existingLeader) {
// No leader yet, try to become one
await bootstrapDB.set(leaderKey, { nodeId, timestamp: Date.now() });
console.log(`👑 Became shard leader: ${nodeId}`);
isLeader = true;
} else {
console.log(`🔍 Another node is already the leader: ${existingLeader.nodeId}`);
// Wait a bit for the leader to create shards
await new Promise(resolve => setTimeout(resolve, 2000));
// Try again to get shard addresses
try {
const shards = await bootstrapDB.get(shardAddressKey);
if (shards && Array.isArray(shards) && shards.length === shardCount) {
shardAddresses = shards;
console.log(`📡 Found shard addresses published by leader`);
}
} catch (error) {
console.warn(`⚠️ Leader did not publish shards, will create our own`);
}
}
} catch (error) {
console.log(`🔍 Failed to check/set leader, proceeding anyway`);
}
}
} catch (error) {
console.warn(`⚠️ Bootstrap coordination failed, creating local shards:`, error);
}
if (shardAddresses.length === shardCount) {
// Connect to existing shards
await this.connectToExistingShards(shardAddresses);
} else {
// Create new shards (either as leader or fallback)
await this.createAndPublishShards(shardCount, bootstrapDB, isLeader);
}
console.log(`✅ Initialized ${this.globalDirectoryShards.length} directory shards`);
}
private async getOrCreateBootstrapDB(): Promise<any> {
const bootstrapName = `${this.appName}-bootstrap`;
try {
// Create a well-known bootstrap database that all nodes of this app can access
const bootstrapDB = await this.createDatabase(bootstrapName, 'keyvalue', 'system');
// Wait a moment for potential replication
await new Promise(resolve => setTimeout(resolve, 500));
console.log(`🔧 Connected to bootstrap database: ${bootstrapName}`);
return bootstrapDB;
} catch (error) {
console.error(`❌ Failed to connect to bootstrap database:`, error);
throw error;
}
}
private async connectToExistingShards(shardAddresses: string[]): Promise<void> {
console.log(`📡 Connecting to ${shardAddresses.length} existing shards...`);
for (let i = 0; i < shardAddresses.length; i++) {
try {
const shard = await this.openDatabaseByAddress(shardAddresses[i]);
this.globalDirectoryShards.push(shard);
console.log(`✓ Connected to existing directory shard ${i}: ${shardAddresses[i]}`);
} catch (error) {
console.error(`❌ Failed to connect to shard ${i} at ${shardAddresses[i]}:`, error);
throw new Error(`Failed to connect to existing shard ${i}`);
}
}
}
private async createAndPublishShards(shardCount: number, bootstrapDB: any, isLeader: boolean = false): Promise<void> {
const roleText = isLeader ? 'as leader' : 'as fallback';
console.log(`🔧 Creating ${shardCount} new directory shards ${roleText}...`);
const shardAddresses: string[] = [];
for (let i = 0; i < shardCount; i++) {
const shardName = `${this.appName}-directory-shard-${i}`;
try {
const shard = await this.createDatabase(shardName, 'keyvalue', 'system');
await this.waitForDatabaseSync(shard);
this.globalDirectoryShards.push(shard);
console.log(`✓ Created directory shard: ${shardName}`);
shardAddresses.push(shard.address);
console.log(`✓ Created directory shard ${i}: ${shardName} at ${shard.address}`);
} catch (error) {
console.error(`❌ Failed to create directory shard ${shardName}:`, error);
console.error(`❌ Failed to create directory shard ${i}:`, error);
throw error;
}
}
// Publish shard addresses to bootstrap database (especially important if we're the leader)
if (isLeader || shardAddresses.length > 0) {
try {
await bootstrapDB.set('shard-addresses', shardAddresses);
const publishText = isLeader ? 'as leader' : 'as fallback';
console.log(`📡 Published ${shardAddresses.length} shard addresses to bootstrap database ${publishText}`);
} catch (error) {
console.warn(`⚠️ Failed to publish shard addresses to bootstrap database:`, error);
// Don't fail the whole process if we can't publish
}
}
}
private getNodeId(): string {
// Try to get a unique node identifier
return process.env.NODE_ID || process.env.HOSTNAME || `node-${Math.random().toString(36).substr(2, 9)}`;
}
private async openDatabaseByAddress(address: string): Promise<any> {
try {
// Check if we already have this database cached by address
if (this.databases.has(address)) {
return this.databases.get(address);
}
console.log(`✅ Created ${this.globalDirectoryShards.length} directory shards`);
// Open database by address
const orbitdb = this.orbitDBService.getOrbitDB();
const db = await orbitdb.open(address);
// Cache the database
this.databases.set(address, db);
return db;
} catch (error) {
console.error(`Failed to open database at address ${address}:`, error);
throw new Error(`Database opening failed: ${error}`);
}
}
async createUserDatabases(userId: string): Promise<UserMappingsData> {
@ -204,24 +362,7 @@ export class DatabaseManager {
}
private async openDatabase(address: string): Promise<any> {
try {
// Check if we already have this database cached by address
if (this.databases.has(address)) {
return this.databases.get(address);
}
// Open database by address (implementation may vary based on OrbitDB version)
const orbitdb = this.orbitDBService.getOrbitDB();
const db = await orbitdb.open(address);
// Cache the database
this.databases.set(address, db);
return db;
} catch (error) {
console.error(`Failed to open database at address ${address}:`, error);
throw new Error(`Database opening failed: ${error}`);
}
return await this.openDatabaseByAddress(address);
}
private async registerUserInDirectory(userId: string, mappingsAddress: string): Promise<void> {
@ -234,6 +375,10 @@ export class DatabaseManager {
try {
await shard.set(userId, mappingsAddress);
// Wait for the registration to be replicated across nodes
await this.waitForDatabaseSync(shard);
console.log(`✓ Registered user ${userId} in directory shard ${shardIndex}`);
} catch (error) {
console.error(`Failed to register user ${userId} in directory:`, error);
@ -241,6 +386,42 @@ export class DatabaseManager {
}
}
private async waitForDatabaseSync(database: any): Promise<void> {
// Wait for OrbitDB database to be synced across the network
// This ensures that data is replicated before proceeding
const maxWaitTime = 1000; // Reduced to 1 second max wait
const checkInterval = 50; // Check every 50ms
const startTime = Date.now();
// Wait for the database to be ready and have peers
while (Date.now() - startTime < maxWaitTime) {
try {
// Check if database is accessible and has been replicated
if (database && database.access) {
// For OrbitDB, we can check if the database is ready
await new Promise(resolve => setTimeout(resolve, checkInterval));
// Additional check for peer connectivity if available
if (database.replicationStatus) {
const status = database.replicationStatus();
if (status.buffered === 0 && status.queued === 0) {
break; // Database is synced
}
} else {
// Basic wait to ensure replication
if (Date.now() - startTime > 200) { // Reduced minimum wait to 200ms
break;
}
}
}
} catch (error) {
// Ignore sync check errors, continue with basic wait
}
await new Promise(resolve => setTimeout(resolve, checkInterval));
}
}
private getShardIndex(key: string, shardCount: number): number {
// Simple hash-based sharding
let hash = 0;
@ -351,6 +532,40 @@ export class DatabaseManager {
}
}
async getDocument(database: any, dbType: StoreType, id: string): Promise<any> {
try {
switch (dbType) {
case 'keyvalue':
return await database.get(id);
case 'docstore':
return await database.get(id);
case 'eventlog':
// For eventlog, we need to search through entries
const iterator = database.iterator();
const entries = iterator.collect();
return entries.find((entry: any) => entry.id === id || entry._id === id);
case 'feed':
// For feed, we need to search through entries
const feedIterator = database.iterator();
const feedEntries = feedIterator.collect();
return feedEntries.find((entry: any) => entry.id === id || entry._id === id);
case 'counter':
// Counter doesn't have individual documents
return database.id === id ? { value: database.value, id: database.id } : null;
default:
throw new Error(`Unsupported database type: ${dbType}`);
}
} catch (error) {
console.error(`Error fetching document ${id} from ${dbType} database:`, error);
return null;
}
}
// Cleanup methods
async stop(): Promise<void> {
console.log('🛑 Stopping DatabaseManager...');

View File

@ -177,7 +177,8 @@ export class QueryExecutor<T extends BaseModel> {
if (entityIds.length === 0) {
console.warn('No entities found in directory shards');
return [];
// Try alternative discovery methods when directory shards are empty
return await this.executeAlternativeDiscovery();
}
const results: T[] = [];
@ -211,6 +212,72 @@ export class QueryExecutor<T extends BaseModel> {
return [];
}
}
private async executeAlternativeDiscovery(): Promise<T[]> {
// Alternative discovery method when directory shards are not working
// This is a temporary workaround for the cross-node synchronization issue
console.warn(`🔄 Attempting alternative entity discovery for ${this.model.name}`);
try {
// Try to find entities in the local node's cached user mappings
const localResults = await this.queryLocalUserMappings();
if (localResults.length > 0) {
console.log(`📂 Found ${localResults.length} entities via local discovery`);
return localResults;
}
// If no local results, try to query known database patterns
return await this.queryKnownDatabasePatterns();
} catch (error) {
console.warn('Alternative discovery failed:', error);
return [];
}
}
private async queryLocalUserMappings(): Promise<T[]> {
// Query user mappings that are cached locally
try {
const databaseManager = this.framework.databaseManager;
const results: T[] = [];
// Get cached user mappings from the database manager
const userMappings = (databaseManager as any).userMappings;
if (userMappings && userMappings.size > 0) {
console.log(`📂 Found ${userMappings.size} cached user mappings`);
// Query each cached user's database
for (const [userId, mappings] of userMappings.entries()) {
try {
const userDB = await databaseManager.getUserDatabase(userId, this.model.modelName);
const userResults = await this.queryDatabase(userDB, this.model.storeType);
results.push(...userResults);
} catch (error) {
// Silently handle user database query failures
}
}
}
return results;
} catch (error) {
console.warn('Local user mappings query failed:', error);
return [];
}
}
private async queryKnownDatabasePatterns(): Promise<T[]> {
// Try to query databases using known patterns
// This is a fallback when directory discovery fails
console.warn(`🔍 Attempting known database pattern queries for ${this.model.name}`);
// For now, return empty array to prevent delays
// In a more sophisticated implementation, this could:
// 1. Try common user ID patterns
// 2. Use IPFS to discover databases
// 3. Query peer nodes directly
return [];
}
private async executeGlobalQuery(): Promise<T[]> {
// For globally scoped models
@ -646,15 +713,15 @@ export class QueryExecutor<T extends BaseModel> {
}
private async getAllEntityIdsFromDirectory(): Promise<string[]> {
const maxRetries = 3;
const baseDelay = 100; // ms
const maxRetries = 2; // Reduced retry count to prevent long delays
const baseDelay = 50; // Reduced base delay
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
const directoryShards = await this.framework.databaseManager.getGlobalDirectoryShards();
const entityIds: string[] = [];
// Query all directory shards in parallel
// Query all directory shards - simplified approach
const shardPromises = directoryShards.map(async (shard: any, index: number) => {
try {
// For keyvalue stores, we need to get the keys (entity IDs), not values
@ -674,10 +741,13 @@ export class QueryExecutor<T extends BaseModel> {
entityIds.push(...shardEntityIds);
}
// Remove duplicates and filter out empty strings
const uniqueEntityIds = [...new Set(entityIds.filter(id => id && id.trim()))];
// If we found entities, return them
if (entityIds.length > 0) {
console.log(`📂 Found ${entityIds.length} entities in directory shards`);
return entityIds;
if (uniqueEntityIds.length > 0) {
console.log(`📂 Found ${uniqueEntityIds.length} entities in directory shards`);
return uniqueEntityIds;
}
// If this is our last attempt, return empty array
@ -686,8 +756,8 @@ export class QueryExecutor<T extends BaseModel> {
return [];
}
// Wait before retry with exponential backoff
const delay = baseDelay * Math.pow(2, attempt);
// Wait before retry with linear backoff (shorter delays)
const delay = baseDelay * (attempt + 1);
console.log(`📂 No entities found, retrying in ${delay}ms (attempt ${attempt + 1}/${maxRetries + 1})`);
await new Promise(resolve => setTimeout(resolve, delay));
@ -699,7 +769,7 @@ export class QueryExecutor<T extends BaseModel> {
}
// Wait before retry
const delay = baseDelay * Math.pow(2, attempt);
const delay = baseDelay * (attempt + 1);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
@ -707,6 +777,25 @@ export class QueryExecutor<T extends BaseModel> {
return [];
}
private async waitForShardReady(shard: any): Promise<void> {
// Wait briefly for the shard to be ready for reading
const maxWait = 200; // ms
const startTime = Date.now();
while (Date.now() - startTime < maxWait) {
try {
if (shard && shard.all) {
// Try to access the shard data
shard.all();
break;
}
} catch (error) {
// Continue waiting
}
await new Promise(resolve => setTimeout(resolve, 20));
}
}
private getFrameworkInstance(): any {
const framework = (globalThis as any).__debrosFramework;
if (!framework) {

View File

@ -705,6 +705,7 @@ class BlogAPIServer {
// Initialize framework
this.framework = new DebrosFramework({
appName: 'blog-app', // Unique app name for this blog application
environment: 'test',
features: {
autoMigration: true,