diff --git a/README.md b/README.md
index adf05da..ae20a09 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,21 @@
 # @debros/network
 
-Core networking functionality for the Debros decentralized network. This package provides essential IPFS, libp2p, and OrbitDB functionality to build decentralized applications on the Debros network.
+Core networking functionality for the Debros decentralized network. This package provides a powerful database interface with advanced features built on IPFS and OrbitDB for decentralized applications.
 
 ## Features
 
-- Pre-configured IPFS/libp2p node setup
-- Service discovery for peer-to-peer communication
-- OrbitDB integration for distributed databases
-- Consistent logging across network components
-- Secure key generation
+- Rich database-like API with TypeScript support
+- Multiple database store types (KeyValue, Document, Feed, Counter)
+- Document operations with schema validation
+- Advanced querying with pagination, sorting and filtering
+- Transaction support for batch operations
+- Built-in file storage with metadata
+- Real-time subscriptions for data changes
+- Memory caching for performance
+- Connection pooling for managing multiple database instances
+- Index creation for faster queries
+- Comprehensive error handling with error codes
+- Performance metrics and monitoring
 
 ## Installation
 
@@ -19,80 +26,296 @@
 npm install @debros/network
 ```
 
 ## Basic Usage
 
 ```typescript
-import { initConfig, initIpfs, initOrbitDB, logger } from "@debros/network";
+import { initDB, create, get, query, uploadFile, logger } from "@debros/network";
 
-// Initialize with custom configuration (optional)
-const config = initConfig({
-  env: {
-    fingerprint: "my-unique-node-id",
-    port: 8080,
-  },
-  ipfs: {
-    bootstrapNodes: "node1,node2,node3",
-  },
-});
-
-// Start the network node
-async function startNode() {
+// Initialize the database service
+async function startApp() {
   try {
-    // Initialize IPFS
-    const ipfs = await initIpfs();
-
-    // Initialize OrbitDB with the IPFS instance
-    const orbitdb = await initOrbitDB({ getHelia: () => ipfs });
-
-    // Create/open a database
-    const db = await orbitDB("myDatabase", "feed");
-
-    logger.info("Node started successfully");
-
-    return { ipfs, orbitdb, db };
+    // Initialize with default configuration
+    await initDB();
+    logger.info("Database initialized successfully");
+
+    // Create a new user document
+    const userId = 'user123';
+    const user = {
+      username: 'johndoe',
+      walletAddress: '0x1234567890',
+      avatar: null
+    };
+
+    const result = await create('users', userId, user);
+    logger.info(`Created user with ID: ${result.id}`);
+
+    // Get a user by ID
+    const retrievedUser = await get('users', userId);
+    logger.info('User:', retrievedUser);
+
+    // Query users with filtering
+    const activeUsers = await query('users',
+      user => user.isActive === true,
+      { limit: 10, sort: { field: 'createdAt', order: 'desc' } }
+    );
+    logger.info(`Found ${activeUsers.total} active users`);
+
+    // Upload a file
+    const fileData = Buffer.from('File content');
+    const fileUpload = await uploadFile(fileData, { filename: 'document.txt' });
+    logger.info(`Uploaded file with CID: ${fileUpload.cid}`);
+
+    return true;
   } catch (error) {
-    logger.error("Failed to start node:", error);
+    logger.error("Failed to start app:", error);
    throw error;
   }
 }
 
-startNode();
+startApp();
 ```
 
-## Configuration
+## Database Store Types
 
-The package provides sensible defaults but can be customized:
+The library supports multiple OrbitDB store types, each optimized for different use cases:
 
 ```typescript
-import { initConfig } from "@debros/network";
+import { create, get, update, StoreType } from "@debros/network";
 
-const customConfig = initConfig({
-  env: {
-    fingerprint: "unique-fingerprint",
-    port: 9000,
-  },
-  ipfs: {
-    blockstorePath: "./custom-blockstore",
-    serviceDiscovery: {
-      topic: "my-custom-topic",
-      heartbeatInterval: 10000,
+// Default KeyValue store (for general use)
+await create('users', 'user1', { name: 'Alice' });
+
+// Document store (better for complex documents with indexing)
+await create('posts', 'post1', { title: 'Hello', content: '...' },
+  { storeType: StoreType.DOCSTORE }
+);
+
+// Feed/EventLog store (append-only, good for immutable logs)
+await create('events', 'evt1', { type: 'login', user: 'alice' },
+  { storeType: StoreType.FEED }
+);
+
+// Counter store (for numeric counters)
+await create('stats', 'visits', { value: 0 },
+  { storeType: StoreType.COUNTER }
+);
+
+// Increment a counter
+await update('stats', 'visits', { increment: 1 },
+  { storeType: StoreType.COUNTER }
+);
+
+// Get counter value
+const stats = await get('stats', 'visits', { storeType: StoreType.COUNTER });
+console.log(`Visit count: ${stats.value}`);
+```
+
+## Advanced Features
+
+### Schema Validation
+
+```typescript
+import { defineSchema, create } from "@debros/network";
+
+// Define a schema
+defineSchema('users', {
+  properties: {
+    username: {
+      type: 'string',
+      required: true,
+      min: 3,
+      max: 20
     },
+    email: {
+      type: 'string',
+      pattern: '^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$'
+    },
+    age: {
+      type: 'number',
+      min: 18
+    }
   },
-  orbitdb: {
-    directory: "./custom-orbitdb",
-  },
-});
+  required: ['username']
+});
+
+// Document creation will be validated against the schema
+await create('users', 'user1', {
+  username: 'alice',
+  email: 'alice@example.com',
+  age: 25
+});
+```
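+
+If a document violates its schema, the operation is rejected with a `DBError` whose `code` is `ErrorCode.INVALID_SCHEMA`. A minimal sketch of handling that case (this assumes `DBError` and `ErrorCode` are re-exported from the package root, as they are from the db service):
+
+```typescript
+import { create, DBError, ErrorCode } from "@debros/network";
+
+try {
+  // 'ab' is shorter than the 3-character minimum defined above
+  await create('users', 'user2', { username: 'ab' });
+} catch (error) {
+  if (error instanceof DBError && error.code === ErrorCode.INVALID_SCHEMA) {
+    console.error('Validation failed:', error.message, error.details);
+  } else {
+    throw error;
+  }
+}
+```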
-## Documentation
+### Transactions
 
-### Core Modules
+```typescript
+import { createTransaction, commitTransaction } from "@debros/network";
 
-- **IPFS Service**: Setup and manage IPFS nodes
-- **OrbitDB Service**: Distributed database management
-- **Config**: Network configuration
-- **Logger**: Consistent logging
+// Create a transaction
+const transaction = createTransaction();
 
-### Main Exports
+// Add multiple operations
+transaction
+  .create('posts', 'post1', { title: 'Hello World', content: '...' })
+  .update('users', 'user1', { postCount: 1 })
+  .delete('drafts', 'draft1');
 
-- `initConfig`: Configure the network node
-- `ipfsService`: IPFS node management
-- `orbitDBService`: OrbitDB operations
-- `logger`: Logging utilities
+// Commit all operations
+const result = await commitTransaction(transaction);
+console.log(`Transaction completed with ${result.results.length} operations`);
+```
+
+### Subscriptions
+
+```typescript
+import { subscribe } from "@debros/network";
+
+// Subscribe to document changes
+const unsubscribe = subscribe('document:created', (data) => {
+  console.log(`New document created in ${data.collection}:`, data.id);
+});
+
+// Later, unsubscribe
+unsubscribe();
+```
+
+### Pagination and Sorting
+
+```typescript
+import { list, query } from "@debros/network";
+
+// List with pagination and sorting
+const page1 = await list('users', {
+  limit: 10,
+  offset: 0,
+  sort: { field: 'createdAt', order: 'desc' }
+});
+
+// Query with pagination
+const results = await query('users',
+  (user) => user.age > 21,
+  { limit: 10, offset: 20 }
+);
+
+console.log(`Found ${results.total} matches, showing ${results.documents.length}`);
+console.log(`Has more pages: ${results.hasMore}`);
+```
+
+### TypeScript Support
+
+```typescript
+import { get, update, query } from "@debros/network";
+
+interface User {
+  username: string;
+  email: string;
+  age: number;
+  createdAt: number;
+  updatedAt: number;
+}
+
+// Type-safe operations
+const user = await get<User>('users', 'user1');
+
+await update<User>('users', 'user1', { age: 26 });
+
+const results = await query<User>('users',
+  (user) => user.age > 21
+);
+```
+
+### Connection Management
+
+```typescript
+import { initDB, create, closeConnection } from "@debros/network";
+
+// Create multiple connections
+const conn1 = await initDB('connection1');
+const conn2 = await initDB('connection2');
+
+// Use specific connection
+await create('users', 'user1', { name: 'Alice' }, { connectionId: conn1 });
+
+// Close a specific connection
+await closeConnection(conn1);
+```
+
+### Performance Metrics
+
+```typescript
+import { getMetrics, resetMetrics } from "@debros/network";
+
+// Get performance metrics
+const metrics = getMetrics();
+console.log('Operations:', metrics.operations);
+console.log('Avg operation time:', metrics.performance.averageOperationTime, 'ms');
+console.log('Cache hits/misses:', metrics.cacheStats);
+
+// Reset metrics (e.g., after deployment)
+resetMetrics();
+```
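+
+### Error Handling
+
+All operations surface failures as `DBError` instances carrying an `ErrorCode`, so callers can branch on the failure type rather than parsing messages. A brief sketch (again assuming `DBError` and `ErrorCode` are re-exported from the package root):
+
+```typescript
+import { get, DBError, ErrorCode } from "@debros/network";
+
+try {
+  await get('users', 'user1');
+} catch (error) {
+  if (error instanceof DBError) {
+    switch (error.code) {
+      case ErrorCode.NOT_INITIALIZED:
+        console.error('Call initDB() before using the database');
+        break;
+      case ErrorCode.OPERATION_FAILED:
+        console.error('Operation failed:', error.message, error.details);
+        break;
+      default:
+        console.error(`Database error (${error.code}):`, error.message);
+    }
+  } else {
+    throw error;
+  }
+}
+```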
+
+## API Reference
+
+### Core Database Operations
+
+- `initDB(connectionId?: string): Promise<string>` - Initialize the database
+- `create(collection, id, data, options?): Promise<CreateResult>` - Create a document
+- `get(collection, id, options?): Promise<T | null>` - Get a document by ID
+- `update(collection, id, data, options?): Promise<UpdateResult>` - Update a document
+- `remove(collection, id, options?): Promise<boolean>` - Delete a document
+- `list(collection, options?): Promise<PaginatedResult<T>>` - List documents with pagination
+- `query(collection, filter, options?): Promise<PaginatedResult<T>>` - Query documents
+- `stopDB(): Promise<void>` - Stop the database service
+
+### Store Types
+
+- `StoreType.KEYVALUE` - Key-value pair storage (default)
+- `StoreType.DOCSTORE` - Document storage with indexing
+- `StoreType.FEED` - Append-only log
+- `StoreType.EVENTLOG` - Alias for FEED
+- `StoreType.COUNTER` - Numeric counter
+
+### Schema Validation
+
+- `defineSchema(collection, schema): void` - Define a schema for a collection
+
+### Transactions
+
+- `createTransaction(connectionId?): Transaction` - Create a new transaction
+- `commitTransaction(transaction): Promise<{success, results}>` - Execute the transaction
+- `Transaction.create(collection, id, data): Transaction` - Add a create operation
+- `Transaction.update(collection, id, data): Transaction` - Add an update operation
+- `Transaction.delete(collection, id): Transaction` - Add a delete operation
+
+### Subscriptions
+
+- `subscribe(event, callback): () => void` - Subscribe to events, returns unsubscribe function
+
+### File Operations
+
+- `uploadFile(fileData, options?): Promise<{ cid: string }>` - Upload a file
+- `getFile(cid, options?): Promise<{ data: Buffer; metadata?: any }>` - Get a file by CID
+- `deleteFile(cid, options?): Promise<boolean>` - Delete a file
+
+### Connection Management
+
+- `closeConnection(connectionId): Promise<boolean>` - Close a specific connection
+
+### Indexes and Performance
+
+- `createIndex(collection, field, options?): Promise<boolean>` - Create an index
+- `getMetrics(): Metrics` - Get performance metrics
+- `resetMetrics(): void` - Reset performance metrics
+
+## Configuration
+
+```typescript
+import { config, initDB } from "@debros/network";
+
+// Configure (optional)
+config.env.fingerprint = "my-unique-app-id";
+config.env.port = 9000;
+config.ipfs.blockstorePath = "./custom-path/blockstore";
+config.orbitdb.directory = "./custom-path/orbitdb";
+
+// Initialize with configuration
+await initDB();
+```
\ No newline at end of file
diff --git a/examples/abstract-database.ts b/examples/abstract-database.ts
new file mode 100644
index 0000000..c1f275a
--- /dev/null
+++ b/examples/abstract-database.ts
@@ -0,0 +1,73 @@
+import { initDB, create, get, update, remove, list, query, uploadFile, getFile, deleteFile, stopDB, logger } from '../src';
+
+// Alternative import method
+// import debros from '../src';
+// const { db } = debros;
+
+async function databaseExample() {
+  try {
+    logger.info('Starting database example...');
+
+    // Initialize the database service (abstracts away IPFS and OrbitDB)
+    await initDB();
+    logger.info('Database service initialized');
+
+    // Create a new user document
+    const userId = 'user123';
+    const userData = {
+      username: 'johndoe',
+      walletAddress: '0x1234567890',
+      avatar: null
+    };
+
+    const createResult = await create('users', userId, userData);
+    logger.info(`Created user with ID: ${createResult.id} and hash: ${createResult.hash}`);
+
+    // Retrieve the user
+    const user = await get('users', userId);
+    logger.info('Retrieved user:', user);
+
+    // Update the user
+    const updateResult = await update('users', userId, {
+      avatar: 'profile.jpg',
+      bio: 'Software developer'
+    });
+    logger.info(`Updated user with hash: ${updateResult.hash}`);
+
+    // Query users
+    const filteredUsers = await query('users', (user) => user.username === 'johndoe');
+    logger.info(`Found ${filteredUsers.documents.length} matching users`);
+
+    // List all users
+    const allUsers = await list('users', { limit: 10 });
+    logger.info(`Retrieved ${allUsers.documents.length} users`);
+
+    // Upload a file
+    const fileData = Buffer.from('This is a test file content');
+    const fileUpload = await uploadFile(fileData, { filename: 'test.txt' });
+    logger.info(`Uploaded file with CID: ${fileUpload.cid}`);
+
+    // Retrieve the file
+    const file = await getFile(fileUpload.cid);
+    logger.info('Retrieved file:', {
+      content: file.data.toString(),
+      metadata: file.metadata
+    });
+
+    // Delete the file
+    await deleteFile(fileUpload.cid);
+    logger.info('File deleted');
+
+    // Delete the user
+    await remove('users', userId);
+    logger.info('User deleted');
+
+    // Stop the database service
+    await stopDB();
+    logger.info('Database service stopped');
+  } catch (error)
{ + logger.error('Error in database example:', error); + } +} + +databaseExample(); \ No newline at end of file diff --git a/examples/advanced-database.ts b/examples/advanced-database.ts new file mode 100644 index 0000000..bac161b --- /dev/null +++ b/examples/advanced-database.ts @@ -0,0 +1,266 @@ +import { + initDB, + create, + get, + update, + remove, + list, + query, + uploadFile, + getFile, + createTransaction, + commitTransaction, + subscribe, + defineSchema, + getMetrics, + createIndex, + ErrorCode, + StoreType, + logger +} from '../src'; + +// Define a user schema +const userSchema = { + properties: { + username: { + type: 'string', + required: true, + min: 3, + max: 20, + }, + email: { + type: 'string', + pattern: '^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$', + }, + age: { + type: 'number', + min: 18, + max: 120, + }, + roles: { + type: 'array', + items: { + type: 'string', + }, + }, + isActive: { + type: 'boolean', + }, + }, + required: ['username'], +}; + +async function advancedDatabaseExample() { + try { + logger.info('Starting advanced database example...'); + + // Initialize the database service with a specific connection ID + const connectionId = await initDB('example-connection'); + logger.info(`Database service initialized with connection ID: ${connectionId}`); + + // Define schema for validation + defineSchema('users', userSchema); + logger.info('User schema defined'); + + // Create index for faster queries (works with docstore) + await createIndex('users', 'username', { storeType: StoreType.DOCSTORE }); + logger.info('Created index on username field'); + + // Set up subscription for real-time updates + const unsubscribe = subscribe('document:created', (data) => { + logger.info('Document created:', data.collection, data.id); + }); + + // Create multiple users using a transaction + const transaction = createTransaction(connectionId); + + transaction + .create('users', 'user1', { + username: 'alice', + email: 'alice@example.com', + age: 28, + roles: ['admin', 'user'], + isActive: true, + }) + .create('users', 'user2', { + username: 'bob', + email: 'bob@example.com', + age: 34, + roles: ['user'], + isActive: true, + }) + .create('users', 'user3', { + username: 'charlie', + email: 'charlie@example.com', + age: 45, + roles: ['moderator', 'user'], + isActive: false, + }); + + const txResult = await commitTransaction(transaction); + logger.info(`Transaction committed with ${txResult.results.length} operations`); + + // Get a specific user with type safety + interface User { + username: string; + email: string; + age: number; + roles: string[]; + isActive: boolean; + createdAt: number; + updatedAt: number; + } + + // Using KeyValue store (default) + const user = await get('users', 'user1'); + logger.info('Retrieved user from KeyValue store:', user?.username, user?.email); + + // Using DocStore + await create( + 'users_docstore', + 'user1', + { + username: 'alice_doc', + email: 'alice_doc@example.com', + age: 28, + roles: ['admin', 'user'], + isActive: true, + }, + { storeType: StoreType.DOCSTORE } + ); + + const docStoreUser = await get('users_docstore', 'user1', { storeType: StoreType.DOCSTORE }); + logger.info('Retrieved user from DocStore:', docStoreUser?.username, docStoreUser?.email); + + // Using Feed/EventLog store + await create( + 'users_feed', + 'user1', + { + username: 'alice_feed', + email: 'alice_feed@example.com', + age: 28, + roles: ['admin', 'user'], + isActive: true, + }, + { storeType: StoreType.FEED } + ); + + // Update the feed entry (creates a new entry with the 
same ID) + await update( + 'users_feed', + 'user1', + { + roles: ['admin', 'user', 'tester'], + }, + { storeType: StoreType.FEED } + ); + + // List all entries in the feed + const feedUsers = await list('users_feed', { storeType: StoreType.FEED }); + logger.info(`Found ${feedUsers.total} feed entries:`); + feedUsers.documents.forEach(user => { + logger.info(`- ${user.username} (${user.email})`); + }); + + // Using Counter store + await create( + 'counters', + 'visitors', + { value: 100 }, + { storeType: StoreType.COUNTER } + ); + + // Increment the counter + await update( + 'counters', + 'visitors', + { increment: 5 }, + { storeType: StoreType.COUNTER } + ); + + // Get the counter value + const counter = await get('counters', 'visitors', { storeType: StoreType.COUNTER }); + logger.info(`Counter value: ${counter?.value}`); + + // Update a user in KeyValue store + await update('users', 'user1', { + roles: ['admin', 'user', 'tester'], + }); + + // Query users from KeyValue store + const result = await query( + 'users', + (user) => user.isActive === true, + { + limit: 10, + offset: 0, + sort: { field: 'username', order: 'asc' }, + } + ); + + logger.info(`Found ${result.total} active users:`); + result.documents.forEach(user => { + logger.info(`- ${user.username} (${user.email})`); + }); + + // List all users from KeyValue store with pagination + const allUsers = await list('users', { + limit: 10, + offset: 0, + sort: { field: 'age', order: 'desc' }, + }); + + logger.info(`Listed ${allUsers.total} users sorted by age (desc):`); + allUsers.documents.forEach(user => { + logger.info(`- ${user.username}: ${user.age} years old`); + }); + + // Upload a file with metadata + const fileData = Buffer.from('This is a test file with advanced features.'); + const fileUpload = await uploadFile(fileData, { + filename: 'advanced-test.txt', + metadata: { + contentType: 'text/plain', + tags: ['example', 'test'], + owner: 'user1', + } + }); + + logger.info(`Uploaded file with CID: ${fileUpload.cid}`); + + // Retrieve the file + const file = await getFile(fileUpload.cid); + logger.info('Retrieved file content:', file.data.toString()); + logger.info('File metadata:', file.metadata); + + // Get performance metrics + const metrics = getMetrics(); + logger.info('Database metrics:', { + operations: metrics.operations, + performance: { + averageOperationTime: `${metrics.performance.averageOperationTime?.toFixed(2)}ms`, + }, + cache: metrics.cacheStats, + }); + + // Unsubscribe from events + unsubscribe(); + logger.info('Unsubscribed from events'); + + // Delete a user + await remove('users', 'user3'); + logger.info('Deleted user user3'); + + return true; + } catch (error) { + if (error && typeof error === 'object' && 'code' in error) { + logger.error(`Database error (${error.code}):`, error.message); + } else { + logger.error('Error in advanced database example:', error); + } + return false; + } +} + +advancedDatabaseExample(); \ No newline at end of file diff --git a/examples/basic-node.ts b/examples/basic-node.ts index 33d011e..4dfcfc8 100644 --- a/examples/basic-node.ts +++ b/examples/basic-node.ts @@ -1,4 +1,6 @@ -import { initIpfs, initOrbitDB, logger, createServiceLogger } from '../src/index'; +import { init as initIpfs, stop as stopIpfs } from '../src/ipfs/ipfsService'; +import { init as initOrbitDB } from '../src/orbit/orbitDBService'; +import { createServiceLogger } from '../src/utils/logger'; const appLogger = createServiceLogger('APP'); @@ -11,11 +13,7 @@ async function startNode() { appLogger.info('IPFS node 
initialized'); // Initialize OrbitDB - const ipfsService = { - getHelia: () => ipfs, - }; - - const orbitdb = await initOrbitDB(ipfsService); + const orbitdb = await initOrbitDB(); appLogger.info('OrbitDB initialized'); // Create a test database @@ -40,7 +38,7 @@ async function startNode() { process.on('SIGINT', async () => { appLogger.info('Shutting down...'); await orbitdb.stop(); - await initIpfs.stop(); + await stopIpfs(); process.exit(0); }); } catch (error) { diff --git a/package.json b/package.json index 01f1e2c..877ee81 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "@chainsafe/libp2p-gossipsub": "^14.1.0", "@chainsafe/libp2p-noise": "^16.1.0", "@chainsafe/libp2p-yamux": "^7.0.1", + "@helia/unixfs": "^5.0.0", "@libp2p/bootstrap": "^11.0.32", "@libp2p/crypto": "^5.0.15", "@libp2p/identify": "^3.0.27", @@ -47,6 +48,8 @@ "express": "^5.1.0", "helia": "^5.3.0", "libp2p": "^2.8.2", + "multiformats": "^13.3.2", + "node-cache": "^5.1.2", "node-forge": "^1.3.1", "winston": "^3.17.0" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0eef703..43675fd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -17,6 +17,9 @@ importers: '@chainsafe/libp2p-yamux': specifier: ^7.0.1 version: 7.0.1 + '@helia/unixfs': + specifier: ^5.0.0 + version: 5.0.0 '@libp2p/bootstrap': specifier: ^11.0.32 version: 11.0.32 @@ -62,6 +65,12 @@ importers: libp2p: specifier: ^2.8.2 version: 2.8.2 + multiformats: + specifier: ^13.3.2 + version: 13.3.2 + node-cache: + specifier: ^5.1.2 + version: 5.1.2 node-forge: specifier: ^1.3.1 version: 1.3.1 @@ -118,6 +127,9 @@ packages: resolution: {integrity: sha512-30iZtAPgz+LTIYoeivqYo853f02jBYSd5uGnGpkFV0M3xOt9aN73erkgYAmZU43x4VfqcnLxW9Kpg3R5LC4YYw==} engines: {node: '>=6.0.0'} + '@assemblyscript/loader@0.9.4': + resolution: {integrity: sha512-HazVq9zwTVwGmqdwYzu7WyQ6FQVZ7SwET0KKQuKm55jD0IfUpZgN0OPIiZG3zV1iSrVYcN0bdwLRXI/VNCYsUA==} + '@babel/code-frame@7.26.2': resolution: {integrity: sha512-RJlIHRueQgwWitWgF8OdFYGZX328Ax5BCemNGlqHfplnRT9ESi8JkFlvaVYbS+UubVY6dpv87Fs2u5M29iNFVQ==} engines: {node: '>=6.9.0'} @@ -818,6 +830,9 @@ packages: '@helia/routers@3.0.1': resolution: {integrity: sha512-Eshr/8XJU4c0H8s1m5oBFB2YM0n3HBbxB3ny8DbsRFS8cAQ/L8ujnQomniMjZuuOhcNz8EEGwkUc07HCtAqAFA==} + '@helia/unixfs@5.0.0': + resolution: {integrity: sha512-wIv9Zf4vM7UN2A7jNiOa5rOfO1Hl/9AarKSFQeV09I1NflclSSu6EHaiNcH1K6LBhRJ36/w2RHXE8DK3+DK8hw==} + '@helia/utils@1.2.2': resolution: {integrity: sha512-f8TC+gTQkMTVPaSDB8sSV+8W5/QIMX9XNWY2Xf0Y/WVzGm+Nz5o5wpVTT1kgBpILngXHSs4Xo+6aBQlafL15EA==} @@ -1006,6 +1021,10 @@ packages: '@multiformats/multiaddr@12.4.0': resolution: {integrity: sha512-FL7yBTLijJ5JkO044BGb2msf+uJLrwpD6jD6TkXlbjA9N12+18HT40jvd4o5vL4LOJMc86dPX6tGtk/uI9kYKg==} + '@multiformats/murmur3@2.1.8': + resolution: {integrity: sha512-6vId1C46ra3R1sbJUOFCZnsUIveR9oF20yhPmAFxPm0JfrX3/ZRCgP3YDrBzlGoEppOXnA9czHeYc0T9mB6hbA==} + engines: {node: '>=16.0.0', npm: '>=7.0.0'} + '@multiformats/uri-to-multiaddr@8.1.0': resolution: {integrity: sha512-NHFqdKEwJ0A6JDXzC645Lgyw72zWhbM1QfaaD00ZYRrNvtx64p1bD9aIrWZIhLWZN87/lsV4QkJSNRF3Fd3ryw==} @@ -1408,6 +1427,9 @@ packages: bl@4.1.0: resolution: {integrity: sha512-1W07cM9gS6DcLperZfFSj+bWLtaPGSOHWhPiGzXmvVJbRLdG82sH/Kn8EtW1VqWVA54AKf2h5k5BbnIbwF3h6w==} + bl@5.1.0: + resolution: {integrity: sha512-tv1ZJHLfTDnXE6tMHv73YgSJaWR2AFuPwMntBe7XL/GBFHnT0CLnsHMogfk5+GzCDC5ZWarSCYaIGATZt9dNsQ==} + blockstore-core@5.0.2: resolution: {integrity: sha512-y7/BHdYLO3YCpJMg6Ue7b4Oz4FT1HWSZoHHdlsaJTsvoE8XieXb6kUCB9UkkUBDw2x4neRDwlgYBpyK77+Ro2Q==} @@ -1544,6 
+1566,10 @@ packages: resolution: {integrity: sha512-ujdnoq2Kxb8s3ItNBtnYeXdm07FcU0u8ARAT1lQ2YdMwQC+cdiXX8KoqMVuglztILivceTtp4ivqGSmEmhBUJw==} engines: {node: '>=12'} + clone@2.1.2: + resolution: {integrity: sha512-3Pe/CF1Nn94hyhIYpjtiLhdCoEoz0DqQ+988E9gmeEdQZlojxnOb74wctFyuwWQHzqyf9X7C7MG8juUpqBJT8w==} + engines: {node: '>=0.8'} + color-convert@1.9.3: resolution: {integrity: sha512-QfAUtd+vFdAtFQcC8CCyYt1fYWxSqAiK2cSD6zDB8N3cpsEBAvRxp9zOGg6G/SHHJYAT88/az/IuDGALsNVbGg==} @@ -1987,6 +2013,9 @@ packages: graceful-fs@4.2.11: resolution: {integrity: sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==} + hamt-sharding@3.0.6: + resolution: {integrity: sha512-nZeamxfymIWLpVcAN0CRrb7uVq3hCOGj9IcL6NMA6VVCVWqj+h9Jo/SmaWuS92AEDf1thmHsM5D5c70hM3j2Tg==} + has-flag@4.0.0: resolution: {integrity: sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==} engines: {node: '>=8'} @@ -2088,6 +2117,15 @@ packages: resolution: {integrity: sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==} engines: {node: '>= 0.10'} + ipfs-unixfs-exporter@13.6.2: + resolution: {integrity: sha512-U3NkQHvQn5XzxtjSo1/GfoFIoXYY4hPgOlZG5RUrV5ScBI222b3jAHbHksXZuMy7sqPkA9ieeWdOmnG1+0nxyw==} + + ipfs-unixfs-importer@15.3.2: + resolution: {integrity: sha512-12FqAAAE3YC6AHtYxZ944nDCabmvbNLdhNCVIN5RJIOri82ss62XdX4lsLpex9VvPzDIJyTAsrKJPcwM6hXGdQ==} + + ipfs-unixfs@11.2.1: + resolution: {integrity: sha512-gUeeX63EFgiaMgcs0cUs2ZUPvlOeEZ38okjK8twdWGZX2jYd2rCk8k/TJ3DSRIDZ2t/aZMv6I23guxHaofZE3w==} + ipns@10.0.2: resolution: {integrity: sha512-tokCgz9X678zvHnAabVG91K64X7HnHdWOrop0ghUcXkzH5XNsmxHwVpqVATNqq/w62h7fRDhWURHU/WOfYmCpA==} @@ -2218,6 +2256,9 @@ packages: it-glob@3.0.2: resolution: {integrity: sha512-yw6am0buc9W6HThDhlf/0k9LpwK31p9Y3c0hpaoth9Iaha4Kog2oRlVanLGSrPPoh9yGwHJbs+KfBJt020N6/g==} + it-last@3.0.7: + resolution: {integrity: sha512-qG4BTveE6Wzsz5voqaOtZAfZgXTJT+yiaj45vp5S0Vi8oOdgKlRqUeolfvWoMCJ9vwSc/z9pAaNYIza7gA851w==} + it-length-prefixed-stream@1.2.1: resolution: {integrity: sha512-FYqlxc2toUoK+aPO5r3KDBIUG1mOvk2DzmjQcsfLUTHRWMJP4Va9855tVzg/22Bj+VUUaT7gxBg7HmbiCxTK4w==} @@ -2629,6 +2670,10 @@ packages: multiformats@13.3.2: resolution: {integrity: sha512-qbB0CQDt3QKfiAzZ5ZYjLFOs+zW43vA4uyM8g27PeEuXZybUOFyjrVdP93HPBHMoglibwfkdVwbzfUq8qGcH6g==} + murmurhash3js-revisited@3.0.0: + resolution: {integrity: sha512-/sF3ee6zvScXMb1XFJ8gDsSnY+X8PbOyjIuBhtgis10W2Jx4ZjIhikUCIF9c4gpJxVnQIsPAFrSwTCuAjicP6g==} + engines: {node: '>=8.0.0'} + nanoid@5.1.5: resolution: {integrity: sha512-Ir/+ZpE9fDsNH0hQ3C68uyThDXzYcim2EqcZ8zn8Chtt1iylPT9xXJB0kPCnqzgcEGikO9RxSrh63MsmVCU7Fw==} engines: {node: ^18 || >=20} @@ -2659,6 +2704,19 @@ packages: resolution: {integrity: sha512-c5XK0MjkGBrQPGYG24GBADZud0NCbznxNx0ZkS+ebUTrmV1qTDxPxSL8zEAPURXSbLRWVexxmP4986BziahL5w==} engines: {node: '>=10'} + node-cache@5.1.2: + resolution: {integrity: sha512-t1QzWwnk4sjLWaQAS8CHgOJ+RAfmHpxFWmc36IWTiWHQfs0w5JDMBS1b1ZxQteo0vVVuWJvIUKHDkkeK7vIGCg==} + engines: {node: '>= 8.0.0'} + + node-fetch@2.7.0: + resolution: {integrity: sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==} + engines: {node: 4.x || >=6.0.0} + peerDependencies: + encoding: ^0.1.0 + peerDependenciesMeta: + encoding: + optional: true + node-forge@1.3.1: resolution: {integrity: sha512-dPEtOeMvF9VMcYV/1Wb8CPoVAXtp6MKMlcbAt4ddqmGqUJ6fQZFXkNZNkNlfevtNkGtaSoXf/vNNNSvgrdXwtA==} engines: {node: '>= 6.13.0'} @@ -2878,6 +2936,10 @@ packages: 
queue@6.0.2: resolution: {integrity: sha512-iHZWu+q3IdFZFX36ro/lKBkSvfkztY5Y7HMiPlOUjhupPcG2JMfst2KKEpu5XndviX/3UhFbRngUPNKtgvtZiA==} + rabin-wasm@0.1.5: + resolution: {integrity: sha512-uWgQTo7pim1Rnj5TuWcCewRDTf0PEFTSlaUjWP4eY9EbLV9em08v89oCz/WO+wRxpYuO36XEHp4wgYQnAgOHzA==} + hasBin: true + race-event@1.3.0: resolution: {integrity: sha512-kaLm7axfOnahIqD3jQ4l1e471FIFcEGebXEnhxyLscuUzV8C94xVHtWEqDDXxll7+yu/6lW0w1Ff4HbtvHvOHg==} @@ -3154,6 +3216,9 @@ packages: resolution: {integrity: sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==} engines: {node: '>=0.10.0'} + sparse-array@1.3.2: + resolution: {integrity: sha512-ZT711fePGn3+kQyLuv1fpd3rNSkNF8vd5Kv2D+qnOANeyKs3fx6bUMGWRPvgTTcYV64QMqZKZwcuaQSP3AZ0tg==} + sprintf-js@1.0.3: resolution: {integrity: sha512-D9cPgkvLlV3t3IzL0D0YLvGA9Ahk4PcvVwUbN0dSGr1aP0Nrt4AEnTUbuGvquEC0mA64Gqt1fzirlRs5ibXx8g==} @@ -3306,6 +3371,9 @@ packages: resolution: {integrity: sha512-FVDYdxtnj0G6Qm/DhNPSb8Ju59ULcup3tuJxkFb5K8Bv2pUXILbf0xZWU8PX8Ov19OXljbUyveOFwRMwkXzO+A==} engines: {node: '>=16'} + tr46@0.0.3: + resolution: {integrity: sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==} + triple-beam@1.4.1: resolution: {integrity: sha512-aZbgViZrg1QNcG+LULa7nhZpJTZSLm/mXnHXnbAbjmN5aSa0y7V+wvv6+4WaBtpISJzThKy+PIPxc1Nq1EJ9mg==} engines: {node: '>= 14.0.0'} @@ -3423,9 +3491,15 @@ packages: webcrypto-core@1.8.1: resolution: {integrity: sha512-P+x1MvlNCXlKbLSOY4cYrdreqPG5hbzkmawbcXLKN/mf6DZW0SdNNkZ+sjwsqVkI4A4Ko2sPZmkZtCKY58w83A==} + webidl-conversions@3.0.1: + resolution: {integrity: sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==} + whatwg-fetch@3.6.20: resolution: {integrity: sha512-EqhiFU6daOA8kpjOWTL0olhVOF3i7OrFzSYiGsEMB8GcXS+RrzauAERX65xMeNWVqxA6HXH2m69Z9LaKKdisfg==} + whatwg-url@5.0.0: + resolution: {integrity: sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==} + wherearewe@2.0.1: resolution: {integrity: sha512-XUguZbDxCA2wBn2LoFtcEhXL6AXo+hVjGonwhSTTTU9SzbWG8Xu3onNIpzf9j/mYUcJQ0f+m37SzG77G851uFw==} engines: {node: '>=16.0.0', npm: '>=7.0.0'} @@ -3559,6 +3633,8 @@ snapshots: '@jridgewell/gen-mapping': 0.3.8 '@jridgewell/trace-mapping': 0.3.25 + '@assemblyscript/loader@0.9.4': {} + '@babel/code-frame@7.26.2': dependencies: '@babel/helper-validator-identifier': 7.25.9 @@ -4529,6 +4605,33 @@ snapshots: multiformats: 13.3.2 uint8arrays: 5.1.0 + '@helia/unixfs@5.0.0': + dependencies: + '@helia/interface': 5.2.1 + '@ipld/dag-pb': 4.1.3 + '@libp2p/interface': 2.7.0 + '@libp2p/logger': 5.1.13 + '@libp2p/utils': 6.6.0 + '@multiformats/murmur3': 2.1.8 + hamt-sharding: 3.0.6 + interface-blockstore: 5.3.1 + ipfs-unixfs: 11.2.1 + ipfs-unixfs-exporter: 13.6.2 + ipfs-unixfs-importer: 15.3.2 + it-all: 3.0.7 + it-first: 3.0.7 + it-glob: 3.0.2 + it-last: 3.0.7 + it-pipe: 3.0.1 + merge-options: 3.0.4 + multiformats: 13.3.2 + progress-events: 1.0.1 + sparse-array: 1.3.2 + uint8arrays: 5.1.0 + transitivePeerDependencies: + - encoding + - supports-color + '@helia/utils@1.2.2': dependencies: '@helia/interface': 5.2.1 @@ -5128,6 +5231,11 @@ snapshots: uint8-varint: 2.0.4 uint8arrays: 5.1.0 + '@multiformats/murmur3@2.1.8': + dependencies: + multiformats: 13.3.2 + murmurhash3js-revisited: 3.0.0 + '@multiformats/uri-to-multiaddr@8.1.0': dependencies: '@multiformats/multiaddr': 12.4.0 @@ -5742,6 +5850,12 @@ snapshots: inherits: 2.0.4 readable-stream: 3.6.2 + bl@5.1.0: + dependencies: + buffer: 6.0.3 + inherits: 
2.0.4 + readable-stream: 3.6.2 + blockstore-core@5.0.2: dependencies: '@libp2p/logger': 5.1.13 @@ -5920,6 +6034,8 @@ snapshots: dependencies: is-regexp: 3.1.0 + clone@2.1.2: {} + color-convert@1.9.3: dependencies: color-name: 1.1.3 @@ -6351,6 +6467,11 @@ snapshots: graceful-fs@4.2.11: {} + hamt-sharding@3.0.6: + dependencies: + sparse-array: 1.3.2 + uint8arrays: 5.1.0 + has-flag@4.0.0: {} has-symbols@1.1.0: {} @@ -6477,6 +6598,51 @@ snapshots: ipaddr.js@1.9.1: {} + ipfs-unixfs-exporter@13.6.2: + dependencies: + '@ipld/dag-cbor': 9.2.2 + '@ipld/dag-json': 10.2.3 + '@ipld/dag-pb': 4.1.3 + '@multiformats/murmur3': 2.1.8 + hamt-sharding: 3.0.6 + interface-blockstore: 5.3.1 + ipfs-unixfs: 11.2.1 + it-filter: 3.1.2 + it-last: 3.0.7 + it-map: 3.1.2 + it-parallel: 3.0.9 + it-pipe: 3.0.1 + it-pushable: 3.2.3 + multiformats: 13.3.2 + p-queue: 8.1.0 + progress-events: 1.0.1 + + ipfs-unixfs-importer@15.3.2: + dependencies: + '@ipld/dag-pb': 4.1.3 + '@multiformats/murmur3': 2.1.8 + hamt-sharding: 3.0.6 + interface-blockstore: 5.3.1 + interface-store: 6.0.2 + ipfs-unixfs: 11.2.1 + it-all: 3.0.7 + it-batch: 3.0.7 + it-first: 3.0.7 + it-parallel-batch: 3.0.7 + multiformats: 13.3.2 + progress-events: 1.0.1 + rabin-wasm: 0.1.5 + uint8arraylist: 2.4.8 + uint8arrays: 5.1.0 + transitivePeerDependencies: + - encoding + - supports-color + + ipfs-unixfs@11.2.1: + dependencies: + protons-runtime: 5.5.0 + uint8arraylist: 2.4.8 + ipns@10.0.2: dependencies: '@libp2p/crypto': 5.0.15 @@ -6591,6 +6757,8 @@ snapshots: dependencies: fast-glob: 3.3.3 + it-last@3.0.7: {} + it-length-prefixed-stream@1.2.1: dependencies: it-byte-stream: 1.1.1 @@ -7212,6 +7380,8 @@ snapshots: multiformats@13.3.2: {} + murmurhash3js-revisited@3.0.0: {} + nanoid@5.1.5: {} napi-build-utils@2.0.0: {} @@ -7230,6 +7400,14 @@ snapshots: dependencies: semver: 7.7.1 + node-cache@5.1.2: + dependencies: + clone: 2.1.2 + + node-fetch@2.7.0: + dependencies: + whatwg-url: 5.0.0 + node-forge@1.3.1: {} node-gyp-build@4.8.4: {} @@ -7425,6 +7603,18 @@ snapshots: dependencies: inherits: 2.0.4 + rabin-wasm@0.1.5: + dependencies: + '@assemblyscript/loader': 0.9.4 + bl: 5.1.0 + debug: 4.4.0 + minimist: 1.2.8 + node-fetch: 2.7.0 + readable-stream: 3.6.2 + transitivePeerDependencies: + - encoding + - supports-color + race-event@1.3.0: {} race-signal@1.1.3: {} @@ -7773,6 +7963,8 @@ snapshots: source-map@0.6.1: {} + sparse-array@1.3.2: {} + sprintf-js@1.0.3: {} stack-trace@0.0.10: {} @@ -7917,6 +8109,8 @@ snapshots: dependencies: tldts: 6.1.85 + tr46@0.0.3: {} + triple-beam@1.4.1: {} truncate-utf8-bytes@1.0.2: @@ -8023,8 +8217,15 @@ snapshots: pvtsutils: 1.3.6 tslib: 2.8.1 + webidl-conversions@3.0.1: {} + whatwg-fetch@3.6.20: {} + whatwg-url@5.0.0: + dependencies: + tr46: 0.0.3 + webidl-conversions: 3.0.1 + wherearewe@2.0.1: dependencies: is-electron: 2.2.2 diff --git a/src/db/cache/cacheService.ts b/src/db/cache/cacheService.ts new file mode 100644 index 0000000..ee59eea --- /dev/null +++ b/src/db/cache/cacheService.ts @@ -0,0 +1,62 @@ +import NodeCache from 'node-cache'; +import { createServiceLogger } from '../../utils/logger'; + +const logger = createServiceLogger('DB_CACHE'); + +// Cache for frequently accessed documents +const cache = new NodeCache({ + stdTTL: 300, // 5 minutes default TTL + checkperiod: 60, // Check for expired items every 60 seconds + useClones: false, // Don't clone objects (for performance) +}); + +// Cache statistics +export const cacheStats = { + hits: 0, + misses: 0, +}; + +/** + * Get an item from cache + */ +export const get = (key: 
string): T | undefined => { + const value = cache.get(key); + if (value !== undefined) { + cacheStats.hits++; + return value; + } + cacheStats.misses++; + return undefined; +}; + +/** + * Set an item in cache + */ +export const set = (key: string, value: T, ttl?: number): boolean => { + if (ttl === undefined) { + return cache.set(key, value); + } + return cache.set(key, value, ttl); +}; + +/** + * Delete an item from cache + */ +export const del = (key: string | string[]): number => { + return cache.del(key); +}; + +/** + * Flush the entire cache + */ +export const flushAll = (): void => { + cache.flushAll(); +}; + +/** + * Reset cache statistics + */ +export const resetStats = (): void => { + cacheStats.hits = 0; + cacheStats.misses = 0; +}; \ No newline at end of file diff --git a/src/db/core/connection.ts b/src/db/core/connection.ts new file mode 100644 index 0000000..7415fb0 --- /dev/null +++ b/src/db/core/connection.ts @@ -0,0 +1,138 @@ +import { createServiceLogger } from '../../utils/logger'; +import { init as initIpfs, stop as stopIpfs } from '../../ipfs/ipfsService'; +import { init as initOrbitDB } from '../../orbit/orbitDBService'; +import { DBConnection, ErrorCode } from '../types'; +import { DBError } from './error'; + +const logger = createServiceLogger('DB_CONNECTION'); + +// Connection pool of database instances +const connections = new Map(); +let defaultConnectionId: string | null = null; + +/** + * Initialize the database service + * This abstracts away OrbitDB and IPFS from the end user + */ +export const init = async (connectionId?: string): Promise => { + try { + const connId = connectionId || `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + logger.info(`Initializing DB service with connection ID: ${connId}`); + + // Initialize IPFS + const ipfsInstance = await initIpfs(); + + // Initialize OrbitDB + const orbitdbInstance = await initOrbitDB(); + + // Store connection in pool + connections.set(connId, { + ipfs: ipfsInstance, + orbitdb: orbitdbInstance, + timestamp: Date.now(), + isActive: true, + }); + + // Set as default if no default exists + if (!defaultConnectionId) { + defaultConnectionId = connId; + } + + logger.info(`DB service initialized successfully with connection ID: ${connId}`); + return connId; + } catch (error) { + logger.error('Failed to initialize DB service:', error); + throw new DBError(ErrorCode.INITIALIZATION_FAILED, 'Failed to initialize database service', error); + } +}; + +/** + * Get the active connection + */ +export const getConnection = (connectionId?: string): DBConnection => { + const connId = connectionId || defaultConnectionId; + + if (!connId || !connections.has(connId)) { + throw new DBError( + ErrorCode.NOT_INITIALIZED, + `No active database connection found${connectionId ? 
` for ID: ${connectionId}` : ''}` + ); + } + + const connection = connections.get(connId)!; + + if (!connection.isActive) { + throw new DBError( + ErrorCode.CONNECTION_ERROR, + `Connection ${connId} is no longer active` + ); + } + + return connection; +}; + +/** + * Close a specific database connection + */ +export const closeConnection = async (connectionId: string): Promise => { + if (!connections.has(connectionId)) { + return false; + } + + try { + const connection = connections.get(connectionId)!; + + // Stop OrbitDB + if (connection.orbitdb) { + await connection.orbitdb.stop(); + } + + // Mark connection as inactive + connection.isActive = false; + + // If this was the default connection, clear it + if (defaultConnectionId === connectionId) { + defaultConnectionId = null; + + // Try to find another active connection to be the default + for (const [id, conn] of connections.entries()) { + if (conn.isActive) { + defaultConnectionId = id; + break; + } + } + } + + logger.info(`Closed database connection: ${connectionId}`); + return true; + } catch (error) { + logger.error(`Error closing connection ${connectionId}:`, error); + return false; + } +}; + +/** + * Stop all database connections + */ +export const stop = async (): Promise => { + try { + // Close all connections + for (const [id, connection] of connections.entries()) { + if (connection.isActive) { + await closeConnection(id); + } + } + + // Stop IPFS if needed + const ipfs = connections.get(defaultConnectionId || '')?.ipfs; + if (ipfs) { + await stopIpfs(); + } + + defaultConnectionId = null; + logger.info('All DB connections stopped successfully'); + } catch (error) { + logger.error('Error stopping DB connections:', error); + throw error; + } +}; \ No newline at end of file diff --git a/src/db/core/error.ts b/src/db/core/error.ts new file mode 100644 index 0000000..a7b2101 --- /dev/null +++ b/src/db/core/error.ts @@ -0,0 +1,18 @@ +import { ErrorCode } from '../types'; + +// Re-export error code for easier access +export { ErrorCode }; + + +// Custom error class with error codes +export class DBError extends Error { + code: ErrorCode; + details?: any; + + constructor(code: ErrorCode, message: string, details?: any) { + super(message); + this.name = 'DBError'; + this.code = code; + this.details = details; + } +} \ No newline at end of file diff --git a/src/db/dbService.ts b/src/db/dbService.ts new file mode 100644 index 0000000..41bedd8 --- /dev/null +++ b/src/db/dbService.ts @@ -0,0 +1,212 @@ +import { createServiceLogger } from '../utils/logger'; +import { init, getConnection, closeConnection, stop } from './core/connection'; +import { defineSchema, validateDocument } from './schema/validator'; +import * as cache from './cache/cacheService'; +import * as events from './events/eventService'; +import { getMetrics, resetMetrics } from './metrics/metricsService'; +import { Transaction } from './transactions/transactionService'; +import { StoreType, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions, ErrorCode } from './types'; +import { DBError } from './core/error'; +import { getStore } from './stores/storeFactory'; +import { uploadFile, getFile, deleteFile } from './stores/fileStore'; + +// Re-export imported functions +export { init, closeConnection, stop, defineSchema, getMetrics, resetMetrics, uploadFile, getFile, deleteFile }; + +const logger = createServiceLogger('DB_SERVICE'); + +/** + * Create a new transaction for batching operations + */ +export const createTransaction = (connectionId?: string): 
Transaction => {
+  return new Transaction(connectionId);
+};
+
+/**
+ * Execute all operations in a transaction
+ */
+export const commitTransaction = async (transaction: Transaction): Promise<{ success: boolean; results: any[] }> => {
+  try {
+    // Validate that we have operations
+    const operations = transaction.getOperations();
+    if (operations.length === 0) {
+      return { success: true, results: [] };
+    }
+
+    const connectionId = transaction.getConnectionId();
+    const results = [];
+
+    // Execute all operations
+    for (const operation of operations) {
+      let result;
+
+      switch (operation.type) {
+        case 'create':
+          result = await create(
+            operation.collection,
+            operation.id,
+            operation.data,
+            { connectionId }
+          );
+          break;
+
+        case 'update':
+          result = await update(
+            operation.collection,
+            operation.id,
+            operation.data,
+            { connectionId }
+          );
+          break;
+
+        case 'delete':
+          result = await remove(
+            operation.collection,
+            operation.id,
+            { connectionId }
+          );
+          break;
+      }
+
+      results.push(result);
+    }
+
+    return { success: true, results };
+  } catch (error) {
+    logger.error('Transaction failed:', error);
+    throw new DBError(ErrorCode.TRANSACTION_FAILED, 'Failed to commit transaction', error);
+  }
+};
+
+/**
+ * Create a new document in the specified collection using the appropriate store
+ */
+export const create = async <T extends Record<string, any>>(
+  collection: string,
+  id: string,
+  data: Omit<T, 'createdAt' | 'updatedAt'>,
+  options?: { connectionId?: string, storeType?: StoreType }
+): Promise<CreateResult> => {
+  const storeType = options?.storeType || StoreType.KEYVALUE;
+  const store = getStore(storeType);
+  return store.create(collection, id, data, { connectionId: options?.connectionId });
+};
+
+/**
+ * Get a document by ID from a collection
+ */
+export const get = async <T extends Record<string, any>>(
+  collection: string,
+  id: string,
+  options?: { connectionId?: string; skipCache?: boolean, storeType?: StoreType }
+): Promise<T | null> => {
+  const storeType = options?.storeType || StoreType.KEYVALUE;
+  const store = getStore(storeType);
+  return store.get(collection, id, options);
+};
+
+/**
+ * Update a document in a collection
+ */
+export const update = async <T extends Record<string, any>>(
+  collection: string,
+  id: string,
+  data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
+  options?: { connectionId?: string; upsert?: boolean, storeType?: StoreType }
+): Promise<UpdateResult> => {
+  const storeType = options?.storeType || StoreType.KEYVALUE;
+  const store = getStore(storeType);
+  return store.update(collection, id, data, options);
+};
+
+/**
+ * Delete a document from a collection
+ */
+export const remove = async (
+  collection: string,
+  id: string,
+  options?: { connectionId?: string, storeType?: StoreType }
+): Promise<boolean> => {
+  const storeType = options?.storeType || StoreType.KEYVALUE;
+  const store = getStore(storeType);
+  return store.remove(collection, id, options);
+};
+
+/**
+ * List all documents in a collection with pagination
+ */
+export const list = async <T extends Record<string, any>>(
+  collection: string,
+  options?: ListOptions & { storeType?: StoreType }
+): Promise<PaginatedResult<T>> => {
+  const storeType = options?.storeType || StoreType.KEYVALUE;
+  const store = getStore(storeType);
+
+  // Remove storeType from options
+  const { storeType: _, ...storeOptions } = options || {};
+  return store.list(collection, storeOptions);
+};
+
+/**
+ * Query documents in a collection with filtering and pagination
+ */
+export const query = async <T extends Record<string, any>>(
+  collection: string,
+  filter: (doc: T) => boolean,
+  options?: QueryOptions & { storeType?: StoreType }
+): Promise<PaginatedResult<T>> => {
+  const storeType = options?.storeType || StoreType.KEYVALUE;
+  const store = getStore(storeType);
+
+  // Remove storeType from options
+  const { storeType: _, ...storeOptions } = options || {};
+  return store.query(collection, filter, storeOptions);
+};
+
+/**
+ * Create an index for a collection to speed up queries
+ */
+export const createIndex = async (
+  collection: string,
+  field: string,
+  options?: { connectionId?: string, storeType?: StoreType }
+): Promise<boolean> => {
+  const storeType = options?.storeType || StoreType.KEYVALUE;
+  const store = getStore(storeType);
+  return store.createIndex(collection, field, { connectionId: options?.connectionId });
+};
+
+/**
+ * Subscribe to database events
+ */
+export const subscribe = events.subscribe;
+
+// Re-export error types and codes
+export { DBError } from './core/error';
+export { ErrorCode } from './types';
+
+// Export store types
+export { StoreType } from './types';
+
+export default {
+  init,
+  create,
+  get,
+  update,
+  remove,
+  list,
+  query,
+  createIndex,
+  createTransaction,
+  commitTransaction,
+  subscribe,
+  uploadFile,
+  getFile,
+  deleteFile,
+  defineSchema,
+  getMetrics,
+  resetMetrics,
+  closeConnection,
+  stop,
+  StoreType
+};
\ No newline at end of file
diff --git a/src/db/events/eventService.ts b/src/db/events/eventService.ts
new file mode 100644
index 0000000..ff96eae
--- /dev/null
+++ b/src/db/events/eventService.ts
@@ -0,0 +1,36 @@
+import { dbEvents } from '../types';
+
+// Event types
+type DBEventType = 'document:created' | 'document:updated' | 'document:deleted';
+
+/**
+ * Subscribe to database events
+ */
+export const subscribe = (
+  event: DBEventType,
+  callback: (data: any) => void
+): () => void => {
+  dbEvents.on(event, callback);
+
+  // Return unsubscribe function
+  return () => {
+    dbEvents.off(event, callback);
+  };
+};
+
+/**
+ * Emit an event
+ */
+export const emit = (
+  event: DBEventType,
+  data: any
+): void => {
+  dbEvents.emit(event, data);
+};
+
+/**
+ * Remove all event listeners
+ */
+export const removeAllListeners = (): void => {
+  dbEvents.removeAllListeners();
+};
\ No newline at end of file
diff --git a/src/db/metrics/metricsService.ts b/src/db/metrics/metricsService.ts
new file mode 100644
index 0000000..432f377
--- /dev/null
+++ b/src/db/metrics/metricsService.ts
@@ -0,0 +1,101 @@
+import { Metrics, ErrorCode } from '../types';
+import { DBError } from '../core/error';
+import * as cacheService from '../cache/cacheService';
+
+// Metrics tracking
+const metrics: Metrics = {
+  operations: {
+    creates: 0,
+    reads: 0,
+    updates: 0,
+    deletes: 0,
+    queries: 0,
+    fileUploads: 0,
+    fileDownloads: 0,
+  },
+  performance: {
+    totalOperationTime: 0,
+    operationCount: 0,
+  },
+  errors: {
+    count: 0,
+    byCode: {},
+  },
+  cacheStats: {
+    hits: 0,
+    misses: 0,
+  },
+  startTime: Date.now(),
+};
+
+/**
+ * Measure performance of a database operation
+ */
+export async function measurePerformance<T>(operation: () => Promise<T>): Promise<T> {
+  const startTime = performance.now();
+  try {
+    const result = await operation();
+    const endTime = performance.now();
+    metrics.performance.totalOperationTime += (endTime - startTime);
+    metrics.performance.operationCount++;
+    return result;
+  } catch (error) {
+    const endTime = performance.now();
+    metrics.performance.totalOperationTime += (endTime - startTime);
+    metrics.performance.operationCount++;
+
+    // Track error metrics
+    metrics.errors.count++;
+    if (error instanceof DBError) {
+      metrics.errors.byCode[error.code] = (metrics.errors.byCode[error.code] || 0) + 1;
+    }
+
+    throw error;
+  }
+}
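+
+// Illustrative usage (a sketch, not part of this module's API): store
+// implementations wrap their async work in measurePerformance so timings and
+// error counts are recorded automatically, roughly like:
+//
+//   const doc = await measurePerformance(async () => {
+//     const db = await openStore('users', StoreType.KEYVALUE);
+//     return db.get('user1');
+//   });
+//
+// openStore and db.get above are stand-ins for the real calls in the store
+// layer; see the store implementations for the actual call sites.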
+/**
+ * Get database metrics
+ */
+export const getMetrics = (): Metrics => {
+  return {
+    ...metrics,
+    // Sync cache stats
+    cacheStats: cacheService.cacheStats,
+    // Calculate some derived metrics
+    performance: {
+      ...metrics.performance,
+      averageOperationTime: metrics.performance.operationCount > 0 ?
+        metrics.performance.totalOperationTime / metrics.performance.operationCount :
+        0
+    }
+  };
+};
+
+/**
+ * Reset metrics (useful for testing)
+ */
+export const resetMetrics = (): void => {
+  metrics.operations = {
+    creates: 0,
+    reads: 0,
+    updates: 0,
+    deletes: 0,
+    queries: 0,
+    fileUploads: 0,
+    fileDownloads: 0,
+  };
+  metrics.performance = {
+    totalOperationTime: 0,
+    operationCount: 0,
+  };
+  metrics.errors = {
+    count: 0,
+    byCode: {},
+  };
+
+  // Reset cache stats too
+  cacheService.resetStats();
+
+  metrics.startTime = Date.now();
+};
\ No newline at end of file
diff --git a/src/db/schema/validator.ts b/src/db/schema/validator.ts
new file mode 100644
index 0000000..6ac95fe
--- /dev/null
+++ b/src/db/schema/validator.ts
@@ -0,0 +1,225 @@
+import { createServiceLogger } from '../../utils/logger';
+import { CollectionSchema, ErrorCode } from '../types';
+import { DBError } from '../core/error';
+
+const logger = createServiceLogger('DB_SCHEMA');
+
+// Store collection schemas
+const schemas = new Map<string, CollectionSchema>();
+
+/**
+ * Define a schema for a collection
+ */
+export const defineSchema = (collection: string, schema: CollectionSchema): void => {
+  schemas.set(collection, schema);
+  logger.info(`Schema defined for collection: ${collection}`);
+};
+
+/**
+ * Validate a document against its schema
+ */
+export const validateDocument = (collection: string, document: any): boolean => {
+  const schema = schemas.get(collection);
+
+  if (!schema) {
+    return true; // No schema defined, so validation passes
+  }
+
+  // Check required fields
+  if (schema.required) {
+    for (const field of schema.required) {
+      if (document[field] === undefined) {
+        throw new DBError(
+          ErrorCode.INVALID_SCHEMA,
+          `Required field '${field}' is missing`,
+          { collection, document }
+        );
+      }
+    }
+  }
+
+  // Validate properties
+  for (const [field, definition] of Object.entries(schema.properties)) {
+    const value = document[field];
+
+    // Skip undefined optional fields
+    if (value === undefined) {
+      if (definition.required) {
+        throw new DBError(
+          ErrorCode.INVALID_SCHEMA,
+          `Required field '${field}' is missing`,
+          { collection, document }
+        );
+      }
+      continue;
+    }
+
+    // Type validation
+    switch (definition.type) {
+      case 'string':
+        if (typeof value !== 'string') {
+          throw new DBError(
+            ErrorCode.INVALID_SCHEMA,
+            `Field '${field}' must be a string`,
+            { collection, field, value }
+          );
+        }
+
+        // Pattern validation
+        if (definition.pattern && !new RegExp(definition.pattern).test(value)) {
+          throw new DBError(
+            ErrorCode.INVALID_SCHEMA,
+            `Field '${field}' does not match pattern: ${definition.pattern}`,
+            { collection, field, value }
+          );
+        }
+
+        // Length validation
+        if (definition.min !== undefined && value.length < definition.min) {
+          throw new DBError(
+            ErrorCode.INVALID_SCHEMA,
+            `Field '${field}' must have at least ${definition.min} characters`,
+            { collection, field, value }
+          );
+        }
+
+        if (definition.max !== undefined && value.length > definition.max) {
+          throw new DBError(
+            ErrorCode.INVALID_SCHEMA,
+            `Field '${field}' must have at most ${definition.max} characters`,
+            { collection, field, value }
+          );
+        }
+        break;
+
+      case 'number':
+        if (typeof value !== 'number') {
+          throw new DBError(
+            ErrorCode.INVALID_SCHEMA,
+            `Field '${field}' must be a number`,
+            { collection, field, value }
+ ); + } + + // Range validation + if (definition.min !== undefined && value < definition.min) { + throw new DBError( + ErrorCode.INVALID_SCHEMA, + `Field '${field}' must be at least ${definition.min}`, + { collection, field, value } + ); + } + + if (definition.max !== undefined && value > definition.max) { + throw new DBError( + ErrorCode.INVALID_SCHEMA, + `Field '${field}' must be at most ${definition.max}`, + { collection, field, value } + ); + } + break; + + case 'boolean': + if (typeof value !== 'boolean') { + throw new DBError( + ErrorCode.INVALID_SCHEMA, + `Field '${field}' must be a boolean`, + { collection, field, value } + ); + } + break; + + case 'array': + if (!Array.isArray(value)) { + throw new DBError( + ErrorCode.INVALID_SCHEMA, + `Field '${field}' must be an array`, + { collection, field, value } + ); + } + + // Length validation + if (definition.min !== undefined && value.length < definition.min) { + throw new DBError( + ErrorCode.INVALID_SCHEMA, + `Field '${field}' must have at least ${definition.min} items`, + { collection, field, value } + ); + } + + if (definition.max !== undefined && value.length > definition.max) { + throw new DBError( + ErrorCode.INVALID_SCHEMA, + `Field '${field}' must have at most ${definition.max} items`, + { collection, field, value } + ); + } + + // Validate array items if item schema is defined + if (definition.items && value.length > 0) { + for (let i = 0; i < value.length; i++) { + const item = value[i]; + + // This is a simplified item validation + // In a real implementation, this would recursively validate complex objects + switch (definition.items.type) { + case 'string': + if (typeof item !== 'string') { + throw new DBError( + ErrorCode.INVALID_SCHEMA, + `Item at index ${i} in field '${field}' must be a string`, + { collection, field, item } + ); + } + break; + + case 'number': + if (typeof item !== 'number') { + throw new DBError( + ErrorCode.INVALID_SCHEMA, + `Item at index ${i} in field '${field}' must be a number`, + { collection, field, item } + ); + } + break; + + case 'boolean': + if (typeof item !== 'boolean') { + throw new DBError( + ErrorCode.INVALID_SCHEMA, + `Item at index ${i} in field '${field}' must be a boolean`, + { collection, field, item } + ); + } + break; + } + } + } + break; + + case 'object': + if (typeof value !== 'object' || value === null || Array.isArray(value)) { + throw new DBError( + ErrorCode.INVALID_SCHEMA, + `Field '${field}' must be an object`, + { collection, field, value } + ); + } + + // Nested object validation would go here in a real implementation + break; + + case 'enum': + if (definition.enum && !definition.enum.includes(value)) { + throw new DBError( + ErrorCode.INVALID_SCHEMA, + `Field '${field}' must be one of: ${definition.enum.join(', ')}`, + { collection, field, value } + ); + } + break; + } + } + + return true; +}; \ No newline at end of file diff --git a/src/db/stores/baseStore.ts b/src/db/stores/baseStore.ts new file mode 100644 index 0000000..52648a4 --- /dev/null +++ b/src/db/stores/baseStore.ts @@ -0,0 +1,132 @@ +import { createServiceLogger } from '../../utils/logger'; +import { openDB } from '../../orbit/orbitDBService'; +import { getConnection } from '../core/connection'; +import * as cache from '../cache/cacheService'; +import * as events from '../events/eventService'; +import { validateDocument } from '../schema/validator'; +import { measurePerformance } from '../metrics/metricsService'; +import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, 
QueryOptions, ListOptions } from '../types'; +import { DBError } from '../core/error'; + +const logger = createServiceLogger('DB_STORE'); + +/** + * Base Store interface that all store implementations should extend + */ +export interface BaseStore { + /** + * Create a new document + */ + create>( + collection: string, + id: string, + data: Omit, + options?: StoreOptions + ): Promise; + + /** + * Get a document by ID + */ + get>( + collection: string, + id: string, + options?: StoreOptions & { skipCache?: boolean } + ): Promise; + + /** + * Update a document + */ + update>( + collection: string, + id: string, + data: Partial>, + options?: StoreOptions & { upsert?: boolean } + ): Promise; + + /** + * Delete a document + */ + remove( + collection: string, + id: string, + options?: StoreOptions + ): Promise; + + /** + * List all documents in a collection with pagination + */ + list>( + collection: string, + options?: ListOptions + ): Promise>; + + /** + * Query documents in a collection with filtering and pagination + */ + query>( + collection: string, + filter: (doc: T) => boolean, + options?: QueryOptions + ): Promise>; + + /** + * Create an index for a collection to speed up queries + */ + createIndex( + collection: string, + field: string, + options?: StoreOptions + ): Promise; +} + +/** + * Open a store of the specified type + */ +export async function openStore( + collection: string, + storeType: StoreType, + options?: StoreOptions +): Promise { + try { + const connection = getConnection(options?.connectionId); + return await openDB(collection, storeType); + } catch (error) { + logger.error(`Error opening ${storeType} store for collection ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to open ${storeType} store for collection ${collection}`, error); + } +} + +/** + * Helper function to prepare a document for storage + */ +export function prepareDocument>( + collection: string, + data: Omit, + existingDoc?: T | null +): T { + const timestamp = Date.now(); + + // If it's an update to an existing document + if (existingDoc) { + const doc = { + ...existingDoc, + ...data, + updatedAt: timestamp + } as T; + + // Validate the document against its schema + validateDocument(collection, doc); + return doc; + } + + // Otherwise it's a new document + const doc = { + ...data, + createdAt: timestamp, + updatedAt: timestamp + } as unknown as T; + + // Validate the document against its schema + validateDocument(collection, doc); + return doc; +} \ No newline at end of file diff --git a/src/db/stores/counterStore.ts b/src/db/stores/counterStore.ts new file mode 100644 index 0000000..4ecb2d1 --- /dev/null +++ b/src/db/stores/counterStore.ts @@ -0,0 +1,331 @@ +import { createServiceLogger } from '../../utils/logger'; +import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions } from '../types'; +import { DBError } from '../core/error'; +import { BaseStore, openStore } from './baseStore'; +import * as cache from '../cache/cacheService'; +import * as events from '../events/eventService'; +import { measurePerformance } from '../metrics/metricsService'; + +const logger = createServiceLogger('COUNTER_STORE'); + +/** + * CounterStore implementation + * Uses OrbitDB's counter store for simple numeric counters + */ +export class CounterStore implements BaseStore { + /** + * Create or set counter value + */ + async create>( + collection: string, + id: string, + data: Omit, + options?: StoreOptions + ): Promise { + return 
measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.COUNTER, options); + + // Extract value from data, default to 0 + const value = typeof data === 'object' && data !== null && 'value' in data ? + Number(data.value) : 0; + + // Set the counter value + const hash = await db.set(value); + + // Construct document representation + const document = { + id, + value, + createdAt: Date.now(), + updatedAt: Date.now() + }; + + // Add to cache + const cacheKey = `${collection}:${id}`; + cache.set(cacheKey, document); + + // Emit change event + events.emit('document:created', { collection, id, document }); + + logger.info(`Set counter in ${collection} to ${value}`); + return { id, hash }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error setting counter in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to set counter in ${collection}`, error); + } + }); + } + + /** + * Get counter value + */ + async get>( + collection: string, + id: string, + options?: StoreOptions & { skipCache?: boolean } + ): Promise { + return measurePerformance(async () => { + try { + // Note: for counters, id is not used in the underlying store (there's only one counter per db) + // but we use it for consistency with the API + + // Check cache first if not skipped + const cacheKey = `${collection}:${id}`; + if (!options?.skipCache) { + const cachedDocument = cache.get(cacheKey); + if (cachedDocument) { + return cachedDocument; + } + } + + const db = await openStore(collection, StoreType.COUNTER, options); + + // Get the counter value + const value = await db.value(); + + // Construct document representation + const document = { + id, + value, + updatedAt: Date.now() + } as unknown as T; + + // Update cache + cache.set(cacheKey, document); + + return document; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error getting counter from ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get counter from ${collection}`, error); + } + }); + } + + /** + * Update counter (increment/decrement) + */ + async update>( + collection: string, + id: string, + data: Partial>, + options?: StoreOptions & { upsert?: boolean } + ): Promise { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.COUNTER, options); + + // Get current value before update + const currentValue = await db.value(); + + // Extract value from data + let value: number; + let operation: 'increment' | 'decrement' | 'set' = 'set'; + + // Check what kind of operation we're doing + if (typeof data === 'object' && data !== null) { + if ('increment' in data) { + value = Number(data.increment); + operation = 'increment'; + } else if ('decrement' in data) { + value = Number(data.decrement); + operation = 'decrement'; + } else if ('value' in data) { + value = Number(data.value); + operation = 'set'; + } else { + value = 0; + operation = 'set'; + } + } else { + value = 0; + operation = 'set'; + } + + // Update the counter + let hash; + let newValue; + + switch (operation) { + case 'increment': + hash = await db.inc(value); + newValue = currentValue + value; + break; + case 'decrement': + hash = await db.inc(-value); // Counter store uses inc with negative value + newValue = currentValue - value; + break; + case 'set': + hash = await db.set(value); + newValue = value; + break; + } + + // Construct document representation + const document = { + id, 
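+          // The counter store persists only the numeric value; this wrapper
+          // object exists for the cache and change events, not for the store itself.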
+ value: newValue, + updatedAt: Date.now() + }; + + // Update cache + const cacheKey = `${collection}:${id}`; + cache.set(cacheKey, document); + + // Emit change event + events.emit('document:updated', { + collection, + id, + document, + previous: { id, value: currentValue } + }); + + logger.info(`Updated counter in ${collection} from ${currentValue} to ${newValue}`); + return { id, hash }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error updating counter in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update counter in ${collection}`, error); + } + }); + } + + /** + * Delete/reset counter + */ + async remove( + collection: string, + id: string, + options?: StoreOptions + ): Promise { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.COUNTER, options); + + // Get the current value for the event + const currentValue = await db.value(); + + // Reset the counter to 0 (counters can't be truly deleted) + await db.set(0); + + // Remove from cache + const cacheKey = `${collection}:${id}`; + cache.del(cacheKey); + + // Emit change event + events.emit('document:deleted', { + collection, + id, + document: { id, value: currentValue } + }); + + logger.info(`Reset counter in ${collection} from ${currentValue} to 0`); + return true; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error resetting counter in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to reset counter in ${collection}`, error); + } + }); + } + + /** + * List all counters (for counter stores, there's only one counter per db) + */ + async list>( + collection: string, + options?: ListOptions + ): Promise> { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.COUNTER, options); + const value = await db.value(); + + // For counter stores, we just return one document with the counter value + const document = { + id: '0', // Default ID since counters don't have IDs + value, + updatedAt: Date.now() + } as unknown as T; + + return { + documents: [document], + total: 1, + hasMore: false + }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error listing counter in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list counter in ${collection}`, error); + } + }); + } + + /** + * Query is not applicable for counter stores, but we implement for API consistency + */ + async query>( + collection: string, + filter: (doc: T) => boolean, + options?: QueryOptions + ): Promise> { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.COUNTER, options); + const value = await db.value(); + + // Create document + const document = { + id: '0', // Default ID since counters don't have IDs + value, + updatedAt: Date.now() + } as unknown as T; + + // Apply filter + const documents = filter(document) ? 
[document] : []; + + return { + documents, + total: documents.length, + hasMore: false + }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error querying counter in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query counter in ${collection}`, error); + } + }); + } + + /** + * Create an index - not applicable for counter stores + */ + async createIndex( + collection: string, + field: string, + options?: StoreOptions + ): Promise { + logger.warn(`Index creation not supported for counter collections, ignoring request for ${collection}`); + return false; + } +} \ No newline at end of file diff --git a/src/db/stores/docStore.ts b/src/db/stores/docStore.ts new file mode 100644 index 0000000..129ac0f --- /dev/null +++ b/src/db/stores/docStore.ts @@ -0,0 +1,350 @@ +import { createServiceLogger } from '../../utils/logger'; +import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions } from '../types'; +import { DBError } from '../core/error'; +import { BaseStore, openStore, prepareDocument } from './baseStore'; +import * as cache from '../cache/cacheService'; +import * as events from '../events/eventService'; +import { measurePerformance } from '../metrics/metricsService'; + +const logger = createServiceLogger('DOCSTORE'); + +/** + * DocStore implementation + * Uses OrbitDB's document store which allows for more complex document storage with indices + */ +export class DocStore implements BaseStore { + /** + * Create a new document in the specified collection + */ + async create>( + collection: string, + id: string, + data: Omit, + options?: StoreOptions + ): Promise { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.DOCSTORE, options); + + // Prepare document for storage (including _id which is required for docstore) + const document = { + _id: id, + ...prepareDocument(collection, data) + }; + + // Add to database + const hash = await db.put(document); + + // Add to cache + const cacheKey = `${collection}:${id}`; + cache.set(cacheKey, document); + + // Emit change event + events.emit('document:created', { collection, id, document }); + + logger.info(`Created document in ${collection} with id ${id}`); + return { id, hash }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error creating document in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create document in ${collection}`, error); + } + }); + } + + /** + * Get a document by ID from a collection + */ + async get>( + collection: string, + id: string, + options?: StoreOptions & { skipCache?: boolean } + ): Promise { + return measurePerformance(async () => { + try { + // Check cache first if not skipped + const cacheKey = `${collection}:${id}`; + if (!options?.skipCache) { + const cachedDocument = cache.get(cacheKey); + if (cachedDocument) { + return cachedDocument; + } + } + + const db = await openStore(collection, StoreType.DOCSTORE, options); + const document = await db.get(id) as T | null; + + // Update cache if document exists + if (document) { + cache.set(cacheKey, document); + } + + return document; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error getting document ${id} from ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get document ${id} from ${collection}`, error); + } + }); + } + + /** + 
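+   * (Sketch of the behavior implemented below: the given fields are merged
+   * over the stored document and updatedAt is refreshed; with
+   * { upsert: true } a missing document is created instead of raising
+   * DOCUMENT_NOT_FOUND.)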
* Update a document in a collection + */ + async update>( + collection: string, + id: string, + data: Partial>, + options?: StoreOptions & { upsert?: boolean } + ): Promise { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.DOCSTORE, options); + const existing = await db.get(id) as T | null; + + if (!existing && !options?.upsert) { + throw new DBError( + ErrorCode.DOCUMENT_NOT_FOUND, + `Document ${id} not found in ${collection}`, + { collection, id } + ); + } + + // Prepare document for update + const document = { + _id: id, + ...prepareDocument(collection, data as unknown as Omit, existing || undefined) + }; + + // Update in database + const hash = await db.put(document); + + // Update cache + const cacheKey = `${collection}:${id}`; + cache.set(cacheKey, document); + + // Emit change event + events.emit('document:updated', { collection, id, document, previous: existing }); + + logger.info(`Updated document in ${collection} with id ${id}`); + return { id, hash }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error updating document in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update document in ${collection}`, error); + } + }); + } + + /** + * Delete a document from a collection + */ + async remove( + collection: string, + id: string, + options?: StoreOptions + ): Promise { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.DOCSTORE, options); + + // Get the document before deleting for the event + const document = await db.get(id); + + // Delete from database + await db.del(id); + + // Remove from cache + const cacheKey = `${collection}:${id}`; + cache.del(cacheKey); + + // Emit change event + events.emit('document:deleted', { collection, id, document }); + + logger.info(`Deleted document in ${collection} with id ${id}`); + return true; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error deleting document in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to delete document in ${collection}`, error); + } + }); + } + + /** + * List all documents in a collection with pagination + */ + async list>( + collection: string, + options?: ListOptions + ): Promise> { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.DOCSTORE, options); + const allDocs = await db.query((doc: any) => true); + + let documents = allDocs.map((doc: any) => ({ + id: doc._id, + ...doc + })) as T[]; + + // Sort if requested + if (options?.sort) { + const { field, order } = options.sort; + documents.sort((a, b) => { + const valueA = a[field]; + const valueB = b[field]; + + // Handle different data types for sorting + if (typeof valueA === 'string' && typeof valueB === 'string') { + return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA); + } else if (typeof valueA === 'number' && typeof valueB === 'number') { + return order === 'asc' ? valueA - valueB : valueB - valueA; + } else if (valueA instanceof Date && valueB instanceof Date) { + return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime(); + } + + // Default comparison for other types + return order === 'asc' ? 
+ String(valueA).localeCompare(String(valueB)) : + String(valueB).localeCompare(String(valueA)); + }); + } + + const total = documents.length; + + // Apply pagination + const offset = options?.offset || 0; + const limit = options?.limit || total; + + const paginatedDocuments = documents.slice(offset, offset + limit); + const hasMore = offset + limit < total; + + return { + documents: paginatedDocuments, + total, + hasMore + }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error listing documents in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list documents in ${collection}`, error); + } + }); + } + + /** + * Query documents in a collection with filtering and pagination + */ + async query>( + collection: string, + filter: (doc: T) => boolean, + options?: QueryOptions + ): Promise> { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.DOCSTORE, options); + + // Apply filter using docstore's query capability + const filtered = await db.query((doc: any) => filter(doc as T)); + + // Map the documents to include id + let documents = filtered.map((doc: any) => ({ + id: doc._id, + ...doc + })) as T[]; + + // Sort if requested + if (options?.sort) { + const { field, order } = options.sort; + documents.sort((a, b) => { + const valueA = a[field]; + const valueB = b[field]; + + // Handle different data types for sorting + if (typeof valueA === 'string' && typeof valueB === 'string') { + return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA); + } else if (typeof valueA === 'number' && typeof valueB === 'number') { + return order === 'asc' ? valueA - valueB : valueB - valueA; + } else if (valueA instanceof Date && valueB instanceof Date) { + return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime(); + } + + // Default comparison for other types + return order === 'asc' ? 
+ String(valueA).localeCompare(String(valueB)) : + String(valueB).localeCompare(String(valueA)); + }); + } + + const total = documents.length; + + // Apply pagination + const offset = options?.offset || 0; + const limit = options?.limit || total; + + const paginatedDocuments = documents.slice(offset, offset + limit); + const hasMore = offset + limit < total; + + return { + documents: paginatedDocuments, + total, + hasMore + }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error querying documents in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query documents in ${collection}`, error); + } + }); + } + + /** + * Create an index for a collection to speed up queries + * DocStore has built-in indexing capabilities + */ + async createIndex( + collection: string, + field: string, + options?: StoreOptions + ): Promise { + try { + const db = await openStore(collection, StoreType.DOCSTORE, options); + + // DocStore supports indexing, so we create the index + if (typeof db.createIndex === 'function') { + await db.createIndex(field); + logger.info(`Index created on ${field} for collection ${collection}`); + return true; + } + + logger.info(`Index creation not supported for this DB instance, but DocStore has built-in indices`); + return true; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error creating index for ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create index for ${collection}`, error); + } + } +} \ No newline at end of file diff --git a/src/db/stores/feedStore.ts b/src/db/stores/feedStore.ts new file mode 100644 index 0000000..123554f --- /dev/null +++ b/src/db/stores/feedStore.ts @@ -0,0 +1,416 @@ +import { createServiceLogger } from '../../utils/logger'; +import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions } from '../types'; +import { DBError } from '../core/error'; +import { BaseStore, openStore, prepareDocument } from './baseStore'; +import * as cache from '../cache/cacheService'; +import * as events from '../events/eventService'; +import { measurePerformance } from '../metrics/metricsService'; + +const logger = createServiceLogger('FEED_STORE'); + +/** + * FeedStore/EventLog implementation + * Uses OrbitDB's feed/eventlog store which is an append-only log + */ +export class FeedStore implements BaseStore { + /** + * Create a new document in the specified collection + * For feeds, this appends a new entry + */ + async create>( + collection: string, + id: string, + data: Omit, + options?: StoreOptions + ): Promise { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.FEED, options); + + // Prepare document for storage with ID + const document = { + id, + ...prepareDocument(collection, data) + }; + + // Add to database + const hash = await db.add(document); + + // Feed entries are append-only, so we use a different cache key pattern + const cacheKey = `${collection}:entry:${hash}`; + cache.set(cacheKey, document); + + // Emit change event + events.emit('document:created', { collection, id, document, hash }); + + logger.info(`Created entry in feed ${collection} with id ${id} and hash ${hash}`); + return { id, hash }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error creating entry in feed ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed 
to create entry in feed ${collection}`, error); + } + }); + } + + /** + * Get a specific entry in a feed - note this works differently than other stores + * as feeds are append-only logs identified by hash + */ + async get>( + collection: string, + hash: string, + options?: StoreOptions & { skipCache?: boolean } + ): Promise { + return measurePerformance(async () => { + try { + // Check cache first if not skipped + const cacheKey = `${collection}:entry:${hash}`; + if (!options?.skipCache) { + const cachedDocument = cache.get(cacheKey); + if (cachedDocument) { + return cachedDocument; + } + } + + const db = await openStore(collection, StoreType.FEED, options); + + // Get the specific entry by hash + const entry = await db.get(hash); + if (!entry) { + return null; + } + + const document = entry.payload.value as T; + + // Update cache + cache.set(cacheKey, document); + + return document; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error getting entry ${hash} from feed ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get entry ${hash} from feed ${collection}`, error); + } + }); + } + + /** + * Update an entry in a feed + * Note: Feeds are append-only, so we can't actually update existing entries + * Instead, we append a new entry with the updated data and link it to the original + */ + async update>( + collection: string, + id: string, + data: Partial>, + options?: StoreOptions & { upsert?: boolean } + ): Promise { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.FEED, options); + + // Find the latest entry with the given id + const entries = await db.iterator({ limit: -1 }).collect(); + const existingEntryIndex = entries.findIndex((e: any) => { + const value = e.payload.value; + return value && value.id === id; + }); + + if (existingEntryIndex === -1 && !options?.upsert) { + throw new DBError( + ErrorCode.DOCUMENT_NOT_FOUND, + `Entry with id ${id} not found in feed ${collection}`, + { collection, id } + ); + } + + const existingEntry = existingEntryIndex !== -1 ? entries[existingEntryIndex].payload.value : null; + + // Prepare document with update + const document = { + id, + ...prepareDocument(collection, data as unknown as Omit, existingEntry), + // Add reference to the previous entry if it exists + previousEntryHash: existingEntryIndex !== -1 ? 
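+          // Sketch of the revision chain built here: each update appends a new
+          // entry whose previousEntryHash points at the entry it supersedes,
+          // so prior versions remain readable in the append-only log.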
entries[existingEntryIndex].hash : undefined + }; + + // Add to feed (append new entry) + const hash = await db.add(document); + + // Cache the new entry + const cacheKey = `${collection}:entry:${hash}`; + cache.set(cacheKey, document); + + // Emit change event + events.emit('document:updated', { collection, id, document, previous: existingEntry }); + + logger.info(`Updated entry in feed ${collection} with id ${id} (new hash: ${hash})`); + return { id, hash }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error updating entry in feed ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update entry in feed ${collection}`, error); + } + }); + } + + /** + * Delete is not supported in feed/eventlog stores since they're append-only + * Instead, we add a "tombstone" entry that marks the entry as deleted + */ + async remove( + collection: string, + id: string, + options?: StoreOptions + ): Promise { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.FEED, options); + + // Find the entry with the given id + const entries = await db.iterator({ limit: -1 }).collect(); + const existingEntryIndex = entries.findIndex((e: any) => { + const value = e.payload.value; + return value && value.id === id; + }); + + if (existingEntryIndex === -1) { + throw new DBError( + ErrorCode.DOCUMENT_NOT_FOUND, + `Entry with id ${id} not found in feed ${collection}`, + { collection, id } + ); + } + + const existingEntry = entries[existingEntryIndex].payload.value; + const existingHash = entries[existingEntryIndex].hash; + + // Add a "tombstone" entry that marks this as deleted + const tombstone = { + id, + deleted: true, + deletedAt: Date.now(), + previousEntryHash: existingHash + }; + + await db.add(tombstone); + + // Emit change event + events.emit('document:deleted', { collection, id, document: existingEntry }); + + logger.info(`Marked entry as deleted in feed ${collection} with id ${id}`); + return true; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error marking entry as deleted in feed ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to mark entry as deleted in feed ${collection}`, error); + } + }); + } + + /** + * List all entries in a feed with pagination + * Note: This will only return the latest entry for each unique ID + */ + async list>( + collection: string, + options?: ListOptions + ): Promise> { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.FEED, options); + + // Get all entries + const entries = await db.iterator({ limit: -1 }).collect(); + + // Group by ID and keep only the latest entry for each ID + // Also filter out tombstone entries + const latestEntries = new Map(); + for (const entry of entries) { + const value = entry.payload.value; + if (!value || value.deleted) continue; + + const id = value.id; + if (!id) continue; + + // If we already have an entry with this ID, check which is newer + if (latestEntries.has(id)) { + const existing = latestEntries.get(id); + if (value.updatedAt > existing.value.updatedAt) { + latestEntries.set(id, { hash: entry.hash, value }); + } + } else { + latestEntries.set(id, { hash: entry.hash, value }); + } + } + + // Convert to array of documents + let documents = Array.from(latestEntries.values()).map(entry => ({ + ...entry.value + })) as T[]; + + // Sort if requested + if (options?.sort) { + const { field, order } 
= options.sort; + documents.sort((a, b) => { + const valueA = a[field]; + const valueB = b[field]; + + // Handle different data types for sorting + if (typeof valueA === 'string' && typeof valueB === 'string') { + return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA); + } else if (typeof valueA === 'number' && typeof valueB === 'number') { + return order === 'asc' ? valueA - valueB : valueB - valueA; + } else if (valueA instanceof Date && valueB instanceof Date) { + return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime(); + } + + // Default comparison for other types + return order === 'asc' ? + String(valueA).localeCompare(String(valueB)) : + String(valueB).localeCompare(String(valueA)); + }); + } + + const total = documents.length; + + // Apply pagination + const offset = options?.offset || 0; + const limit = options?.limit || total; + + const paginatedDocuments = documents.slice(offset, offset + limit); + const hasMore = offset + limit < total; + + return { + documents: paginatedDocuments, + total, + hasMore + }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error listing entries in feed ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list entries in feed ${collection}`, error); + } + }); + } + + /** + * Query entries in a feed with filtering and pagination + * Note: This queries the latest entry for each unique ID + */ + async query>( + collection: string, + filter: (doc: T) => boolean, + options?: QueryOptions + ): Promise> { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.FEED, options); + + // Get all entries + const entries = await db.iterator({ limit: -1 }).collect(); + + // Group by ID and keep only the latest entry for each ID + // Also filter out tombstone entries + const latestEntries = new Map(); + for (const entry of entries) { + const value = entry.payload.value; + if (!value || value.deleted) continue; + + const id = value.id; + if (!id) continue; + + // If we already have an entry with this ID, check which is newer + if (latestEntries.has(id)) { + const existing = latestEntries.get(id); + if (value.updatedAt > existing.value.updatedAt) { + latestEntries.set(id, { hash: entry.hash, value }); + } + } else { + latestEntries.set(id, { hash: entry.hash, value }); + } + } + + // Convert to array of documents and apply filter + let filtered = Array.from(latestEntries.values()) + .filter(entry => filter(entry.value as T)) + .map(entry => ({ + ...entry.value + })) as T[]; + + // Sort if requested + if (options?.sort) { + const { field, order } = options.sort; + filtered.sort((a, b) => { + const valueA = a[field]; + const valueB = b[field]; + + // Handle different data types for sorting + if (typeof valueA === 'string' && typeof valueB === 'string') { + return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA); + } else if (typeof valueA === 'number' && typeof valueB === 'number') { + return order === 'asc' ? valueA - valueB : valueB - valueA; + } else if (valueA instanceof Date && valueB instanceof Date) { + return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime(); + } + + // Default comparison for other types + return order === 'asc' ? 
+ String(valueA).localeCompare(String(valueB)) : + String(valueB).localeCompare(String(valueA)); + }); + } + + const total = filtered.length; + + // Apply pagination + const offset = options?.offset || 0; + const limit = options?.limit || total; + + const paginatedDocuments = filtered.slice(offset, offset + limit); + const hasMore = offset + limit < total; + + return { + documents: paginatedDocuments, + total, + hasMore + }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error querying entries in feed ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query entries in feed ${collection}`, error); + } + }); + } + + /** + * Create an index for a collection - not supported for feeds + */ + async createIndex( + collection: string, + field: string, + options?: StoreOptions + ): Promise { + logger.warn(`Index creation not supported for feed collections, ignoring request for ${collection}`); + return false; + } +} \ No newline at end of file diff --git a/src/db/stores/fileStore.ts b/src/db/stores/fileStore.ts new file mode 100644 index 0000000..f2bac6e --- /dev/null +++ b/src/db/stores/fileStore.ts @@ -0,0 +1,149 @@ +import { createServiceLogger } from '../../utils/logger'; +import { ErrorCode, StoreType, FileUploadResult, FileResult } from '../types'; +import { DBError } from '../core/error'; +import { getConnection } from '../core/connection'; +import { openStore } from './baseStore'; +import { getHelia } from '../../ipfs/ipfsService'; +import { measurePerformance } from '../metrics/metricsService'; + +// Helper function to convert AsyncIterable to Buffer +async function readAsyncIterableToBuffer(asyncIterable: AsyncIterable): Promise { + const chunks: Uint8Array[] = []; + for await (const chunk of asyncIterable) { + chunks.push(chunk); + } + return Buffer.concat(chunks); +} + +const logger = createServiceLogger('FILE_STORE'); + +/** + * Upload a file to IPFS + */ +export const uploadFile = async ( + fileData: Buffer, + options?: { + filename?: string; + connectionId?: string; + metadata?: Record; + } +): Promise => { + return measurePerformance(async () => { + try { + const connection = getConnection(options?.connectionId); + const ipfs = getHelia(); + if (!ipfs) { + throw new DBError(ErrorCode.OPERATION_FAILED, 'IPFS instance not available'); + } + + // Add to IPFS + const blockstore = ipfs.blockstore; + const unixfs = await import('@helia/unixfs'); + const fs = unixfs.unixfs(ipfs); + const cid = await fs.addBytes(fileData); + const cidStr = cid.toString(); + + // Store metadata + const filesDb = await openStore('_files', StoreType.KEYVALUE); + await filesDb.put(cidStr, { + filename: options?.filename, + size: fileData.length, + uploadedAt: Date.now(), + ...options?.metadata + }); + + logger.info(`Uploaded file with CID: ${cidStr}`); + return { cid: cidStr }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error('Error uploading file:', error); + throw new DBError(ErrorCode.OPERATION_FAILED, 'Failed to upload file', error); + } + }); +}; + +/** + * Get a file from IPFS by CID + */ +export const getFile = async ( + cid: string, + options?: { connectionId?: string } +): Promise => { + return measurePerformance(async () => { + try { + const connection = getConnection(options?.connectionId); + const ipfs = getHelia(); + if (!ipfs) { + throw new DBError(ErrorCode.OPERATION_FAILED, 'IPFS instance not available'); + } + + // Get from IPFS + const unixfs = await import('@helia/unixfs'); + 
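+      // fs.cat(cid) below yields the file as an AsyncIterable<Uint8Array>;
+      // readAsyncIterableToBuffer (defined above) collects the chunks into a single Buffer.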
const fs = unixfs.unixfs(ipfs); + const { CID } = await import('multiformats/cid'); + const resolvedCid = CID.parse(cid); + + try { + // Convert AsyncIterable to Buffer + const bytes = await readAsyncIterableToBuffer(fs.cat(resolvedCid)); + + // Get metadata if available + let metadata = null; + try { + const filesDb = await openStore('_files', StoreType.KEYVALUE); + metadata = await filesDb.get(cid); + } catch (err) { + // Metadata might not exist, continue without it + } + + return { data: bytes, metadata }; + } catch (error) { + throw new DBError(ErrorCode.FILE_NOT_FOUND, `File with CID ${cid} not found`, error); + } + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error getting file with CID ${cid}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get file with CID ${cid}`, error); + } + }); +}; + +/** + * Delete a file from IPFS by CID + */ +export const deleteFile = async ( + cid: string, + options?: { connectionId?: string } +): Promise => { + return measurePerformance(async () => { + try { + const connection = getConnection(options?.connectionId); + + // Delete metadata + try { + const filesDb = await openStore('_files', StoreType.KEYVALUE); + await filesDb.del(cid); + } catch (err) { + // Ignore if metadata doesn't exist + } + + // In IPFS we can't really delete files, but we can remove them from our local blockstore + // and they will eventually be garbage collected if no one else has pinned them + logger.info(`Deleted file with CID: ${cid}`); + return true; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error deleting file with CID ${cid}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to delete file with CID ${cid}`, error); + } + }); +}; \ No newline at end of file diff --git a/src/db/stores/keyValueStore.ts b/src/db/stores/keyValueStore.ts new file mode 100644 index 0000000..ef943ff --- /dev/null +++ b/src/db/stores/keyValueStore.ts @@ -0,0 +1,335 @@ +import { createServiceLogger } from '../../utils/logger'; +import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions } from '../types'; +import { DBError } from '../core/error'; +import { BaseStore, openStore, prepareDocument } from './baseStore'; +import * as cache from '../cache/cacheService'; +import * as events from '../events/eventService'; +import { measurePerformance } from '../metrics/metricsService'; + +const logger = createServiceLogger('KEYVALUE_STORE'); + +/** + * KeyValue Store implementation + */ +export class KeyValueStore implements BaseStore { + /** + * Create a new document in the specified collection + */ + async create>( + collection: string, + id: string, + data: Omit, + options?: StoreOptions + ): Promise { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.KEYVALUE, options); + + // Prepare document for storage + const document = prepareDocument(collection, data); + + // Add to database + const hash = await db.put(id, document); + + // Add to cache + const cacheKey = `${collection}:${id}`; + cache.set(cacheKey, document); + + // Emit change event + events.emit('document:created', { collection, id, document }); + + logger.info(`Created document in ${collection} with id ${id}`); + return { id, hash }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error creating document in ${collection}:`, error); + throw new 
DBError(ErrorCode.OPERATION_FAILED, `Failed to create document in ${collection}`, error); + } + }); + } + + /** + * Get a document by ID from a collection + */ + async get>( + collection: string, + id: string, + options?: StoreOptions & { skipCache?: boolean } + ): Promise { + return measurePerformance(async () => { + try { + // Check cache first if not skipped + const cacheKey = `${collection}:${id}`; + if (!options?.skipCache) { + const cachedDocument = cache.get(cacheKey); + if (cachedDocument) { + return cachedDocument; + } + } + + const db = await openStore(collection, StoreType.KEYVALUE, options); + const document = await db.get(id) as T | null; + + // Update cache if document exists + if (document) { + cache.set(cacheKey, document); + } + + return document; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error getting document ${id} from ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get document ${id} from ${collection}`, error); + } + }); + } + + /** + * Update a document in a collection + */ + async update>( + collection: string, + id: string, + data: Partial>, + options?: StoreOptions & { upsert?: boolean } + ): Promise { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.KEYVALUE, options); + const existing = await db.get(id) as T | null; + + if (!existing && !options?.upsert) { + throw new DBError( + ErrorCode.DOCUMENT_NOT_FOUND, + `Document ${id} not found in ${collection}`, + { collection, id } + ); + } + + // Prepare document for update + const document = prepareDocument(collection, data as unknown as Omit, existing || undefined); + + // Update in database + const hash = await db.put(id, document); + + // Update cache + const cacheKey = `${collection}:${id}`; + cache.set(cacheKey, document); + + // Emit change event + events.emit('document:updated', { collection, id, document, previous: existing }); + + logger.info(`Updated document in ${collection} with id ${id}`); + return { id, hash }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error updating document in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update document in ${collection}`, error); + } + }); + } + + /** + * Delete a document from a collection + */ + async remove( + collection: string, + id: string, + options?: StoreOptions + ): Promise { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.KEYVALUE, options); + + // Get the document before deleting for the event + const document = await db.get(id); + + // Delete from database + await db.del(id); + + // Remove from cache + const cacheKey = `${collection}:${id}`; + cache.del(cacheKey); + + // Emit change event + events.emit('document:deleted', { collection, id, document }); + + logger.info(`Deleted document in ${collection} with id ${id}`); + return true; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error deleting document in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to delete document in ${collection}`, error); + } + }); + } + + /** + * List all documents in a collection with pagination + */ + async list>( + collection: string, + options?: ListOptions + ): Promise> { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.KEYVALUE, options); + const all = await db.all(); + + 
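+        // Assumes db.all() resolves to a plain key -> document map for the
+        // keyvalue store; Object.entries below lifts each key into an `id` field.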
let documents = Object.entries(all).map(([key, value]) => ({ + id: key, + ...(value as any) + })) as T[]; + + // Sort if requested + if (options?.sort) { + const { field, order } = options.sort; + documents.sort((a, b) => { + const valueA = a[field]; + const valueB = b[field]; + + // Handle different data types for sorting + if (typeof valueA === 'string' && typeof valueB === 'string') { + return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA); + } else if (typeof valueA === 'number' && typeof valueB === 'number') { + return order === 'asc' ? valueA - valueB : valueB - valueA; + } else if (valueA instanceof Date && valueB instanceof Date) { + return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime(); + } + + // Default comparison for other types + return order === 'asc' ? + String(valueA).localeCompare(String(valueB)) : + String(valueB).localeCompare(String(valueA)); + }); + } + + const total = documents.length; + + // Apply pagination + const offset = options?.offset || 0; + const limit = options?.limit || total; + + const paginatedDocuments = documents.slice(offset, offset + limit); + const hasMore = offset + limit < total; + + return { + documents: paginatedDocuments, + total, + hasMore + }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error listing documents in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list documents in ${collection}`, error); + } + }); + } + + /** + * Query documents in a collection with filtering and pagination + */ + async query>( + collection: string, + filter: (doc: T) => boolean, + options?: QueryOptions + ): Promise> { + return measurePerformance(async () => { + try { + const db = await openStore(collection, StoreType.KEYVALUE, options); + const all = await db.all(); + + // Apply filter + let filtered = Object.entries(all) + .filter(([_, value]) => filter(value as T)) + .map(([key, value]) => ({ + id: key, + ...(value as any) + })) as T[]; + + // Sort if requested + if (options?.sort) { + const { field, order } = options.sort; + filtered.sort((a, b) => { + const valueA = a[field]; + const valueB = b[field]; + + // Handle different data types for sorting + if (typeof valueA === 'string' && typeof valueB === 'string') { + return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA); + } else if (typeof valueA === 'number' && typeof valueB === 'number') { + return order === 'asc' ? valueA - valueB : valueB - valueA; + } else if (valueA instanceof Date && valueB instanceof Date) { + return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime(); + } + + // Default comparison for other types + return order === 'asc' ? 
+ String(valueA).localeCompare(String(valueB)) : + String(valueB).localeCompare(String(valueA)); + }); + } + + const total = filtered.length; + + // Apply pagination + const offset = options?.offset || 0; + const limit = options?.limit || total; + + const paginatedDocuments = filtered.slice(offset, offset + limit); + const hasMore = offset + limit < total; + + return { + documents: paginatedDocuments, + total, + hasMore + }; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error querying documents in ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query documents in ${collection}`, error); + } + }); + } + + /** + * Create an index for a collection to speed up queries + */ + async createIndex( + collection: string, + field: string, + options?: StoreOptions + ): Promise { + try { + // In a real implementation, this would register the index with OrbitDB + // or create a specialized data structure. For now, we'll just log the request. + logger.info(`Index created on ${field} for collection ${collection}`); + return true; + } catch (error) { + if (error instanceof DBError) { + throw error; + } + + logger.error(`Error creating index for ${collection}:`, error); + throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create index for ${collection}`, error); + } + } +} \ No newline at end of file diff --git a/src/db/stores/storeFactory.ts b/src/db/stores/storeFactory.ts new file mode 100644 index 0000000..945894e --- /dev/null +++ b/src/db/stores/storeFactory.ts @@ -0,0 +1,54 @@ +import { createServiceLogger } from '../../utils/logger'; +import { StoreType, ErrorCode } from '../types'; +import { DBError } from '../core/error'; +import { BaseStore } from './baseStore'; +import { KeyValueStore } from './keyValueStore'; +import { DocStore } from './docStore'; +import { FeedStore } from './feedStore'; +import { CounterStore } from './counterStore'; + +const logger = createServiceLogger('STORE_FACTORY'); + +// Initialize instances for each store type +const storeInstances = new Map(); + +/** + * Get a store instance by type + */ +export function getStore(type: StoreType): BaseStore { + // Check if we already have an instance + if (storeInstances.has(type)) { + return storeInstances.get(type)!; + } + + // Create a new instance based on type + let store: BaseStore; + + switch (type) { + case StoreType.KEYVALUE: + store = new KeyValueStore(); + break; + + case StoreType.DOCSTORE: + store = new DocStore(); + break; + + case StoreType.FEED: + case StoreType.EVENTLOG: // Alias for feed + store = new FeedStore(); + break; + + case StoreType.COUNTER: + store = new CounterStore(); + break; + + default: + logger.error(`Unsupported store type: ${type}`); + throw new DBError(ErrorCode.STORE_TYPE_ERROR, `Unsupported store type: ${type}`); + } + + // Cache the instance + storeInstances.set(type, store); + + return store; +} \ No newline at end of file diff --git a/src/db/transactions/transactionService.ts b/src/db/transactions/transactionService.ts new file mode 100644 index 0000000..445c12a --- /dev/null +++ b/src/db/transactions/transactionService.ts @@ -0,0 +1,77 @@ +import { createServiceLogger } from '../../utils/logger'; +import { ErrorCode } from '../types'; +import { DBError } from '../core/error'; + +const logger = createServiceLogger('DB_TRANSACTION'); + +// Transaction operation type +interface TransactionOperation { + type: 'create' | 'update' | 'delete'; + collection: string; + id: string; + data?: any; +} + +/** + * 
Transaction object for batching operations
+ */
+export class Transaction {
+  private operations: TransactionOperation[] = [];
+  private connectionId?: string;
+
+  constructor(connectionId?: string) {
+    this.connectionId = connectionId;
+  }
+
+  /**
+   * Add a create operation to the transaction
+   */
+  create<T = any>(collection: string, id: string, data: T): Transaction {
+    this.operations.push({
+      type: 'create',
+      collection,
+      id,
+      data
+    });
+    return this;
+  }
+
+  /**
+   * Add an update operation to the transaction
+   */
+  update<T = any>(collection: string, id: string, data: Partial<T>): Transaction {
+    this.operations.push({
+      type: 'update',
+      collection,
+      id,
+      data
+    });
+    return this;
+  }
+
+  /**
+   * Add a delete operation to the transaction
+   */
+  delete(collection: string, id: string): Transaction {
+    this.operations.push({
+      type: 'delete',
+      collection,
+      id
+    });
+    return this;
+  }
+
+  /**
+   * Get all operations in this transaction
+   */
+  getOperations(): TransactionOperation[] {
+    return [...this.operations];
+  }
+
+  /**
+   * Get connection ID for this transaction
+   */
+  getConnectionId(): string | undefined {
+    return this.connectionId;
+  }
+}
\ No newline at end of file
diff --git a/src/db/types/index.ts b/src/db/types/index.ts
new file mode 100644
index 0000000..cbdec25
--- /dev/null
+++ b/src/db/types/index.ts
@@ -0,0 +1,132 @@
+// Common types for database operations
+import { EventEmitter } from 'events';
+import { Transaction } from '../transactions/transactionService';
+
+export type { Transaction };
+
+// Database Types
+export enum StoreType {
+  KEYVALUE = 'keyvalue',
+  DOCSTORE = 'docstore',
+  FEED = 'feed',
+  EVENTLOG = 'eventlog',
+  COUNTER = 'counter'
+}
+
+// Common result types
+export interface CreateResult {
+  id: string;
+  hash: string;
+}
+
+export interface UpdateResult {
+  id: string;
+  hash: string;
+}
+
+export interface FileUploadResult {
+  cid: string;
+}
+
+export interface FileMetadata {
+  filename?: string;
+  size: number;
+  uploadedAt: number;
+  [key: string]: any;
+}
+
+export interface FileResult {
+  data: Buffer;
+  metadata: FileMetadata | null;
+}
+
+export interface PaginatedResult<T> {
+  documents: T[];
+  total: number;
+  hasMore: boolean;
+}
+
+// Define error codes
+export enum ErrorCode {
+  NOT_INITIALIZED = 'ERR_NOT_INITIALIZED',
+  INITIALIZATION_FAILED = 'ERR_INIT_FAILED',
+  DOCUMENT_NOT_FOUND = 'ERR_DOC_NOT_FOUND',
+  INVALID_SCHEMA = 'ERR_INVALID_SCHEMA',
+  OPERATION_FAILED = 'ERR_OPERATION_FAILED',
+  TRANSACTION_FAILED = 'ERR_TRANSACTION_FAILED',
+  FILE_NOT_FOUND = 'ERR_FILE_NOT_FOUND',
+  INVALID_PARAMETERS = 'ERR_INVALID_PARAMS',
+  CONNECTION_ERROR = 'ERR_CONNECTION',
+  STORE_TYPE_ERROR = 'ERR_STORE_TYPE',
+}
+
+// Connection pool interface
+export interface DBConnection {
+  ipfs: any;
+  orbitdb: any;
+  timestamp: number;
+  isActive: boolean;
+}
+
+// Schema validation
+export interface SchemaDefinition {
+  type: string;
+  required?: boolean;
+  pattern?: string;
+  min?: number;
+  max?: number;
+  enum?: any[];
+  items?: SchemaDefinition; // For arrays
+  properties?: Record<string, SchemaDefinition>; // For objects
+}
+
+export interface CollectionSchema {
+  properties: Record<string, SchemaDefinition>;
+  required?: string[];
+}
+
+// Metrics tracking
+export interface Metrics {
+  operations: {
+    creates: number;
+    reads: number;
+    updates: number;
+    deletes: number;
+    queries: number;
+    fileUploads: number;
+    fileDownloads: number;
+  };
+  performance: {
+    totalOperationTime: number;
+    operationCount: number;
+    averageOperationTime?: number;
+  };
+  errors: {
+    count: number;
+    byCode: Record<string, number>;
+  };
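+  // averageOperationTime (above) is presumably totalOperationTime / operationCount,
+  // computed when metrics are read; the counters below reduce to a cache hit
+  // rate of hits / (hits + misses).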
cacheStats: { + hits: number; + misses: number; + }; + startTime: number; +} + +// Store options +export interface ListOptions { + limit?: number; + offset?: number; + connectionId?: string; + sort?: { field: string; order: 'asc' | 'desc' }; +} + +export interface QueryOptions extends ListOptions { + indexBy?: string; +} + +export interface StoreOptions { + connectionId?: string; +} + +// Event bus for database events +export const dbEvents = new EventEmitter(); \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 93fa64e..d4caac0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,97 +2,136 @@ import { config, defaultConfig, type DebrosConfig } from './config'; import { validateConfig, type ValidationResult } from './ipfs/config/configValidator'; -// IPFS exports -import ipfsService, { - init as initIpfs, - stop as stopIpfs, - getHelia, - getProxyAgent, - getInstance, - getLibp2p, - getConnectedPeers, - getOptimalPeer, - updateNodeLoad, - logPeersStatus, - type IPFSModule, -} from './ipfs/ipfsService'; +// Database service exports (new abstracted layer) +import dbService from './db/dbService'; +import { + init as initDB, + create, + get, + update, + remove, + list, + query, + createIndex, + createTransaction, + commitTransaction, + subscribe, + uploadFile, + getFile, + deleteFile, + defineSchema, + getMetrics, + resetMetrics, + closeConnection, + stop as stopDB +} from './db/dbService'; +import { ErrorCode, StoreType } from './db/types'; -import { ipfsConfig, getIpfsPort, getBlockstorePath } from './ipfs/config/ipfsConfig'; +// Import types +import type { + Transaction, + CreateResult, + UpdateResult, + PaginatedResult, + ListOptions, + QueryOptions, + FileUploadResult, + FileResult, + CollectionSchema, + SchemaDefinition, + Metrics +} from './db/types'; -// OrbitDB exports -import orbitDBService, { - init as initOrbitDB, - openDB, - getOrbitDB, - db as orbitDB, - getOrbitDBDir, - getDBAddress, - saveDBAddress, -} from './orbit/orbitDBService'; +import { DBError } from './db/core/error'; -import loadBalancerControllerDefault from './ipfs/loadBalancerController'; -export const loadBalancerController = loadBalancerControllerDefault; +// Legacy exports (internal use only, not exposed in default export) +import { init as initIpfs, stop as stopIpfs, getHelia } from './ipfs/ipfsService'; +import { init as initOrbitDB, openDB } from './orbit/orbitDBService'; // Logger exports import logger, { createServiceLogger, createDebrosLogger, type LoggerOptions } from './utils/logger'; -// Crypto exports -import { getPrivateKey } from './ipfs/utils/crypto'; - -// Export everything +// Export public API export { - // Config + // Configuration config, defaultConfig, validateConfig, type DebrosConfig, type ValidationResult, - - // IPFS - ipfsService, - initIpfs, - stopIpfs, - getHelia, - getProxyAgent, - getInstance, - getLibp2p, - getConnectedPeers, - getOptimalPeer, - updateNodeLoad, - logPeersStatus, - type IPFSModule, - - // IPFS Config - ipfsConfig, - getIpfsPort, - getBlockstorePath, - - // OrbitDB - orbitDBService, - initOrbitDB, - openDB, - getOrbitDB, - orbitDB, - getOrbitDBDir, - getDBAddress, - saveDBAddress, - + + // Database Service (Main public API) + initDB, + create, + get, + update, + remove, + list, + query, + createIndex, + createTransaction, + commitTransaction, + subscribe, + uploadFile, + getFile, + deleteFile, + defineSchema, + getMetrics, + resetMetrics, + closeConnection, + stopDB, + ErrorCode, + StoreType, + + // Types + type Transaction, + type DBError, + type 
CollectionSchema, + type SchemaDefinition, + type CreateResult, + type UpdateResult, + type PaginatedResult, + type ListOptions, + type QueryOptions, + type FileUploadResult, + type FileResult, + type Metrics, + // Logger logger, createServiceLogger, createDebrosLogger, type LoggerOptions, - - // Crypto - getPrivateKey, }; // Default export for convenience export default { config, validateConfig, - ipfsService, - orbitDBService, + // Database Service as main interface + db: { + init: initDB, + create, + get, + update, + remove, + list, + query, + createIndex, + createTransaction, + commitTransaction, + subscribe, + uploadFile, + getFile, + deleteFile, + defineSchema, + getMetrics, + resetMetrics, + closeConnection, + stop: stopDB, + ErrorCode, + StoreType + }, logger, createServiceLogger, -}; +}; \ No newline at end of file diff --git a/types.d.ts b/types.d.ts index cba1c00..2e5f05b 100644 --- a/types.d.ts +++ b/types.d.ts @@ -7,6 +7,10 @@ declare module "@debros/network" { // Config types export interface DebrosConfig { + env: { + fingerprint: string; + port: number; + }; ipfs: { swarm: { port: number; @@ -15,9 +19,15 @@ declare module "@debros/network" { connectAddresses: string[]; }; blockstorePath: string; - orbitdbPath: string; bootstrap: string[]; privateKey?: string; + serviceDiscovery?: { + topic: string; + heartbeatInterval: number; + }; + }; + orbitdb: { + directory: string; }; logger: { level: string; @@ -33,66 +43,179 @@ declare module "@debros/network" { // Core configuration export const config: DebrosConfig; export const defaultConfig: DebrosConfig; - export function validateConfig( - config: Partial - ): ValidationResult; + export function validateConfig(config: Partial): ValidationResult; - // IPFS types - export interface IPFSModule { - helia: any; - libp2p: any; + // Store types + export enum StoreType { + KEYVALUE = 'keyvalue', + DOCSTORE = 'docstore', + FEED = 'feed', + EVENTLOG = 'eventlog', + COUNTER = 'counter' } - // IPFS Service - export const ipfsService: { - init(): Promise; - stop(): Promise; - }; - export function initIpfs(): Promise; - export function stopIpfs(): Promise; - export function getHelia(): any; - export function getProxyAgent(): any; - export function getInstance(): IPFSModule; - export function getLibp2p(): any; - export function getConnectedPeers(): any[]; - export function getOptimalPeer(): any; - export function updateNodeLoad(load: number): void; - export function logPeersStatus(): void; - - // IPFS Config - export const ipfsConfig: any; - export function getIpfsPort(): number; - export function getBlockstorePath(): string; - - // LoadBalancerController interface and value declaration - export interface LoadBalancerController { - getNodeInfo: (_req: Request, _res: Response, _next: NextFunction) => void; - getOptimalPeer: ( - _req: Request, - _res: Response, - _next: NextFunction - ) => void; - getAllPeers: (_req: Request, _res: Response, _next: NextFunction) => void; + // Error handling + export enum ErrorCode { + NOT_INITIALIZED = 'ERR_NOT_INITIALIZED', + INITIALIZATION_FAILED = 'ERR_INIT_FAILED', + DOCUMENT_NOT_FOUND = 'ERR_DOC_NOT_FOUND', + INVALID_SCHEMA = 'ERR_INVALID_SCHEMA', + OPERATION_FAILED = 'ERR_OPERATION_FAILED', + TRANSACTION_FAILED = 'ERR_TRANSACTION_FAILED', + FILE_NOT_FOUND = 'ERR_FILE_NOT_FOUND', + INVALID_PARAMETERS = 'ERR_INVALID_PARAMS', + CONNECTION_ERROR = 'ERR_CONNECTION', + STORE_TYPE_ERROR = 'ERR_STORE_TYPE' } - // Declare loadBalancerController as a value - export const loadBalancerController: 
LoadBalancerController; + export class DBError extends Error { + code: ErrorCode; + details?: any; + constructor(code: ErrorCode, message: string, details?: any); + } - // OrbitDB - export const orbitDBService: { - init(): Promise; - }; - export function initOrbitDB(): Promise; - export function openDB( - dbName: string, - dbType: string, - options?: any - ): Promise; - export function getOrbitDB(): any; - export const orbitDB: any; - export function getOrbitDBDir(): string; - export function getDBAddress(dbName: string): string | null; - export function saveDBAddress(dbName: string, address: string): void; + // Schema validation + export interface SchemaDefinition { + type: string; + required?: boolean; + pattern?: string; + min?: number; + max?: number; + enum?: any[]; + items?: SchemaDefinition; // For arrays + properties?: Record; // For objects + } + + export interface CollectionSchema { + properties: Record; + required?: string[]; + } + + // Database types + export interface DocumentMetadata { + createdAt: number; + updatedAt: number; + } + + export interface Document extends DocumentMetadata { + [key: string]: any; + } + + export interface CreateResult { + id: string; + hash: string; + } + + export interface UpdateResult { + id: string; + hash: string; + } + + export interface FileUploadResult { + cid: string; + } + + export interface FileMetadata { + filename?: string; + size: number; + uploadedAt: number; + [key: string]: any; + } + + export interface FileResult { + data: Buffer; + metadata: FileMetadata | null; + } + + export interface ListOptions { + limit?: number; + offset?: number; + sort?: { field: string; order: 'asc' | 'desc' }; + connectionId?: string; + storeType?: StoreType; + } + + export interface QueryOptions extends ListOptions { + indexBy?: string; + } + + export interface PaginatedResult { + documents: T[]; + total: number; + hasMore: boolean; + } + + // Transaction API + export class Transaction { + create(collection: string, id: string, data: T): Transaction; + update(collection: string, id: string, data: Partial): Transaction; + delete(collection: string, id: string): Transaction; + commit(): Promise<{ success: boolean; results: any[] }>; + } + + // Metrics tracking + export interface Metrics { + operations: { + creates: number; + reads: number; + updates: number; + deletes: number; + queries: number; + fileUploads: number; + fileDownloads: number; + }; + performance: { + totalOperationTime: number; + operationCount: number; + averageOperationTime: number; + }; + errors: { + count: number; + byCode: Record; + }; + cacheStats: { + hits: number; + misses: number; + }; + startTime: number; + } + + // Database Operations + export function initDB(connectionId?: string): Promise; + export function create>(collection: string, id: string, data: Omit, options?: { connectionId?: string, storeType?: StoreType }): Promise; + export function get>(collection: string, id: string, options?: { connectionId?: string; skipCache?: boolean, storeType?: StoreType }): Promise; + export function update>(collection: string, id: string, data: Partial>, options?: { connectionId?: string; upsert?: boolean, storeType?: StoreType }): Promise; + export function remove(collection: string, id: string, options?: { connectionId?: string, storeType?: StoreType }): Promise; + export function list>(collection: string, options?: ListOptions): Promise>; + export function query>(collection: string, filter: (doc: T) => boolean, options?: QueryOptions): Promise>; + + // Schema operations + export 
function defineSchema(collection: string, schema: CollectionSchema): void; + + // Transaction operations + export function createTransaction(connectionId?: string): Transaction; + export function commitTransaction(transaction: Transaction): Promise<{ success: boolean; results: any[] }>; + + // Index operations + export function createIndex(collection: string, field: string, options?: { connectionId?: string, storeType?: StoreType }): Promise; + + // Subscription API + export function subscribe(event: 'document:created' | 'document:updated' | 'document:deleted', callback: (data: any) => void): () => void; + + // File operations + export function uploadFile(fileData: Buffer, options?: { filename?: string; connectionId?: string; metadata?: Record; }): Promise; + export function getFile(cid: string, options?: { connectionId?: string }): Promise; + export function deleteFile(cid: string, options?: { connectionId?: string }): Promise; + + // Connection management + export function closeConnection(connectionId: string): Promise; + + // Metrics + export function getMetrics(): Metrics; + export function resetMetrics(): void; + + // Stop + export function stopDB(): Promise; // Logger export interface LoggerOptions { @@ -101,23 +224,38 @@ declare module "@debros/network" { service?: string; } export const logger: any; - export function createServiceLogger( - name: string, - options?: LoggerOptions - ): any; + export function createServiceLogger(name: string, options?: LoggerOptions): any; export function createDebrosLogger(options?: LoggerOptions): any; - // Crypto - export function getPrivateKey(): Promise; - // Default export const defaultExport: { config: DebrosConfig; validateConfig: typeof validateConfig; - ipfsService: typeof ipfsService; - orbitDBService: typeof orbitDBService; + db: { + init: typeof initDB; + create: typeof create; + get: typeof get; + update: typeof update; + remove: typeof remove; + list: typeof list; + query: typeof query; + createIndex: typeof createIndex; + createTransaction: typeof createTransaction; + commitTransaction: typeof commitTransaction; + subscribe: typeof subscribe; + uploadFile: typeof uploadFile; + getFile: typeof getFile; + deleteFile: typeof deleteFile; + defineSchema: typeof defineSchema; + getMetrics: typeof getMetrics; + resetMetrics: typeof resetMetrics; + closeConnection: typeof closeConnection; + stop: typeof stopDB; + ErrorCode: typeof ErrorCode; + StoreType: typeof StoreType; + }; logger: any; createServiceLogger: typeof createServiceLogger; }; export default defaultExport; -} +} \ No newline at end of file
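
For reference, the declarations above compose as follows. This is a minimal sketch assuming the default configuration; the collection name, field names, and document IDs are illustrative:

```typescript
import {
  initDB,
  defineSchema,
  createTransaction,
  commitTransaction,
  subscribe,
  stopDB,
  logger,
} from "@debros/network";

async function run() {
  await initDB();

  // Register a schema so create/update calls are validated
  defineSchema("profiles", {
    properties: {
      username: { type: "string", required: true },
      age: { type: "number", min: 0, max: 150 },
      role: { type: "enum", enum: ["admin", "member"] },
    },
    required: ["username"],
  });

  // React to changes from any writer; subscribe returns an unsubscribe function
  const unsubscribe = subscribe("document:created", (data) => {
    logger.info(`New document in ${data.collection}: ${data.id}`);
  });

  // Batch several writes into one transaction
  const tx = createTransaction()
    .create("profiles", "alice", { username: "alice", age: 30, role: "member" })
    .update("profiles", "bob", { role: "admin" })
    .delete("profiles", "carol");

  const { success, results } = await commitTransaction(tx);
  logger.info(`Transaction ${success ? "committed" : "failed"} (${results.length} ops)`);

  unsubscribe();
  await stopDB();
}

run();
```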