Compare commits


No commits in common. "e3dfa052f8480e733299ecb85c56e75b0aa87aae" and "be3763d98862e8919c4a2e3fab5d1ce1cf65e798" have entirely different histories.

34 changed files with 3907 additions and 6428 deletions

README.md (111 lines changed)

@@ -26,21 +26,21 @@ npm install @debros/network
## Basic Usage
```typescript
-import { initDB, create, get, query, uploadFile, logger } from '@debros/network';
+import { initDB, create, get, query, uploadFile, logger } from "@debros/network";
// Initialize the database service
async function startApp() {
  try {
    // Initialize with default configuration
    await initDB();
-    logger.info('Database initialized successfully');
+    logger.info("Database initialized successfully");
    // Create a new user document
    const userId = 'user123';
    const user = {
      username: 'johndoe',
      walletAddress: '0x1234567890',
-      avatar: null,
+      avatar: null
    };
    const result = await create('users', userId, user);
@@ -51,10 +51,10 @@ async function startApp() {
    logger.info('User:', retrievedUser);
    // Query users with filtering
-    const activeUsers = await query('users', (user) => user.isActive === true, {
-      limit: 10,
-      sort: { field: 'createdAt', order: 'desc' },
-    });
+    const activeUsers = await query('users',
+      user => user.isActive === true,
+      { limit: 10, sort: { field: 'createdAt', order: 'desc' } }
+    );
    logger.info(`Found ${activeUsers.total} active users`);
    // Upload a file
@@ -64,7 +64,7 @@ async function startApp() {
    return true;
  } catch (error) {
-    logger.error('Failed to start app:', error);
+    logger.error("Failed to start app:", error);
    throw error;
  }
}
@@ -77,27 +77,30 @@ startApp();
The library supports multiple OrbitDB store types, each optimized for different use cases:
```typescript
-import { create, get, update, StoreType } from '@debros/network';
+import { create, get, update, StoreType } from "@debros/network";
// Default KeyValue store (for general use)
await create('users', 'user1', { name: 'Alice' });
// Document store (better for complex documents with indexing)
-await create(
-  'posts',
-  'post1',
-  { title: 'Hello', content: '...' },
-  { storeType: StoreType.DOCSTORE },
-);
+await create('posts', 'post1', { title: 'Hello', content: '...' },
+  { storeType: StoreType.DOCSTORE }
+);
// Feed/EventLog store (append-only, good for immutable logs)
-await create('events', 'evt1', { type: 'login', user: 'alice' }, { storeType: StoreType.FEED });
+await create('events', 'evt1', { type: 'login', user: 'alice' },
+  { storeType: StoreType.FEED }
+);
// Counter store (for numeric counters)
-await create('stats', 'visits', { value: 0 }, { storeType: StoreType.COUNTER });
+await create('stats', 'visits', { value: 0 },
+  { storeType: StoreType.COUNTER }
+);
// Increment a counter
-await update('stats', 'visits', { increment: 1 }, { storeType: StoreType.COUNTER });
+await update('stats', 'visits', { increment: 1 },
+  { storeType: StoreType.COUNTER }
+);
// Get counter value
const stats = await get('stats', 'visits', { storeType: StoreType.COUNTER });
@@ -109,7 +112,7 @@ console.log(`Visit count: ${stats.value}`);
### Schema Validation
```typescript
-import { defineSchema, create } from '@debros/network';
+import { defineSchema, create } from "@debros/network";
// Define a schema
defineSchema('users', {
@@ -118,32 +121,32 @@ defineSchema('users', {
      type: 'string',
      required: true,
      min: 3,
-      max: 20,
+      max: 20
    },
    email: {
      type: 'string',
-      pattern: '^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$',
+      pattern: '^[\w-\.]+@([\w-]+\.)+[\w-]{2,4}$'
    },
    age: {
      type: 'number',
-      min: 18,
-    },
+      min: 18
+    }
  },
-  required: ['username'],
+  required: ['username']
});
// Document creation will be validated against the schema
await create('users', 'user1', {
  username: 'alice',
  email: 'alice@example.com',
-  age: 25,
+  age: 25
});
```
### Transactions
```typescript
-import { createTransaction, commitTransaction } from '@debros/network';
+import { createTransaction, commitTransaction } from "@debros/network";
// Create a transaction
const transaction = createTransaction();
@@ -159,39 +162,37 @@ const result = await commitTransaction(transaction);
console.log(`Transaction completed with ${result.results.length} operations`);
```
-### Event Subscriptions
+### Subscriptions
```typescript
-import { subscribe } from '@debros/network';
+import { subscribe } from "@debros/network";
// Subscribe to document changes
const unsubscribe = subscribe('document:created', (data) => {
  console.log(`New document created in ${data.collection}:`, data.id);
-  console.log('Document data:', data.document);
});
-// Other event types
-// subscribe('document:updated', (data) => { ... });
-// subscribe('document:deleted', (data) => { ... });
-// Later, unsubscribe when done
+// Later, unsubscribe
unsubscribe();
```
### Pagination and Sorting
```typescript
-import { list, query } from '@debros/network';
+import { list, query } from "@debros/network";
// List with pagination and sorting
const page1 = await list('users', {
  limit: 10,
  offset: 0,
-  sort: { field: 'createdAt', order: 'desc' },
+  sort: { field: 'createdAt', order: 'desc' }
});
// Query with pagination
-const results = await query('users', (user) => user.age > 21, { limit: 10, offset: 20 });
+const results = await query('users',
+  (user) => user.age > 21,
+  { limit: 10, offset: 20 }
+);
console.log(`Found ${results.total} matches, showing ${results.documents.length}`);
console.log(`Has more pages: ${results.hasMore}`);
@@ -200,7 +201,7 @@ console.log(`Has more pages: ${results.hasMore}`);
### TypeScript Support
```typescript
-import { get, update, query } from '@debros/network';
+import { get, update, query } from "@debros/network";
interface User {
  username: string;
@@ -215,13 +216,15 @@ const user = await get<User>('users', 'user1');
await update<User>('users', 'user1', { age: 26 });
-const results = await query<User>('users', (user) => user.age > 21);
+const results = await query<User>('users',
+  (user) => user.age > 21
+);
```
### Connection Management
```typescript
-import { initDB, closeConnection } from '@debros/network';
+import { initDB, closeConnection } from "@debros/network";
// Create multiple connections
const conn1 = await initDB('connection1');
@@ -234,6 +237,21 @@ await create('users', 'user1', { name: 'Alice' }, { connectionId: conn1 });
await closeConnection(conn1);
```
+### Performance Metrics
+```typescript
+import { getMetrics, resetMetrics } from "@debros/network";
+// Get performance metrics
+const metrics = getMetrics();
+console.log('Operations:', metrics.operations);
+console.log('Avg operation time:', metrics.performance.averageOperationTime, 'ms');
+console.log('Cache hits/misses:', metrics.cacheStats);
+// Reset metrics (e.g., after deployment)
+resetMetrics();
+```
## API Reference
### Core Database Operations
@@ -267,10 +285,9 @@ await closeConnection(conn1);
- `Transaction.update<T>(collection, id, data): Transaction` - Add an update operation
- `Transaction.delete(collection, id): Transaction` - Add a delete operation
-### Event Subscriptions
+### Subscriptions
-- `subscribe(event, callback): () => void` - Subscribe to database events, returns unsubscribe function
+- `subscribe(event, callback): () => void` - Subscribe to events, returns unsubscribe function
-- Event types: 'document:created', 'document:updated', 'document:deleted'
### File Operations
@@ -285,17 +302,19 @@ await closeConnection(conn1);
### Indexes and Performance
- `createIndex(collection, field, options?): Promise<boolean>` - Create an index
+- `getMetrics(): Metrics` - Get performance metrics
+- `resetMetrics(): void` - Reset performance metrics
## Configuration
```typescript
-import { config, initDB } from '@debros/network';
+import { config, initDB } from "@debros/network";
// Configure (optional)
-config.env.fingerprint = 'my-unique-app-id';
+config.env.fingerprint = "my-unique-app-id";
config.env.port = 9000;
-config.ipfs.blockstorePath = './custom-path/blockstore';
+config.ipfs.blockstorePath = "./custom-path/blockstore";
-config.orbitdb.directory = './custom-path/orbitdb';
+config.orbitdb.directory = "./custom-path/orbitdb";
// Initialize with configuration
await initDB();

@@ -0,0 +1,73 @@
import { initDB, create, get, update, remove, list, query, uploadFile, getFile, deleteFile, stopDB, logger } from '../src';
// Alternative import method
// import debros from '../src';
// const { db } = debros;
async function databaseExample() {
try {
logger.info('Starting database example...');
// Initialize the database service (abstracts away IPFS and OrbitDB)
await initDB();
logger.info('Database service initialized');
// Create a new user document
const userId = 'user123';
const userData = {
username: 'johndoe',
walletAddress: '0x1234567890',
avatar: null
};
const createResult = await create('users', userId, userData);
logger.info(`Created user with ID: ${createResult.id} and hash: ${createResult.hash}`);
// Retrieve the user
const user = await get('users', userId);
logger.info('Retrieved user:', user);
// Update the user
const updateResult = await update('users', userId, {
avatar: 'profile.jpg',
bio: 'Software developer'
});
logger.info(`Updated user with hash: ${updateResult.hash}`);
// Query users
const filteredUsers = await query('users', (user) => user.username === 'johndoe');
logger.info(`Found ${filteredUsers.total} matching users`);
// List all users
const allUsers = await list('users', { limit: 10 });
logger.info(`Retrieved ${allUsers.total} users`);
// Upload a file
const fileData = Buffer.from('This is a test file content');
const fileUpload = await uploadFile(fileData, { filename: 'test.txt' });
logger.info(`Uploaded file with CID: ${fileUpload.cid}`);
// Retrieve the file
const file = await getFile(fileUpload.cid);
logger.info('Retrieved file:', {
content: file.data.toString(),
metadata: file.metadata
});
// Delete the file
await deleteFile(fileUpload.cid);
logger.info('File deleted');
// Delete the user
await remove('users', userId);
logger.info('User deleted');
// Stop the database service
await stopDB();
logger.info('Database service stopped');
} catch (error) {
logger.error('Error in database example:', error);
}
}
databaseExample();

@@ -0,0 +1,266 @@
import {
initDB,
create,
get,
update,
remove,
list,
query,
uploadFile,
getFile,
createTransaction,
commitTransaction,
subscribe,
defineSchema,
getMetrics,
createIndex,
ErrorCode,
StoreType,
logger
} from '../src';
// Define a user schema
const userSchema = {
properties: {
username: {
type: 'string',
required: true,
min: 3,
max: 20,
},
email: {
type: 'string',
pattern: '^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$',
},
age: {
type: 'number',
min: 18,
max: 120,
},
roles: {
type: 'array',
items: {
type: 'string',
},
},
isActive: {
type: 'boolean',
},
},
required: ['username'],
};
async function advancedDatabaseExample() {
try {
logger.info('Starting advanced database example...');
// Initialize the database service with a specific connection ID
const connectionId = await initDB('example-connection');
logger.info(`Database service initialized with connection ID: ${connectionId}`);
// Define schema for validation
defineSchema('users', userSchema);
logger.info('User schema defined');
// Create index for faster queries (works with docstore)
await createIndex('users', 'username', { storeType: StoreType.DOCSTORE });
logger.info('Created index on username field');
// Set up subscription for real-time updates
const unsubscribe = subscribe('document:created', (data) => {
logger.info('Document created:', data.collection, data.id);
});
// Create multiple users using a transaction
const transaction = createTransaction(connectionId);
transaction
.create('users', 'user1', {
username: 'alice',
email: 'alice@example.com',
age: 28,
roles: ['admin', 'user'],
isActive: true,
})
.create('users', 'user2', {
username: 'bob',
email: 'bob@example.com',
age: 34,
roles: ['user'],
isActive: true,
})
.create('users', 'user3', {
username: 'charlie',
email: 'charlie@example.com',
age: 45,
roles: ['moderator', 'user'],
isActive: false,
});
const txResult = await commitTransaction(transaction);
logger.info(`Transaction committed with ${txResult.results.length} operations`);
// Get a specific user with type safety
interface User {
username: string;
email: string;
age: number;
roles: string[];
isActive: boolean;
createdAt: number;
updatedAt: number;
}
// Using KeyValue store (default)
const user = await get<User>('users', 'user1');
logger.info('Retrieved user from KeyValue store:', user?.username, user?.email);
// Using DocStore
await create<User>(
'users_docstore',
'user1',
{
username: 'alice_doc',
email: 'alice_doc@example.com',
age: 28,
roles: ['admin', 'user'],
isActive: true,
},
{ storeType: StoreType.DOCSTORE }
);
const docStoreUser = await get<User>('users_docstore', 'user1', { storeType: StoreType.DOCSTORE });
logger.info('Retrieved user from DocStore:', docStoreUser?.username, docStoreUser?.email);
// Using Feed/EventLog store
await create<User>(
'users_feed',
'user1',
{
username: 'alice_feed',
email: 'alice_feed@example.com',
age: 28,
roles: ['admin', 'user'],
isActive: true,
},
{ storeType: StoreType.FEED }
);
// Update the feed entry (creates a new entry with the same ID)
await update<User>(
'users_feed',
'user1',
{
roles: ['admin', 'user', 'tester'],
},
{ storeType: StoreType.FEED }
);
// List all entries in the feed
const feedUsers = await list<User>('users_feed', { storeType: StoreType.FEED });
logger.info(`Found ${feedUsers.total} feed entries:`);
feedUsers.documents.forEach(user => {
logger.info(`- ${user.username} (${user.email})`);
});
// Using Counter store
await create(
'counters',
'visitors',
{ value: 100 },
{ storeType: StoreType.COUNTER }
);
// Increment the counter
await update(
'counters',
'visitors',
{ increment: 5 },
{ storeType: StoreType.COUNTER }
);
// Get the counter value
const counter = await get('counters', 'visitors', { storeType: StoreType.COUNTER });
logger.info(`Counter value: ${counter?.value}`);
// Update a user in KeyValue store
await update<User>('users', 'user1', {
roles: ['admin', 'user', 'tester'],
});
// Query users from KeyValue store
const result = await query<User>(
'users',
(user) => user.isActive === true,
{
limit: 10,
offset: 0,
sort: { field: 'username', order: 'asc' },
}
);
logger.info(`Found ${result.total} active users:`);
result.documents.forEach(user => {
logger.info(`- ${user.username} (${user.email})`);
});
// List all users from KeyValue store with pagination
const allUsers = await list<User>('users', {
limit: 10,
offset: 0,
sort: { field: 'age', order: 'desc' },
});
logger.info(`Listed ${allUsers.total} users sorted by age (desc):`);
allUsers.documents.forEach(user => {
logger.info(`- ${user.username}: ${user.age} years old`);
});
// Upload a file with metadata
const fileData = Buffer.from('This is a test file with advanced features.');
const fileUpload = await uploadFile(fileData, {
filename: 'advanced-test.txt',
metadata: {
contentType: 'text/plain',
tags: ['example', 'test'],
owner: 'user1',
}
});
logger.info(`Uploaded file with CID: ${fileUpload.cid}`);
// Retrieve the file
const file = await getFile(fileUpload.cid);
logger.info('Retrieved file content:', file.data.toString());
logger.info('File metadata:', file.metadata);
// Get performance metrics
const metrics = getMetrics();
logger.info('Database metrics:', {
operations: metrics.operations,
performance: {
averageOperationTime: `${metrics.performance.averageOperationTime?.toFixed(2)}ms`,
},
cache: metrics.cacheStats,
});
// Unsubscribe from events
unsubscribe();
logger.info('Unsubscribed from events');
// Delete a user
await remove('users', 'user3');
logger.info('Deleted user user3');
return true;
} catch (error) {
if (error && typeof error === 'object' && 'code' in error) {
logger.error(`Database error (${error.code}):`, error.message);
} else {
logger.error('Error in advanced database example:', error);
}
return false;
}
}
advancedDatabaseExample();

examples/basic-node.ts (new file, 49 lines)

@@ -0,0 +1,49 @@
import { init as initIpfs, stop as stopIpfs } from '../src/ipfs/ipfsService';
import { init as initOrbitDB } from '../src/orbit/orbitDBService';
import { createServiceLogger } from '../src/utils/logger';
const appLogger = createServiceLogger('APP');
async function startNode() {
try {
appLogger.info('Starting Debros node...');
// Initialize IPFS
const ipfs = await initIpfs();
appLogger.info('IPFS node initialized');
// Initialize OrbitDB
const orbitdb = await initOrbitDB();
appLogger.info('OrbitDB initialized');
// Create a test database
const db = await orbitdb.open('test-db', {
type: 'feed',
overwrite: false,
});
appLogger.info(`Database opened: ${db.address.toString()}`);
// Add some data
const hash = await db.add('Hello from Debros Network!');
appLogger.info(`Added entry: ${hash}`);
// Query data
const allEntries = db.iterator({ limit: 10 }).collect();
appLogger.info('Database entries:', allEntries);
// Keep the process running
appLogger.info('Node is running. Press Ctrl+C to stop.');
// Handle shutdown
process.on('SIGINT', async () => {
appLogger.info('Shutting down...');
await orbitdb.stop();
await stopIpfs();
process.exit(0);
});
} catch (error) {
appLogger.error('Failed to start node:', error);
}
}
startNode();

package.json

@@ -1,6 +1,6 @@
{
  "name": "@debros/network",
-  "version": "0.0.23-alpha",
+  "version": "0.0.22-alpha",
  "description": "Debros network core functionality for IPFS, libp2p and OrbitDB",
  "type": "module",
  "main": "dist/index.js",

pnpm-lock.yaml (generated; 5935 lines changed)

Diff suppressed because the file is too large.

@@ -57,8 +57,7 @@ export const defaultConfig: DebrosConfig = {
      heartbeatInterval: parseInt(process.env.HEARTBEAT_INTERVAL || '5000'),
      staleTimeout: parseInt(process.env.STALE_PEER_TIMEOUT || '30000'),
      logInterval: parseInt(process.env.PEER_LOG_INTERVAL || '60000'),
-      publicAddress:
-        process.env.NODE_PUBLIC_ADDRESS || `http://localhost:${process.env.PORT || 7777}`,
+      publicAddress: process.env.NODE_PUBLIC_ADDRESS || `http://localhost:${process.env.PORT || 7777}`,
    },
  },
  orbitdb: {
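All of these defaults can be overridden through environment variables at startup; a minimal sketch (the import path is assumed, the variable names come from the lines above):

```typescript
// Set before the config module is first imported.
process.env.HEARTBEAT_INTERVAL = '10000'; // heartbeat every 10s instead of 5s
process.env.STALE_PEER_TIMEOUT = '60000'; // consider peers stale after 60s
process.env.NODE_PUBLIC_ADDRESS = 'http://node1.example.com:7777';

const { defaultConfig } = await import('./config'); // path assumed
console.log(defaultConfig); // reflects the overrides above
```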

src/db/cache/cacheService.ts (new file, 62 lines)

@@ -0,0 +1,62 @@
import NodeCache from 'node-cache';
import { createServiceLogger } from '../../utils/logger';
const logger = createServiceLogger('DB_CACHE');
// Cache for frequently accessed documents
const cache = new NodeCache({
stdTTL: 300, // 5 minutes default TTL
checkperiod: 60, // Check for expired items every 60 seconds
useClones: false, // Don't clone objects (for performance)
});
// Cache statistics
export const cacheStats = {
hits: 0,
misses: 0,
};
/**
* Get an item from cache
*/
export const get = <T>(key: string): T | undefined => {
const value = cache.get<T>(key);
if (value !== undefined) {
cacheStats.hits++;
return value;
}
cacheStats.misses++;
return undefined;
};
/**
* Set an item in cache
*/
export const set = <T>(key: string, value: T, ttl?: number): boolean => {
if (ttl === undefined) {
return cache.set(key, value);
}
return cache.set(key, value, ttl);
};
/**
* Delete an item from cache
*/
export const del = (key: string | string[]): number => {
return cache.del(key);
};
/**
* Flush the entire cache
*/
export const flushAll = (): void => {
cache.flushAll();
};
/**
* Reset cache statistics
*/
export const resetStats = (): void => {
cacheStats.hits = 0;
cacheStats.misses = 0;
};
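In practice this module backs the read path of the stores. A minimal sketch of the intended read-through/invalidation pattern, assuming only the exports above (the document shape is illustrative):

```typescript
import * as cache from './cacheService';

// Illustrative shape; the cache stores whatever the caller puts in it.
interface UserDoc {
  username: string;
  updatedAt: number;
}

const key = 'users:user123';

// Read-through: consult the cache before hitting OrbitDB.
let doc = cache.get<UserDoc>(key); // get() counts a hit or a miss
if (!doc) {
  doc = { username: 'johndoe', updatedAt: Date.now() }; // stand-in for a store read
  cache.set(key, doc, 60); // override the 300s default TTL
}

// Invalidate on write so readers never see stale data.
cache.del(key);

console.log(cache.cacheStats); // { hits, misses } maintained by get()
```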

@@ -9,93 +9,41 @@ const logger = createServiceLogger('DB_CONNECTION');
// Connection pool of database instances
const connections = new Map<string, DBConnection>();
let defaultConnectionId: string | null = null;
-let cleanupInterval: NodeJS.Timeout | null = null;
-// Configuration
-const CONNECTION_TIMEOUT = 3600000; // 1 hour in milliseconds
-const CLEANUP_INTERVAL = 300000; // 5 minutes in milliseconds
-const MAX_RETRY_ATTEMPTS = 3;
-const RETRY_DELAY = 2000; // 2 seconds
/**
 * Initialize the database service
 * This abstracts away OrbitDB and IPFS from the end user
 */
export const init = async (connectionId?: string): Promise<string> => {
-  // Start connection cleanup interval if not already running
-  if (!cleanupInterval) {
-    cleanupInterval = setInterval(cleanupStaleConnections, CLEANUP_INTERVAL);
-    logger.info(`Connection cleanup scheduled every ${CLEANUP_INTERVAL / 60000} minutes`);
-  }
-  const connId = connectionId || `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
-  logger.info(`Initializing DB service with connection ID: ${connId}`);
-  let attempts = 0;
-  let lastError: any = null;
-  // Retry initialization with exponential backoff
-  while (attempts < MAX_RETRY_ATTEMPTS) {
-    try {
-      // Initialize IPFS with retry logic
-      const ipfsInstance = await initIpfs().catch((error) => {
-        logger.error(
-          `IPFS initialization failed (attempt ${attempts + 1}/${MAX_RETRY_ATTEMPTS}):`,
-          error,
-        );
-        throw error;
-      });
-      // Initialize OrbitDB
-      const orbitdbInstance = await initOrbitDB().catch((error) => {
-        logger.error(
-          `OrbitDB initialization failed (attempt ${attempts + 1}/${MAX_RETRY_ATTEMPTS}):`,
-          error,
-        );
-        throw error;
-      });
-      // Store connection in pool
-      connections.set(connId, {
-        ipfs: ipfsInstance,
-        orbitdb: orbitdbInstance,
-        timestamp: Date.now(),
-        isActive: true,
-      });
-      // Set as default if no default exists
-      if (!defaultConnectionId) {
-        defaultConnectionId = connId;
-      }
-      logger.info(`DB service initialized successfully with connection ID: ${connId}`);
-      return connId;
-    } catch (error) {
-      lastError = error;
-      attempts++;
-      if (attempts >= MAX_RETRY_ATTEMPTS) {
-        logger.error(
-          `Failed to initialize DB service after ${MAX_RETRY_ATTEMPTS} attempts:`,
-          error,
-        );
-        break;
-      }
-      // Wait before retrying with exponential backoff
-      const delay = RETRY_DELAY * Math.pow(2, attempts - 1);
-      logger.info(
-        `Retrying initialization in ${delay}ms (attempt ${attempts + 1}/${MAX_RETRY_ATTEMPTS})...`,
-      );
-      await new Promise((resolve) => setTimeout(resolve, delay));
-    }
-  }
-  throw new DBError(
-    ErrorCode.INITIALIZATION_FAILED,
-    `Failed to initialize database service after ${MAX_RETRY_ATTEMPTS} attempts`,
-    lastError,
-  );
+  try {
+    const connId = connectionId || `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
+    logger.info(`Initializing DB service with connection ID: ${connId}`);
+    // Initialize IPFS
+    const ipfsInstance = await initIpfs();
+    // Initialize OrbitDB
+    const orbitdbInstance = await initOrbitDB();
+    // Store connection in pool
+    connections.set(connId, {
+      ipfs: ipfsInstance,
+      orbitdb: orbitdbInstance,
+      timestamp: Date.now(),
+      isActive: true,
+    });
+    // Set as default if no default exists
+    if (!defaultConnectionId) {
+      defaultConnectionId = connId;
+    }
+    logger.info(`DB service initialized successfully with connection ID: ${connId}`);
+    return connId;
+  } catch (error) {
+    logger.error('Failed to initialize DB service:', error);
+    throw new DBError(ErrorCode.INITIALIZATION_FAILED, 'Failed to initialize database service', error);
+  }
};
/**
@@ -107,66 +55,22 @@ export const getConnection = (connectionId?: string): DBConnection => {
  if (!connId || !connections.has(connId)) {
    throw new DBError(
      ErrorCode.NOT_INITIALIZED,
-      `No active database connection found${connectionId ? ` for ID: ${connectionId}` : ''}`,
+      `No active database connection found${connectionId ? ` for ID: ${connectionId}` : ''}`
    );
  }
  const connection = connections.get(connId)!;
  if (!connection.isActive) {
-    throw new DBError(ErrorCode.CONNECTION_ERROR, `Connection ${connId} is no longer active`);
+    throw new DBError(
+      ErrorCode.CONNECTION_ERROR,
+      `Connection ${connId} is no longer active`
+    );
  }
-  // Update the timestamp to mark connection as recently used
-  connection.timestamp = Date.now();
  return connection;
};
-/**
- * Cleanup stale connections to prevent memory leaks
- */
-export const cleanupStaleConnections = (): void => {
-  try {
-    const now = Date.now();
-    let removedCount = 0;
-    // Identify stale connections (older than CONNECTION_TIMEOUT)
-    for (const [id, connection] of connections.entries()) {
-      if (connection.isActive && now - connection.timestamp > CONNECTION_TIMEOUT) {
-        logger.info(
-          `Closing stale connection: ${id} (inactive for ${(now - connection.timestamp) / 60000} minutes)`,
-        );
-        // Close connection asynchronously (don't await to avoid blocking)
-        closeConnection(id)
-          .then((success) => {
-            if (success) {
-              logger.info(`Successfully closed stale connection: ${id}`);
-            } else {
-              logger.warn(`Failed to close stale connection: ${id}`);
-            }
-          })
-          .catch((error) => {
-            logger.error(`Error closing stale connection ${id}:`, error);
-          });
-        removedCount++;
-      } else if (!connection.isActive) {
-        // Remove inactive connections from the map
-        connections.delete(id);
-        removedCount++;
-      }
-    }
-    if (removedCount > 0) {
-      logger.info(`Cleaned up ${removedCount} stale or inactive connections`);
-    }
-  } catch (error) {
-    logger.error('Error during connection cleanup:', error);
-  }
-};
/**
 * Close a specific database connection
 */
@@ -199,9 +103,6 @@ export const closeConnection = async (connectionId: string): Promise<boolean> => {
    }
  }
-  // Remove the connection from the pool
-  connections.delete(connectionId);
  logger.info(`Closed database connection: ${connectionId}`);
  return true;
} catch (error) {
@@ -215,36 +116,23 @@ export const closeConnection = async (connectionId: string): Promise<boolean> => {
 */
export const stop = async (): Promise<void> => {
  try {
-    // Stop the cleanup interval
-    if (cleanupInterval) {
-      clearInterval(cleanupInterval);
-      cleanupInterval = null;
-    }
    // Close all connections
-    const promises: Promise<boolean>[] = [];
    for (const [id, connection] of connections.entries()) {
      if (connection.isActive) {
-        promises.push(closeConnection(id));
+        await closeConnection(id);
      }
    }
-    // Wait for all connections to close
-    await Promise.allSettled(promises);
    // Stop IPFS if needed
    const ipfs = connections.get(defaultConnectionId || '')?.ipfs;
    if (ipfs) {
      await stopIpfs();
    }
-    // Clear all connections
-    connections.clear();
    defaultConnectionId = null;
    logger.info('All DB connections stopped successfully');
-  } catch (error: any) {
+  } catch (error) {
    logger.error('Error stopping DB connections:', error);
-    throw new Error(`Failed to stop database connections: ${error.message}`);
+    throw error;
  }
};
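The simplified module boils down to a small lifecycle; a sketch of how the exports above fit together (import path assumed):

```typescript
import { init, getConnection, closeConnection, stop } from './connection';

async function main() {
  // init() generates an ID like `conn_<timestamp>_<random>` when none is given
  const connId = await init();

  // Throws DBError(NOT_INITIALIZED) for unknown IDs and
  // DBError(CONNECTION_ERROR) for connections that are no longer active.
  const { ipfs, orbitdb } = getConnection(connId);
  console.log('connection ready:', Boolean(ipfs) && Boolean(orbitdb));

  await closeConnection(connId);
  await stop(); // closes remaining connections and stops IPFS
}

main().catch(console.error);
```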

@@ -3,6 +3,7 @@ import { ErrorCode } from '../types';
// Re-export error code for easier access
export { ErrorCode };
+// Custom error class with error codes
export class DBError extends Error {
  code: ErrorCode;
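Consumers branch on the attached code rather than on message text; a brief sketch (paths assumed):

```typescript
import { DBError, ErrorCode } from './error';

try {
  throw new DBError(ErrorCode.DOCUMENT_NOT_FOUND, 'Document user1 not found in users');
} catch (err) {
  if (err instanceof DBError && err.code === ErrorCode.DOCUMENT_NOT_FOUND) {
    console.warn('missing document, treating as empty result');
  } else {
    throw err; // unknown failure: propagate
  }
}
```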

@@ -1,23 +1,17 @@
import { createServiceLogger } from '../utils/logger';
-import { init, closeConnection, stop } from './core/connection';
+import { init, getConnection, closeConnection, stop } from './core/connection';
-import { defineSchema } from './schema/validator';
+import { defineSchema, validateDocument } from './schema/validator';
+import * as cache from './cache/cacheService';
import * as events from './events/eventService';
+import { getMetrics, resetMetrics } from './metrics/metricsService';
import { Transaction } from './transactions/transactionService';
-import {
-  StoreType,
-  CreateResult,
-  UpdateResult,
-  PaginatedResult,
-  QueryOptions,
-  ListOptions,
-  ErrorCode,
-} from './types';
+import { StoreType, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions, ErrorCode } from './types';
import { DBError } from './core/error';
import { getStore } from './stores/storeFactory';
import { uploadFile, getFile, deleteFile } from './stores/fileStore';
// Re-export imported functions
-export { init, closeConnection, stop, defineSchema, uploadFile, getFile, deleteFile };
+export { init, closeConnection, stop, defineSchema, getMetrics, resetMetrics, uploadFile, getFile, deleteFile };
const logger = createServiceLogger('DB_SERVICE');
@@ -31,9 +25,7 @@ export const createTransaction = (connectionId?: string): Transaction => {
/**
 * Execute all operations in a transaction
 */
-export const commitTransaction = async (
-  transaction: Transaction,
-): Promise<{ success: boolean; results: any[] }> => {
+export const commitTransaction = async (transaction: Transaction): Promise<{ success: boolean; results: any[] }> => {
  try {
    // Validate that we have operations
    const operations = transaction.getOperations();
@@ -50,19 +42,29 @@
      switch (operation.type) {
        case 'create':
-          result = await create(operation.collection, operation.id, operation.data, {
-            connectionId,
-          });
+          result = await create(
+            operation.collection,
+            operation.id,
+            operation.data,
+            { connectionId }
+          );
          break;
        case 'update':
-          result = await update(operation.collection, operation.id, operation.data, {
-            connectionId,
-          });
+          result = await update(
+            operation.collection,
+            operation.id,
+            operation.data,
+            { connectionId }
+          );
          break;
        case 'delete':
-          result = await remove(operation.collection, operation.id, { connectionId });
+          result = await remove(
+            operation.collection,
+            operation.id,
+            { connectionId }
+          );
          break;
      }
@@ -83,7 +85,7 @@ export const create = async <T extends Record<string, any>>(
  collection: string,
  id: string,
  data: Omit<T, 'createdAt' | 'updatedAt'>,
-  options?: { connectionId?: string; storeType?: StoreType },
+  options?: { connectionId?: string, storeType?: StoreType }
): Promise<CreateResult> => {
  const storeType = options?.storeType || StoreType.KEYVALUE;
  const store = getStore(storeType);
@@ -96,7 +98,7 @@ export const create = async <T extends Record<string, any>>(
export const get = async <T extends Record<string, any>>(
  collection: string,
  id: string,
-  options?: { connectionId?: string; skipCache?: boolean; storeType?: StoreType },
+  options?: { connectionId?: string; skipCache?: boolean, storeType?: StoreType }
): Promise<T | null> => {
  const storeType = options?.storeType || StoreType.KEYVALUE;
  const store = getStore(storeType);
@@ -110,7 +112,7 @@ export const update = async <T extends Record<string, any>>(
  collection: string,
  id: string,
  data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
-  options?: { connectionId?: string; upsert?: boolean; storeType?: StoreType },
+  options?: { connectionId?: string; upsert?: boolean, storeType?: StoreType }
): Promise<UpdateResult> => {
  const storeType = options?.storeType || StoreType.KEYVALUE;
  const store = getStore(storeType);
@@ -123,7 +125,7 @@
export const remove = async (
  collection: string,
  id: string,
-  options?: { connectionId?: string; storeType?: StoreType },
+  options?: { connectionId?: string, storeType?: StoreType }
): Promise<boolean> => {
  const storeType = options?.storeType || StoreType.KEYVALUE;
  const store = getStore(storeType);
@@ -135,7 +137,7 @@
 */
export const list = async <T extends Record<string, any>>(
  collection: string,
-  options?: ListOptions & { storeType?: StoreType },
+  options?: ListOptions & { storeType?: StoreType }
): Promise<PaginatedResult<T>> => {
  const storeType = options?.storeType || StoreType.KEYVALUE;
  const store = getStore(storeType);
@@ -151,7 +153,7 @@
export const query = async <T extends Record<string, any>>(
  collection: string,
  filter: (doc: T) => boolean,
-  options?: QueryOptions & { storeType?: StoreType },
+  options?: QueryOptions & { storeType?: StoreType }
): Promise<PaginatedResult<T>> => {
  const storeType = options?.storeType || StoreType.KEYVALUE;
  const store = getStore(storeType);
@@ -167,7 +169,7 @@
export const createIndex = async (
  collection: string,
  field: string,
-  options?: { connectionId?: string; storeType?: StoreType },
+  options?: { connectionId?: string, storeType?: StoreType }
): Promise<boolean> => {
  const storeType = options?.storeType || StoreType.KEYVALUE;
  const store = getStore(storeType);
@@ -202,7 +204,9 @@ export default {
  getFile,
  deleteFile,
  defineSchema,
+  getMetrics,
+  resetMetrics,
  closeConnection,
  stop,
-  StoreType,
+  StoreType
};

@@ -6,7 +6,10 @@ type DBEventType = 'document:created' | 'document:updated' | 'document:deleted';
/**
 * Subscribe to database events
 */
-export const subscribe = (event: DBEventType, callback: (data: any) => void): (() => void) => {
+export const subscribe = (
+  event: DBEventType,
+  callback: (data: any) => void
+): () => void => {
  dbEvents.on(event, callback);
  // Return unsubscribe function
@@ -18,7 +21,10 @@ export const subscribe = (event: DBEventType, callback: (data: any) => void): (() => void) => {
/**
 * Emit an event
 */
-export const emit = (event: DBEventType, data: any): void => {
+export const emit = (
+  event: DBEventType,
+  data: any
+): void => {
  dbEvents.emit(event, data);
};
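A usage sketch for the pair above (import path assumed): `emit` is what the stores call after a successful write, and handlers fire synchronously on the shared emitter:

```typescript
import { subscribe, emit } from './eventService';

const unsubscribe = subscribe('document:created', (data) => {
  console.log(`created ${data.collection}/${data.id}`);
});

// A store would emit this after a successful create; the handler above fires.
emit('document:created', { collection: 'users', id: 'user1', document: { username: 'alice' } });

unsubscribe(); // detach when no longer interested
```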

@@ -0,0 +1,101 @@
import { Metrics, ErrorCode } from '../types';
import { DBError } from '../core/error';
import * as cacheService from '../cache/cacheService';
// Metrics tracking
const metrics: Metrics = {
operations: {
creates: 0,
reads: 0,
updates: 0,
deletes: 0,
queries: 0,
fileUploads: 0,
fileDownloads: 0,
},
performance: {
totalOperationTime: 0,
operationCount: 0,
},
errors: {
count: 0,
byCode: {},
},
cacheStats: {
hits: 0,
misses: 0,
},
startTime: Date.now(),
};
/**
* Measure performance of a database operation
*/
export async function measurePerformance<T>(operation: () => Promise<T>): Promise<T> {
const startTime = performance.now();
try {
const result = await operation();
const endTime = performance.now();
metrics.performance.totalOperationTime += (endTime - startTime);
metrics.performance.operationCount++;
return result;
} catch (error) {
const endTime = performance.now();
metrics.performance.totalOperationTime += (endTime - startTime);
metrics.performance.operationCount++;
// Track error metrics
metrics.errors.count++;
if (error instanceof DBError) {
metrics.errors.byCode[error.code] = (metrics.errors.byCode[error.code] || 0) + 1;
}
throw error;
}
}
/**
* Get database metrics
*/
export const getMetrics = (): Metrics => {
return {
...metrics,
// Sync cache stats
cacheStats: cacheService.cacheStats,
// Calculate some derived metrics
performance: {
...metrics.performance,
averageOperationTime: metrics.performance.operationCount > 0 ?
metrics.performance.totalOperationTime / metrics.performance.operationCount :
0
}
};
};
/**
* Reset metrics (useful for testing)
*/
export const resetMetrics = (): void => {
metrics.operations = {
creates: 0,
reads: 0,
updates: 0,
deletes: 0,
queries: 0,
fileUploads: 0,
fileDownloads: 0,
};
metrics.performance = {
totalOperationTime: 0,
operationCount: 0,
};
metrics.errors = {
count: 0,
byCode: {},
};
// Reset cache stats too
cacheService.resetStats();
metrics.startTime = Date.now();
};
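Store implementations are expected to funnel their awaited work through `measurePerformance` so timings and error counts accumulate in one place; a small sketch, assuming only this module's exports:

```typescript
import { measurePerformance, getMetrics, resetMetrics } from './metricsService';

// Timing is recorded whether the wrapped operation resolves or throws;
// DBError codes are additionally tallied in metrics.errors.byCode.
const doc = await measurePerformance(async () => {
  return { id: 'user1', username: 'alice' }; // stand-in for a real store read
});

const { performance } = getMetrics();
console.log(
  `avg ${performance.averageOperationTime?.toFixed(2)}ms over ${performance.operationCount} ops`,
);

resetMetrics(); // start a fresh measurement window
```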

@@ -29,10 +29,11 @@ export const validateDocument = (collection: string, document: any): boolean =>
  if (schema.required) {
    for (const field of schema.required) {
      if (document[field] === undefined) {
-        throw new DBError(ErrorCode.INVALID_SCHEMA, `Required field '${field}' is missing`, {
-          collection,
-          document,
-        });
+        throw new DBError(
+          ErrorCode.INVALID_SCHEMA,
+          `Required field '${field}' is missing`,
+          { collection, document }
+        );
      }
    }
  }
@@ -44,10 +45,11 @@ export const validateDocument = (collection: string, document: any): boolean =>
    // Skip undefined optional fields
    if (value === undefined) {
      if (definition.required) {
-        throw new DBError(ErrorCode.INVALID_SCHEMA, `Required field '${field}' is missing`, {
-          collection,
-          document,
-        });
+        throw new DBError(
+          ErrorCode.INVALID_SCHEMA,
+          `Required field '${field}' is missing`,
+          { collection, document }
+        );
      }
      continue;
    }
@@ -56,11 +58,11 @@ export const validateDocument = (collection: string, document: any): boolean =>
    switch (definition.type) {
      case 'string':
        if (typeof value !== 'string') {
-          throw new DBError(ErrorCode.INVALID_SCHEMA, `Field '${field}' must be a string`, {
-            collection,
-            field,
-            value,
-          });
+          throw new DBError(
+            ErrorCode.INVALID_SCHEMA,
+            `Field '${field}' must be a string`,
+            { collection, field, value }
+          );
        }
        // Pattern validation
@@ -68,7 +70,7 @@ export const validateDocument = (collection: string, document: any): boolean =>
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' does not match pattern: ${definition.pattern}`,
-            { collection, field, value },
+            { collection, field, value }
          );
        }
@@ -77,7 +79,7 @@ export const validateDocument = (collection: string, document: any): boolean =>
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must have at least ${definition.min} characters`,
-            { collection, field, value },
+            { collection, field, value }
          );
        }
@@ -85,18 +87,18 @@ export const validateDocument = (collection: string, document: any): boolean =>
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must have at most ${definition.max} characters`,
-            { collection, field, value },
+            { collection, field, value }
          );
        }
        break;
      case 'number':
        if (typeof value !== 'number') {
-          throw new DBError(ErrorCode.INVALID_SCHEMA, `Field '${field}' must be a number`, {
-            collection,
-            field,
-            value,
-          });
+          throw new DBError(
+            ErrorCode.INVALID_SCHEMA,
+            `Field '${field}' must be a number`,
+            { collection, field, value }
+          );
        }
        // Range validation
@@ -104,7 +106,7 @@ export const validateDocument = (collection: string, document: any): boolean =>
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must be at least ${definition.min}`,
-            { collection, field, value },
+            { collection, field, value }
          );
        }
@@ -112,28 +114,28 @@ export const validateDocument = (collection: string, document: any): boolean =>
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must be at most ${definition.max}`,
-            { collection, field, value },
+            { collection, field, value }
          );
        }
        break;
      case 'boolean':
        if (typeof value !== 'boolean') {
-          throw new DBError(ErrorCode.INVALID_SCHEMA, `Field '${field}' must be a boolean`, {
-            collection,
-            field,
-            value,
-          });
+          throw new DBError(
+            ErrorCode.INVALID_SCHEMA,
+            `Field '${field}' must be a boolean`,
+            { collection, field, value }
+          );
        }
        break;
      case 'array':
        if (!Array.isArray(value)) {
-          throw new DBError(ErrorCode.INVALID_SCHEMA, `Field '${field}' must be an array`, {
-            collection,
-            field,
-            value,
-          });
+          throw new DBError(
+            ErrorCode.INVALID_SCHEMA,
+            `Field '${field}' must be an array`,
+            { collection, field, value }
+          );
        }
        // Length validation
@@ -141,7 +143,7 @@ export const validateDocument = (collection: string, document: any): boolean =>
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must have at least ${definition.min} items`,
-            { collection, field, value },
+            { collection, field, value }
          );
        }
@@ -149,7 +151,7 @@ export const validateDocument = (collection: string, document: any): boolean =>
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must have at most ${definition.max} items`,
-            { collection, field, value },
+            { collection, field, value }
          );
        }
@@ -166,7 +168,7 @@ export const validateDocument = (collection: string, document: any): boolean =>
              throw new DBError(
                ErrorCode.INVALID_SCHEMA,
                `Item at index ${i} in field '${field}' must be a string`,
-                { collection, field, item },
+                { collection, field, item }
              );
            }
            break;
@@ -176,7 +178,7 @@ export const validateDocument = (collection: string, document: any): boolean =>
              throw new DBError(
                ErrorCode.INVALID_SCHEMA,
                `Item at index ${i} in field '${field}' must be a number`,
-                { collection, field, item },
+                { collection, field, item }
              );
            }
            break;
@@ -186,7 +188,7 @@ export const validateDocument = (collection: string, document: any): boolean =>
              throw new DBError(
                ErrorCode.INVALID_SCHEMA,
                `Item at index ${i} in field '${field}' must be a boolean`,
-                { collection, field, item },
+                { collection, field, item }
              );
            }
            break;
@@ -197,11 +199,11 @@ export const validateDocument = (collection: string, document: any): boolean =>
      case 'object':
        if (typeof value !== 'object' || value === null || Array.isArray(value)) {
-          throw new DBError(ErrorCode.INVALID_SCHEMA, `Field '${field}' must be an object`, {
-            collection,
-            field,
-            value,
-          });
+          throw new DBError(
+            ErrorCode.INVALID_SCHEMA,
+            `Field '${field}' must be an object`,
+            { collection, field, value }
+          );
        }
        // Nested object validation would go here in a real implementation
@@ -212,7 +214,7 @@ export const validateDocument = (collection: string, document: any): boolean =>
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must be one of: ${definition.enum.join(', ')}`,
-            { collection, field, value },
+            { collection, field, value }
          );
        }
        break;
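Since every failed check throws `DBError` with `ErrorCode.INVALID_SCHEMA`, documents can be tested up front; a sketch assuming the module paths:

```typescript
import { defineSchema, validateDocument } from './validator';
import { DBError, ErrorCode } from '../core/error';

defineSchema('users', {
  properties: { age: { type: 'number', min: 18 } },
  required: ['age'],
});

try {
  validateDocument('users', { age: 16 }); // violates min: 18
} catch (err) {
  if (err instanceof DBError && err.code === ErrorCode.INVALID_SCHEMA) {
    console.warn('rejected:', err.message); // "Field 'age' must be at least 18"
  }
}
```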

@@ -1,413 +0,0 @@
import { createServiceLogger } from '../../utils/logger';
import {
ErrorCode,
StoreType,
StoreOptions,
CreateResult,
UpdateResult,
PaginatedResult,
QueryOptions,
ListOptions,
acquireLock,
releaseLock,
isLocked,
} from '../types';
import { DBError } from '../core/error';
import { BaseStore, openStore, prepareDocument } from './baseStore';
import * as events from '../events/eventService';
/**
* Abstract store implementation with common CRUD operations
* Specific store types extend this class and customize only what's different
*/
export abstract class AbstractStore implements BaseStore {
protected logger = createServiceLogger(this.getLoggerName());
protected storeType: StoreType;
constructor(storeType: StoreType) {
this.storeType = storeType;
}
/**
* Must be implemented by subclasses to provide the logger name
*/
protected abstract getLoggerName(): string;
/**
* Create a new document in the specified collection
*/
async create<T extends Record<string, any>>(
collection: string,
id: string,
data: Omit<T, 'createdAt' | 'updatedAt'>,
options?: StoreOptions,
): Promise<CreateResult> {
// Create a lock ID for this resource to prevent concurrent operations
const lockId = `${collection}:${id}:create`;
// Try to acquire a lock
if (!acquireLock(lockId)) {
this.logger.warn(
`Concurrent operation detected on ${collection}/${id}, waiting for completion`,
);
// Wait until the lock is released (poll every 100ms for max 5 seconds)
let attempts = 0;
while (isLocked(lockId) && attempts < 50) {
await new Promise((resolve) => setTimeout(resolve, 100));
attempts++;
}
if (isLocked(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Timed out waiting for lock on ${collection}/${id}`,
);
}
// Try to acquire lock again
if (!acquireLock(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to acquire lock on ${collection}/${id}`,
);
}
}
try {
const db = await openStore(collection, this.storeType, options);
// Prepare document for storage with validation
const document = this.prepareCreateDocument<T>(collection, id, data);
// Add to database - this will be overridden by specific implementations if needed
const hash = await this.performCreate(db, id, document);
// Emit change event
events.emit('document:created', { collection, id, document });
this.logger.info(`Created document in ${collection} with id ${id}`);
return { id, hash };
} catch (error: unknown) {
if (error instanceof DBError) {
throw error;
}
this.logger.error(`Error creating document in ${collection}:`, error);
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to create document in ${collection}: ${error instanceof Error ? error.message : String(error)}`,
error,
);
} finally {
// Always release the lock when done
releaseLock(lockId);
}
}
/**
* Prepare a document for creation - can be overridden by subclasses
*/
protected prepareCreateDocument<T extends Record<string, any>>(
collection: string,
id: string,
data: Omit<T, 'createdAt' | 'updatedAt'>,
): any {
return prepareDocument<T>(collection, data);
}
/**
* Perform the actual create operation - should be implemented by subclasses
*/
protected abstract performCreate(db: any, id: string, document: any): Promise<string>;
/**
* Get a document by ID from a collection
*/
async get<T extends Record<string, any>>(
collection: string,
id: string,
options?: StoreOptions & { skipCache?: boolean },
): Promise<T | null> {
try {
const db = await openStore(collection, this.storeType, options);
const document = await this.performGet<T>(db, id);
return document;
} catch (error: unknown) {
if (error instanceof DBError) {
throw error;
}
this.logger.error(`Error getting document ${id} from ${collection}:`, error);
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to get document ${id} from ${collection}: ${error instanceof Error ? error.message : String(error)}`,
error,
);
}
}
/**
* Perform the actual get operation - should be implemented by subclasses
*/
protected abstract performGet<T>(db: any, id: string): Promise<T | null>;
/**
* Update a document in a collection
*/
async update<T extends Record<string, any>>(
collection: string,
id: string,
data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
options?: StoreOptions & { upsert?: boolean },
): Promise<UpdateResult> {
// Create a lock ID for this resource to prevent concurrent operations
const lockId = `${collection}:${id}:update`;
// Try to acquire a lock
if (!acquireLock(lockId)) {
this.logger.warn(
`Concurrent operation detected on ${collection}/${id}, waiting for completion`,
);
// Wait until the lock is released (poll every 100ms for max 5 seconds)
let attempts = 0;
while (isLocked(lockId) && attempts < 50) {
await new Promise((resolve) => setTimeout(resolve, 100));
attempts++;
}
if (isLocked(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Timed out waiting for lock on ${collection}/${id}`,
);
}
// Try to acquire lock again
if (!acquireLock(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to acquire lock on ${collection}/${id}`,
);
}
}
try {
const db = await openStore(collection, this.storeType, options);
const existing = await this.performGet<T>(db, id);
if (!existing && !options?.upsert) {
throw new DBError(
ErrorCode.DOCUMENT_NOT_FOUND,
`Document ${id} not found in ${collection}`,
{ collection, id },
);
}
// Prepare document for update with validation
const document = this.prepareUpdateDocument<T>(collection, id, data, existing || undefined);
// Update in database
const hash = await this.performUpdate(db, id, document);
// Emit change event
events.emit('document:updated', { collection, id, document, previous: existing });
this.logger.info(`Updated document in ${collection} with id ${id}`);
return { id, hash };
} catch (error: unknown) {
if (error instanceof DBError) {
throw error;
}
this.logger.error(`Error updating document in ${collection}:`, error);
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to update document in ${collection}: ${error instanceof Error ? error.message : String(error)}`,
error,
);
} finally {
// Always release the lock when done
releaseLock(lockId);
}
}
/**
* Prepare a document for update - can be overridden by subclasses
*/
protected prepareUpdateDocument<T extends Record<string, any>>(
collection: string,
id: string,
data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
existing?: T,
): any {
return prepareDocument<T>(
collection,
data as unknown as Omit<T, 'createdAt' | 'updatedAt'>,
existing,
);
}
/**
* Perform the actual update operation - should be implemented by subclasses
*/
protected abstract performUpdate(db: any, id: string, document: any): Promise<string>;
/**
* Delete a document from a collection
*/
async remove(collection: string, id: string, options?: StoreOptions): Promise<boolean> {
// Create a lock ID for this resource to prevent concurrent operations
const lockId = `${collection}:${id}:remove`;
// Try to acquire a lock
if (!acquireLock(lockId)) {
this.logger.warn(
`Concurrent operation detected on ${collection}/${id}, waiting for completion`,
);
// Wait until the lock is released (poll every 100ms for max 5 seconds)
let attempts = 0;
while (isLocked(lockId) && attempts < 50) {
await new Promise((resolve) => setTimeout(resolve, 100));
attempts++;
}
if (isLocked(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Timed out waiting for lock on ${collection}/${id}`,
);
}
// Try to acquire lock again
if (!acquireLock(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to acquire lock on ${collection}/${id}`,
);
}
}
try {
const db = await openStore(collection, this.storeType, options);
// Get the document before deleting for the event
const document = await this.performGet(db, id);
if (!document) {
this.logger.warn(`Document ${id} not found in ${collection} for deletion`);
return false;
}
// Delete from database
await this.performRemove(db, id);
// Emit change event
events.emit('document:deleted', { collection, id, document });
this.logger.info(`Deleted document in ${collection} with id ${id}`);
return true;
} catch (error: unknown) {
if (error instanceof DBError) {
throw error;
}
this.logger.error(`Error deleting document in ${collection}:`, error);
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to delete document in ${collection}: ${error instanceof Error ? error.message : String(error)}`,
error,
);
} finally {
// Always release the lock when done
releaseLock(lockId);
}
}
/**
* Perform the actual remove operation - should be implemented by subclasses
*/
protected abstract performRemove(db: any, id: string): Promise<void>;
/**
* Apply sorting to a list of documents
*/
protected applySorting<T extends Record<string, any>>(
documents: T[],
options?: ListOptions | QueryOptions,
): T[] {
if (!options?.sort) {
return documents;
}
const { field, order } = options.sort;
return [...documents].sort((a, b) => {
const valueA = a[field];
const valueB = b[field];
// Handle different data types for sorting
if (typeof valueA === 'string' && typeof valueB === 'string') {
return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
} else if (typeof valueA === 'number' && typeof valueB === 'number') {
return order === 'asc' ? valueA - valueB : valueB - valueA;
} else if (valueA instanceof Date && valueB instanceof Date) {
return order === 'asc'
? valueA.getTime() - valueB.getTime()
: valueB.getTime() - valueA.getTime();
}
// Default comparison for other types
return order === 'asc'
? String(valueA).localeCompare(String(valueB))
: String(valueB).localeCompare(String(valueA));
});
}
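  // Example with hypothetical data: applySorting(docs, { sort: { field: 'createdAt', order: 'desc' } })
  // sorts newest-first; the [...documents] copy means the caller's array is never mutated.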
/**
* Apply pagination to a list of documents
*/
protected applyPagination<T>(
documents: T[],
options?: ListOptions | QueryOptions,
): {
documents: T[];
total: number;
hasMore: boolean;
} {
const total = documents.length;
const offset = options?.offset || 0;
const limit = options?.limit || total;
const paginatedDocuments = documents.slice(offset, offset + limit);
const hasMore = offset + limit < total;
return {
documents: paginatedDocuments,
total,
hasMore,
};
}
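  // Worked example with hypothetical numbers: 25 documents and { offset: 20, limit: 10 }
  // yields documents.slice(20, 30) -> 5 items, total = 25, hasMore = false since 20 + 10 >= 25.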
/**
* List all documents in a collection with pagination
*/
abstract list<T extends Record<string, any>>(
collection: string,
options?: ListOptions,
): Promise<PaginatedResult<T>>;
/**
* Query documents in a collection with filtering and pagination
*/
abstract query<T extends Record<string, any>>(
collection: string,
filter: (doc: T) => boolean,
options?: QueryOptions,
): Promise<PaginatedResult<T>>;
/**
* Create an index for a collection to speed up queries
*/
abstract createIndex(collection: string, field: string, options?: StoreOptions): Promise<boolean>;
}
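Taken together, `AbstractStore` centralizes locking, validation, change events, and error handling, so a concrete store only has to supply the storage primitives. A minimal sketch of the subclass contract, inferred from this diff (the `ExampleStore` name and the key-value calls inside the `perform*` methods are hypothetical; `AbstractStore`, `openStore`, and the option types are the ones shown above):

```typescript
import { AbstractStore } from './abstractStore';
import { openStore } from './baseStore';
import { StoreType, StoreOptions, ListOptions, QueryOptions, PaginatedResult } from '../types';

// Hypothetical subclass: only the storage primitives are store-specific;
// locks, validation, events, and error wrapping come from AbstractStore.
class ExampleStore extends AbstractStore {
  constructor() {
    super(StoreType.KEYVALUE);
  }

  protected getLoggerName(): string {
    return 'EXAMPLE_STORE';
  }

  protected async performCreate(db: any, id: string, document: any): Promise<string> {
    return await db.put(id, document); // assumed key-value style API
  }

  protected async performGet<T>(db: any, id: string): Promise<T | null> {
    return ((await db.get(id)) as T) || null;
  }

  protected async performUpdate(db: any, id: string, document: any): Promise<string> {
    return await db.put(id, document);
  }

  protected async performRemove(db: any, id: string): Promise<void> {
    await db.del(id);
  }

  async list<T extends Record<string, any>>(
    collection: string,
    options?: ListOptions,
  ): Promise<PaginatedResult<T>> {
    const db = await openStore(collection, this.storeType, options);
    // db.all() is an assumption; applySorting/applyPagination are the shared helpers above
    const documents = this.applySorting<T>(await db.all(), options);
    return this.applyPagination<T>(documents, options);
  }

  async query<T extends Record<string, any>>(
    collection: string,
    filter: (doc: T) => boolean,
    options?: QueryOptions,
  ): Promise<PaginatedResult<T>> {
    const { documents } = await this.list<T>(collection);
    const matched = this.applySorting(documents.filter(filter), options);
    return this.applyPagination(matched, options);
  }

  async createIndex(collection: string, _field: string, _options?: StoreOptions): Promise<boolean> {
    this.logger.warn(`Index creation not supported for ${collection} in this sketch`);
    return false;
  }
}
```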

View File

@@ -1,5 +1,6 @@
 import { createServiceLogger } from '../../utils/logger';
 import { openDB } from '../../orbit/orbitDBService';
+import { getConnection } from '../core/connection';
 import { validateDocument } from '../schema/validator';
 import {
@@ -19,6 +20,9 @@ const logger = createServiceLogger('DB_STORE');
  * Base Store interface that all store implementations should extend
  */
 export interface BaseStore {
+  /**
+   * Create a new document
+   */
   create<T extends Record<string, any>>(
     collection: string,
     id: string,
@@ -26,12 +30,18 @@ export interface BaseStore {
     options?: StoreOptions,
   ): Promise<CreateResult>;

+  /**
+   * Get a document by ID
+   */
   get<T extends Record<string, any>>(
     collection: string,
     id: string,
     options?: StoreOptions & { skipCache?: boolean },
   ): Promise<T | null>;

+  /**
+   * Update a document
+   */
   update<T extends Record<string, any>>(
     collection: string,
     id: string,
@@ -39,19 +49,31 @@ export interface BaseStore {
     options?: StoreOptions & { upsert?: boolean },
   ): Promise<UpdateResult>;

+  /**
+   * Delete a document
+   */
   remove(collection: string, id: string, options?: StoreOptions): Promise<boolean>;

+  /**
+   * List all documents in a collection with pagination
+   */
   list<T extends Record<string, any>>(
     collection: string,
     options?: ListOptions,
   ): Promise<PaginatedResult<T>>;

+  /**
+   * Query documents in a collection with filtering and pagination
+   */
   query<T extends Record<string, any>>(
     collection: string,
     filter: (doc: T) => boolean,
     options?: QueryOptions,
   ): Promise<PaginatedResult<T>>;

+  /**
+   * Create an index for a collection to speed up queries
+   */
   createIndex(collection: string, field: string, options?: StoreOptions): Promise<boolean>;
 }
@@ -64,22 +86,16 @@ export async function openStore(
   options?: StoreOptions,
 ): Promise<any> {
   try {
-    // Log minimal connection info to avoid leaking sensitive data
-    logger.info(
-      `Opening ${storeType} store for collection: ${collection} (connection ID: ${options?.connectionId || 'default'})`,
-    );
+    const connection = getConnection(options?.connectionId);
+    logger.info(`Connection for ${collection}:`, connection);
     return await openDB(collection, storeType).catch((err) => {
       throw new Error(`OrbitDB openDB failed: ${err.message}`);
     });
   } catch (error) {
     logger.error(`Error opening ${storeType} store for collection ${collection}:`, error);
-    // Add more context to the error for improved debugging
-    const errorMessage = error instanceof Error ? error.message : String(error);
     throw new DBError(
       ErrorCode.OPERATION_FAILED,
-      `Failed to open ${storeType} store for collection ${collection}: ${errorMessage}`,
+      `Failed to open ${storeType} store for collection ${collection}`,
       error,
     );
   }
@@ -100,28 +116,27 @@ export function prepareDocument<T extends Record<string, any>>(
     Object.entries(data).map(([key, value]) => [key, value === undefined ? null : value]),
   ) as Omit<T, 'createdAt' | 'updatedAt'>;
-  // Prepare document for validation
-  let docToValidate: T;
   // If it's an update to an existing document
   if (existingDoc) {
-    docToValidate = {
+    const doc = {
       ...existingDoc,
       ...sanitizedData,
       updatedAt: timestamp,
     } as T;
-  } else {
-    // Otherwise it's a new document
-    docToValidate = {
-      ...sanitizedData,
-      createdAt: timestamp,
-      updatedAt: timestamp,
-    } as unknown as T;
+    // Validate the document against its schema
+    validateDocument(collection, doc);
+    return doc;
   }
-  // Validate the document BEFORE processing
-  validateDocument(collection, docToValidate);
-  // Return the validated document
-  return docToValidate;
+  // Otherwise it's a new document
+  const doc = {
+    ...sanitizedData,
+    createdAt: timestamp,
+    updatedAt: timestamp,
+  } as unknown as T;
+  // Validate the document against its schema
+  validateDocument(collection, doc);
+  return doc;
 }
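Both sides of the `prepareDocument` hunk produce the same observable behavior: `undefined` values are nulled, timestamps are stamped, and `validateDocument` runs before the document is returned. A usage sketch (assuming a 'users' schema is registered; the field values are hypothetical):

```typescript
import { prepareDocument } from './baseStore';

// New document: gets both createdAt and updatedAt, and undefined becomes null.
const created = prepareDocument('users', { username: 'alice', avatar: undefined });
// created.avatar === null; created.createdAt === created.updatedAt

// Update: createdAt is carried over from the existing document, updatedAt is refreshed.
const updated = prepareDocument('users', { avatar: 'a.png' }, created);
```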

View File

@@ -1,17 +1,10 @@
 import { createServiceLogger } from '../../utils/logger';
-import {
-  ErrorCode,
-  StoreType,
-  StoreOptions,
-  CreateResult,
-  UpdateResult,
-  PaginatedResult,
-  QueryOptions,
-  ListOptions,
-} from '../types';
+import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions } from '../types';
 import { DBError } from '../core/error';
 import { BaseStore, openStore } from './baseStore';
+import * as cache from '../cache/cacheService';
 import * as events from '../events/eventService';
+import { measurePerformance } from '../metrics/metricsService';

 const logger = createServiceLogger('COUNTER_STORE');
@@ -27,43 +20,45 @@ export class CounterStore implements BaseStore {
     collection: string,
     id: string,
     data: Omit<T, 'createdAt' | 'updatedAt'>,
-    options?: StoreOptions,
+    options?: StoreOptions
   ): Promise<CreateResult> {
-    try {
-      const db = await openStore(collection, StoreType.COUNTER, options);
-      // Extract value from data, default to 0
-      const value =
-        typeof data === 'object' && data !== null && 'value' in data ? Number(data.value) : 0;
-      // Set the counter value
-      const hash = await db.set(value);
-      // Construct document representation
-      const document = {
-        id,
-        value,
-        createdAt: Date.now(),
-        updatedAt: Date.now(),
-      };
-      // Emit change event
-      events.emit('document:created', { collection, id, document });
-      logger.info(`Set counter in ${collection} to ${value}`);
-      return { id, hash };
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error setting counter in ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to set counter in ${collection}`,
-        error,
-      );
-    }
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.COUNTER, options);
+        // Extract value from data, default to 0
+        const value = typeof data === 'object' && data !== null && 'value' in data ?
+          Number(data.value) : 0;
+        // Set the counter value
+        const hash = await db.set(value);
+        // Construct document representation
+        const document = {
+          id,
+          value,
+          createdAt: Date.now(),
+          updatedAt: Date.now()
+        };
+        // Add to cache
+        const cacheKey = `${collection}:${id}`;
+        cache.set(cacheKey, document);
+        // Emit change event
+        events.emit('document:created', { collection, id, document });
+        logger.info(`Set counter in ${collection} to ${value}`);
+        return { id, hash };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error setting counter in ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to set counter in ${collection}`, error);
+      }
+    });
   }

 /**
@@ -72,37 +67,47 @@ export class CounterStore implements BaseStore {
   async get<T extends Record<string, any>>(
     collection: string,
     id: string,
-    options?: StoreOptions & { skipCache?: boolean },
+    options?: StoreOptions & { skipCache?: boolean }
   ): Promise<T | null> {
-    try {
-      // Note: for counters, id is not used in the underlying store (there's only one counter per db)
-      // but we use it for consistency with the API
-      const db = await openStore(collection, StoreType.COUNTER, options);
-      // Get the counter value
-      const value = await db.value();
-      // Construct document representation
-      const document = {
-        id,
-        value,
-        updatedAt: Date.now(),
-      } as unknown as T;
-      return document;
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error getting counter from ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to get counter from ${collection}`,
-        error,
-      );
-    }
+    return measurePerformance(async () => {
+      try {
+        // Note: for counters, id is not used in the underlying store (there's only one counter per db)
+        // but we use it for consistency with the API
+        // Check cache first if not skipped
+        const cacheKey = `${collection}:${id}`;
+        if (!options?.skipCache) {
+          const cachedDocument = cache.get<T>(cacheKey);
+          if (cachedDocument) {
+            return cachedDocument;
+          }
+        }
+        const db = await openStore(collection, StoreType.COUNTER, options);
+        // Get the counter value
+        const value = await db.value();
+        // Construct document representation
+        const document = {
+          id,
+          value,
+          updatedAt: Date.now()
+        } as unknown as T;
+        // Update cache
+        cache.set(cacheKey, document);
+        return document;
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error getting counter from ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get counter from ${collection}`, error);
+      }
+    });
   }

 /**
@@ -112,122 +117,130 @@ export class CounterStore implements BaseStore {
   async update<T extends Record<string, any>>(
     collection: string,
     id: string,
     data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
-    options?: StoreOptions & { upsert?: boolean },
+    options?: StoreOptions & { upsert?: boolean }
   ): Promise<UpdateResult> {
-    try {
-      const db = await openStore(collection, StoreType.COUNTER, options);
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.COUNTER, options);
       // Get current value before update
       const currentValue = await db.value();
       // Extract value from data
       let value: number;
       let operation: 'increment' | 'decrement' | 'set' = 'set';
       // Check what kind of operation we're doing
       if (typeof data === 'object' && data !== null) {
         if ('increment' in data) {
           value = Number(data.increment);
           operation = 'increment';
         } else if ('decrement' in data) {
           value = Number(data.decrement);
           operation = 'decrement';
         } else if ('value' in data) {
           value = Number(data.value);
           operation = 'set';
         } else {
           value = 0;
           operation = 'set';
         }
       } else {
         value = 0;
         operation = 'set';
       }
-      // Update the counter
-      let hash;
-      let newValue;
-      switch (operation) {
-        case 'increment':
-          hash = await db.inc(value);
-          newValue = currentValue + value;
-          break;
-        case 'decrement':
-          hash = await db.inc(-value); // Counter store uses inc with negative value
-          newValue = currentValue - value;
-          break;
-        case 'set':
-          hash = await db.set(value);
-          newValue = value;
-          break;
-      }
-      // Construct document representation
-      const document = {
-        id,
-        value: newValue,
-        updatedAt: Date.now(),
-      };
-      // Emit change event
-      events.emit('document:updated', {
-        collection,
-        id,
-        document,
-        previous: { id, value: currentValue },
-      });
-      logger.info(`Updated counter in ${collection} from ${currentValue} to ${newValue}`);
-      return { id, hash };
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error updating counter in ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to update counter in ${collection}`,
-        error,
-      );
-    }
+        // Update the counter
+        let hash;
+        let newValue;
+        switch (operation) {
+          case 'increment':
+            hash = await db.inc(value);
+            newValue = currentValue + value;
+            break;
+          case 'decrement':
+            hash = await db.inc(-value); // Counter store uses inc with negative value
+            newValue = currentValue - value;
+            break;
+          case 'set':
+            hash = await db.set(value);
+            newValue = value;
+            break;
+        }
+        // Construct document representation
+        const document = {
+          id,
+          value: newValue,
+          updatedAt: Date.now()
+        };
+        // Update cache
+        const cacheKey = `${collection}:${id}`;
+        cache.set(cacheKey, document);
+        // Emit change event
+        events.emit('document:updated', {
+          collection,
+          id,
+          document,
+          previous: { id, value: currentValue }
+        });
+        logger.info(`Updated counter in ${collection} from ${currentValue} to ${newValue}`);
+        return { id, hash };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error updating counter in ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update counter in ${collection}`, error);
+      }
+    });
   }

 /**
  * Delete/reset counter
  */
-  async remove(collection: string, id: string, options?: StoreOptions): Promise<boolean> {
-    try {
-      const db = await openStore(collection, StoreType.COUNTER, options);
-      // Get the current value for the event
-      const currentValue = await db.value();
-      // Reset the counter to 0 (counters can't be truly deleted)
-      await db.set(0);
-      // Emit change event
-      events.emit('document:deleted', {
-        collection,
-        id,
-        document: { id, value: currentValue },
-      });
-      logger.info(`Reset counter in ${collection} from ${currentValue} to 0`);
-      return true;
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error resetting counter in ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to reset counter in ${collection}`,
-        error,
-      );
-    }
+  async remove(
+    collection: string,
+    id: string,
+    options?: StoreOptions
+  ): Promise<boolean> {
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.COUNTER, options);
+        // Get the current value for the event
+        const currentValue = await db.value();
+        // Reset the counter to 0 (counters can't be truly deleted)
+        await db.set(0);
+        // Remove from cache
+        const cacheKey = `${collection}:${id}`;
+        cache.del(cacheKey);
+        // Emit change event
+        events.emit('document:deleted', {
+          collection,
+          id,
+          document: { id, value: currentValue }
+        });
+        logger.info(`Reset counter in ${collection} from ${currentValue} to 0`);
+        return true;
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error resetting counter in ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to reset counter in ${collection}`, error);
+      }
+    });
   }

 /**
@@ -235,36 +248,34 @@ export class CounterStore implements BaseStore {
  */
   async list<T extends Record<string, any>>(
     collection: string,
-    options?: ListOptions,
+    options?: ListOptions
   ): Promise<PaginatedResult<T>> {
-    try {
-      const db = await openStore(collection, StoreType.COUNTER, options);
-      const value = await db.value();
-      // For counter stores, we just return one document with the counter value
-      const document = {
-        id: '0', // Default ID since counters don't have IDs
-        value,
-        updatedAt: Date.now(),
-      } as unknown as T;
-      return {
-        documents: [document],
-        total: 1,
-        hasMore: false,
-      };
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error listing counter in ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to list counter in ${collection}`,
-        error,
-      );
-    }
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.COUNTER, options);
+        const value = await db.value();
+        // For counter stores, we just return one document with the counter value
+        const document = {
+          id: '0', // Default ID since counters don't have IDs
+          value,
+          updatedAt: Date.now()
+        } as unknown as T;
+        return {
+          documents: [document],
+          total: 1,
+          hasMore: false
+        };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error listing counter in ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list counter in ${collection}`, error);
+      }
+    });
   }

 /**
@@ -273,48 +284,48 @@ export class CounterStore implements BaseStore {
   async query<T extends Record<string, any>>(
     collection: string,
     filter: (doc: T) => boolean,
-    options?: QueryOptions,
+    options?: QueryOptions
   ): Promise<PaginatedResult<T>> {
-    try {
-      const db = await openStore(collection, StoreType.COUNTER, options);
-      const value = await db.value();
-      // Create document
-      const document = {
-        id: '0', // Default ID since counters don't have IDs
-        value,
-        updatedAt: Date.now(),
-      } as unknown as T;
-      // Apply filter
-      const documents = filter(document) ? [document] : [];
-      return {
-        documents,
-        total: documents.length,
-        hasMore: false,
-      };
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error querying counter in ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to query counter in ${collection}`,
-        error,
-      );
-    }
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.COUNTER, options);
+        const value = await db.value();
+        // Create document
+        const document = {
+          id: '0', // Default ID since counters don't have IDs
+          value,
+          updatedAt: Date.now()
+        } as unknown as T;
+        // Apply filter
+        const documents = filter(document) ? [document] : [];
+        return {
+          documents,
+          total: documents.length,
+          hasMore: false
+        };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error querying counter in ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query counter in ${collection}`, error);
+      }
+    });
   }

 /**
  * Create an index - not applicable for counter stores
  */
-  async createIndex(collection: string, _field: string, _options?: StoreOptions): Promise<boolean> {
-    logger.warn(
-      `Index creation not supported for counter collections, ignoring request for ${collection}`,
-    );
+  async createIndex(
+    collection: string,
+    field: string,
+    options?: StoreOptions
+  ): Promise<boolean> {
+    logger.warn(`Index creation not supported for counter collections, ignoring request for ${collection}`);
     return false;
   }
 }
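For reference, the `update` path on both sides of this diff accepts the same three shapes of `data`. A sketch of the operation detection (the `stats`/`visits` names are hypothetical, and direct construction of the exported class is assumed):

```typescript
const counter = new CounterStore();

// 'set' operation: db.set(10), counter becomes 10
await counter.update('stats', 'visits', { value: 10 });

// 'increment': db.inc(5), counter becomes 15
await counter.update('stats', 'visits', { increment: 5 });

// 'decrement': implemented as db.inc(-3), counter becomes 12
await counter.update('stats', 'visits', { decrement: 3 });
```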

View File

@@ -1,79 +1,186 @@
-import { StoreType, StoreOptions, PaginatedResult, QueryOptions, ListOptions } from '../types';
-import { AbstractStore } from './abstractStore';
-import { prepareDocument } from './baseStore';
+import { createServiceLogger } from '../../utils/logger';
+import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions } from '../types';
+import { DBError } from '../core/error';
+import { BaseStore, openStore, prepareDocument } from './baseStore';
+import * as cache from '../cache/cacheService';
+import * as events from '../events/eventService';
+import { measurePerformance } from '../metrics/metricsService';
+
+const logger = createServiceLogger('DOCSTORE');

 /**
  * DocStore implementation
  * Uses OrbitDB's document store which allows for more complex document storage with indices
  */
-export class DocStore extends AbstractStore {
-  constructor() {
-    super(StoreType.DOCSTORE);
-  }
-
-  protected getLoggerName(): string {
-    return 'DOCSTORE';
-  }
+export class DocStore implements BaseStore {

   /**
-   * Prepare a document for creation - override to add _id which is required for docstore
+   * Create a new document in the specified collection
    */
-  protected prepareCreateDocument<T extends Record<string, any>>(
+  async create<T extends Record<string, any>>(
     collection: string,
     id: string,
     data: Omit<T, 'createdAt' | 'updatedAt'>,
-  ): any {
-    return {
-      _id: id,
-      ...prepareDocument<T>(collection, data),
-    };
-  }
+    options?: StoreOptions
+  ): Promise<CreateResult> {
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.DOCSTORE, options);
+        // Prepare document for storage (including _id which is required for docstore)
+        const document = {
+          _id: id,
+          ...prepareDocument<T>(collection, data)
+        };
+        // Add to database
+        const hash = await db.put(document);
+        // Add to cache
+        const cacheKey = `${collection}:${id}`;
+        cache.set(cacheKey, document);
+        // Emit change event
+        events.emit('document:created', { collection, id, document });
+        logger.info(`Created document in ${collection} with id ${id}`);
+        return { id, hash };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error creating document in ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create document in ${collection}`, error);
+      }
+    });
+  }

+  /**
+   * Get a document by ID from a collection
+   */
+  async get<T extends Record<string, any>>(
+    collection: string,
+    id: string,
+    options?: StoreOptions & { skipCache?: boolean }
+  ): Promise<T | null> {
+    return measurePerformance(async () => {
+      try {
+        // Check cache first if not skipped
+        const cacheKey = `${collection}:${id}`;
+        if (!options?.skipCache) {
+          const cachedDocument = cache.get<T>(cacheKey);
+          if (cachedDocument) {
+            return cachedDocument;
+          }
+        }
+        const db = await openStore(collection, StoreType.DOCSTORE, options);
+        const document = await db.get(id) as T | null;
+        // Update cache if document exists
+        if (document) {
+          cache.set(cacheKey, document);
+        }
+        return document;
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error getting document ${id} from ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get document ${id} from ${collection}`, error);
+      }
+    });
+  }

   /**
-   * Prepare a document for update - override to add _id which is required for docstore
+   * Update a document in a collection
    */
-  protected prepareUpdateDocument<T extends Record<string, any>>(
+  async update<T extends Record<string, any>>(
     collection: string,
     id: string,
     data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
-    existing?: T,
-  ): any {
-    return {
-      _id: id,
-      ...prepareDocument<T>(
-        collection,
-        data as unknown as Omit<T, 'createdAt' | 'updatedAt'>,
-        existing,
-      ),
-    };
-  }
+    options?: StoreOptions & { upsert?: boolean }
+  ): Promise<UpdateResult> {
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.DOCSTORE, options);
+        const existing = await db.get(id) as T | null;
+        if (!existing && !options?.upsert) {
+          throw new DBError(
+            ErrorCode.DOCUMENT_NOT_FOUND,
+            `Document ${id} not found in ${collection}`,
+            { collection, id }
+          );
+        }
+        // Prepare document for update
+        const document = {
+          _id: id,
+          ...prepareDocument<T>(collection, data as unknown as Omit<T, "createdAt" | "updatedAt">, existing || undefined)
+        };
+        // Update in database
+        const hash = await db.put(document);
+        // Update cache
+        const cacheKey = `${collection}:${id}`;
+        cache.set(cacheKey, document);
+        // Emit change event
+        events.emit('document:updated', { collection, id, document, previous: existing });
+        logger.info(`Updated document in ${collection} with id ${id}`);
+        return { id, hash };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error updating document in ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update document in ${collection}`, error);
+      }
+    });
+  }

   /**
-   * Implementation for the DocStore create operation
+   * Delete a document from a collection
    */
-  protected async performCreate(db: any, id: string, document: any): Promise<string> {
-    return await db.put(document);
-  }
-
-  /**
-   * Implementation for the DocStore get operation
-   */
-  protected async performGet<T>(db: any, id: string): Promise<T | null> {
-    return (await db.get(id)) as T | null;
-  }
-
-  /**
-   * Implementation for the DocStore update operation
-   */
-  protected async performUpdate(db: any, id: string, document: any): Promise<string> {
-    return await db.put(document);
-  }
-
-  /**
-   * Implementation for the DocStore remove operation
-   */
-  protected async performRemove(db: any, id: string): Promise<void> {
-    await db.del(id);
-  }
+  async remove(
+    collection: string,
+    id: string,
+    options?: StoreOptions
+  ): Promise<boolean> {
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.DOCSTORE, options);
+        // Get the document before deleting for the event
+        const document = await db.get(id);
+        // Delete from database
+        await db.del(id);
+        // Remove from cache
+        const cacheKey = `${collection}:${id}`;
+        cache.del(cacheKey);
+        // Emit change event
+        events.emit('document:deleted', { collection, id, document });
+        logger.info(`Deleted document in ${collection} with id ${id}`);
+        return true;
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error deleting document in ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to delete document in ${collection}`, error);
+      }
+    });
+  }

 /**
@@ -81,26 +188,64 @@ export class DocStore extends AbstractStore {
  */
   async list<T extends Record<string, any>>(
     collection: string,
-    options?: ListOptions,
+    options?: ListOptions
   ): Promise<PaginatedResult<T>> {
-    try {
-      const db = await this.openStore(collection, options);
-      const allDocs = await db.query((_doc: any) => true);
-      // Map the documents to include id
-      let documents = allDocs.map((doc: any) => ({
-        id: doc._id,
-        ...doc,
-      })) as T[];
-      // Apply sorting
-      documents = this.applySorting(documents, options);
-      // Apply pagination
-      return this.applyPagination(documents, options);
-    } catch (error) {
-      this.handleError(`Error listing documents in ${collection}`, error);
-    }
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.DOCSTORE, options);
+        const allDocs = await db.query((doc: any) => true);
+        let documents = allDocs.map((doc: any) => ({
+          id: doc._id,
+          ...doc
+        })) as T[];
+        // Sort if requested
+        if (options?.sort) {
+          const { field, order } = options.sort;
+          documents.sort((a, b) => {
+            const valueA = a[field];
+            const valueB = b[field];
+            // Handle different data types for sorting
+            if (typeof valueA === 'string' && typeof valueB === 'string') {
+              return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
+            } else if (typeof valueA === 'number' && typeof valueB === 'number') {
+              return order === 'asc' ? valueA - valueB : valueB - valueA;
+            } else if (valueA instanceof Date && valueB instanceof Date) {
+              return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime();
+            }
+            // Default comparison for other types
+            return order === 'asc' ?
              String(valueA).localeCompare(String(valueB)) :
              String(valueB).localeCompare(String(valueA));
+          });
+        }
+        const total = documents.length;
+        // Apply pagination
+        const offset = options?.offset || 0;
+        const limit = options?.limit || total;
+        const paginatedDocuments = documents.slice(offset, offset + limit);
+        const hasMore = offset + limit < total;
+        return {
+          documents: paginatedDocuments,
+          total,
+          hasMore
+        };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error listing documents in ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list documents in ${collection}`, error);
+      }
+    });
   }

 /**
@@ -109,73 +254,97 @@ export class DocStore extends AbstractStore {
   async query<T extends Record<string, any>>(
     collection: string,
     filter: (doc: T) => boolean,
-    options?: QueryOptions,
+    options?: QueryOptions
   ): Promise<PaginatedResult<T>> {
-    try {
-      const db = await this.openStore(collection, options);
-      // Apply filter using docstore's query capability
-      const filtered = await db.query((doc: any) => filter(doc as T));
-      // Map the documents to include id
-      let documents = filtered.map((doc: any) => ({
-        id: doc._id,
-        ...doc,
-      })) as T[];
-      // Apply sorting
-      documents = this.applySorting(documents, options);
-      // Apply pagination
-      return this.applyPagination(documents, options);
-    } catch (error) {
-      this.handleError(`Error querying documents in ${collection}`, error);
-    }
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.DOCSTORE, options);
+        // Apply filter using docstore's query capability
+        const filtered = await db.query((doc: any) => filter(doc as T));
+        // Map the documents to include id
+        let documents = filtered.map((doc: any) => ({
+          id: doc._id,
+          ...doc
+        })) as T[];
+        // Sort if requested
+        if (options?.sort) {
+          const { field, order } = options.sort;
+          documents.sort((a, b) => {
+            const valueA = a[field];
+            const valueB = b[field];
+            // Handle different data types for sorting
+            if (typeof valueA === 'string' && typeof valueB === 'string') {
+              return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
+            } else if (typeof valueA === 'number' && typeof valueB === 'number') {
+              return order === 'asc' ? valueA - valueB : valueB - valueA;
+            } else if (valueA instanceof Date && valueB instanceof Date) {
+              return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime();
+            }
+            // Default comparison for other types
+            return order === 'asc' ?
              String(valueA).localeCompare(String(valueB)) :
              String(valueB).localeCompare(String(valueA));
+          });
+        }
+        const total = documents.length;
+        // Apply pagination
+        const offset = options?.offset || 0;
+        const limit = options?.limit || total;
+        const paginatedDocuments = documents.slice(offset, offset + limit);
+        const hasMore = offset + limit < total;
+        return {
+          documents: paginatedDocuments,
+          total,
+          hasMore
+        };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error querying documents in ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query documents in ${collection}`, error);
+      }
+    });
   }

 /**
  * Create an index for a collection to speed up queries
  * DocStore has built-in indexing capabilities
  */
-  async createIndex(collection: string, field: string, options?: StoreOptions): Promise<boolean> {
+  async createIndex(
+    collection: string,
+    field: string,
+    options?: StoreOptions
+  ): Promise<boolean> {
     try {
-      const db = await this.openStore(collection, options);
+      const db = await openStore(collection, StoreType.DOCSTORE, options);
       // DocStore supports indexing, so we create the index
       if (typeof db.createIndex === 'function') {
         await db.createIndex(field);
-        this.logger.info(`Index created on ${field} for collection ${collection}`);
+        logger.info(`Index created on ${field} for collection ${collection}`);
         return true;
       }
-      this.logger.info(
-        `Index creation not supported for this DB instance, but DocStore has built-in indices`,
-      );
+      logger.info(`Index creation not supported for this DB instance, but DocStore has built-in indices`);
       return true;
     } catch (error) {
-      this.handleError(`Error creating index for ${collection}`, error);
+      if (error instanceof DBError) {
+        throw error;
+      }
+      logger.error(`Error creating index for ${collection}:`, error);
+      throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create index for ${collection}`, error);
     }
   }

-  /**
-   * Helper to open a store of the correct type
-   */
-  private async openStore(collection: string, options?: StoreOptions): Promise<any> {
-    const { openStore } = await import('./baseStore');
-    return await openStore(collection, this.storeType, options);
-  }
-
-  /**
-   * Helper to handle errors consistently
-   */
-  private handleError(message: string, error: any): never {
-    const { DBError, ErrorCode } = require('../core/error');
-    if (error instanceof DBError) {
-      throw error;
-    }
-    this.logger.error(`${message}:`, error);
-    throw new DBError(ErrorCode.OPERATION_FAILED, `${message}: ${error.message}`, error);
-  }
 }
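Both implementations store documents under the docstore-required `_id` key while the public API exposes plain `id`s. A quick sketch of that mapping (the collection name and fields are hypothetical):

```typescript
const posts = new DocStore();

// Stored internally as { _id: 'post1', title: 'Hello', createdAt: ..., updatedAt: ... }
await posts.create('posts', 'post1', { title: 'Hello' });

// Query filters see the raw stored document; results are mapped back so that
// each returned document carries id: doc._id alongside its fields.
const recent = await posts.query('posts', (doc: any) => doc.title.startsWith('He'), {
  sort: { field: 'createdAt', order: 'desc' },
  limit: 10,
});
```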

View File

@@ -1,17 +1,10 @@
 import { createServiceLogger } from '../../utils/logger';
-import {
-  ErrorCode,
-  StoreType,
-  StoreOptions,
-  CreateResult,
-  UpdateResult,
-  PaginatedResult,
-  QueryOptions,
-  ListOptions,
-} from '../types';
+import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions } from '../types';
 import { DBError } from '../core/error';
 import { BaseStore, openStore, prepareDocument } from './baseStore';
+import * as cache from '../cache/cacheService';
 import * as events from '../events/eventService';
+import { measurePerformance } from '../metrics/metricsService';

 const logger = createServiceLogger('FEED_STORE');
@@ -28,37 +21,39 @@ export class FeedStore implements BaseStore {
     collection: string,
     id: string,
     data: Omit<T, 'createdAt' | 'updatedAt'>,
-    options?: StoreOptions,
+    options?: StoreOptions
   ): Promise<CreateResult> {
-    try {
-      const db = await openStore(collection, StoreType.FEED, options);
-      // Prepare document for storage with ID
-      const document = {
-        id,
-        ...prepareDocument<T>(collection, data),
-      };
-      // Add to database
-      const hash = await db.add(document);
-      // Emit change event
-      events.emit('document:created', { collection, id, document, hash });
-      logger.info(`Created entry in feed ${collection} with id ${id} and hash ${hash}`);
-      return { id, hash };
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error creating entry in feed ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to create entry in feed ${collection}`,
-        error,
-      );
-    }
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.FEED, options);
+        // Prepare document for storage with ID
+        const document = {
+          id,
+          ...prepareDocument<T>(collection, data)
+        };
+        // Add to database
+        const hash = await db.add(document);
+        // Feed entries are append-only, so we use a different cache key pattern
+        const cacheKey = `${collection}:entry:${hash}`;
+        cache.set(cacheKey, document);
+        // Emit change event
+        events.emit('document:created', { collection, id, document, hash });
+        logger.info(`Created entry in feed ${collection} with id ${id} and hash ${hash}`);
+        return { id, hash };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error creating entry in feed ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create entry in feed ${collection}`, error);
+      }
+    });
   }

 /**
@@ -68,32 +63,42 @@ export class FeedStore implements BaseStore {
   async get<T extends Record<string, any>>(
     collection: string,
     hash: string,
-    options?: StoreOptions & { skipCache?: boolean },
+    options?: StoreOptions & { skipCache?: boolean }
   ): Promise<T | null> {
-    try {
-      const db = await openStore(collection, StoreType.FEED, options);
-      // Get the specific entry by hash
-      const entry = await db.get(hash);
-      if (!entry) {
-        return null;
-      }
-      const document = entry.payload.value as T;
-      return document;
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error getting entry ${hash} from feed ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to get entry ${hash} from feed ${collection}`,
-        error,
-      );
-    }
+    return measurePerformance(async () => {
+      try {
+        // Check cache first if not skipped
+        const cacheKey = `${collection}:entry:${hash}`;
+        if (!options?.skipCache) {
+          const cachedDocument = cache.get<T>(cacheKey);
+          if (cachedDocument) {
+            return cachedDocument;
+          }
+        }
+        const db = await openStore(collection, StoreType.FEED, options);
+        // Get the specific entry by hash
+        const entry = await db.get(hash);
+        if (!entry) {
+          return null;
+        }
+        const document = entry.payload.value as T;
+        // Update cache
+        cache.set(cacheKey, document);
+        return document;
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error getting entry ${hash} from feed ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get entry ${hash} from feed ${collection}`, error);
+      }
+    });
   }

 /**
@@ -105,115 +110,115 @@ export class FeedStore implements BaseStore {
   async update<T extends Record<string, any>>(
     collection: string,
     id: string,
     data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
-    options?: StoreOptions & { upsert?: boolean },
+    options?: StoreOptions & { upsert?: boolean }
   ): Promise<UpdateResult> {
-    try {
-      const db = await openStore(collection, StoreType.FEED, options);
-      const entries = await db.iterator({ limit: -1 }).collect();
-      const existingEntryIndex = entries.findIndex((e: any) => {
-        const value = e.payload.value;
-        return value && value.id === id;
-      });
-      if (existingEntryIndex === -1 && !options?.upsert) {
-        throw new DBError(
-          ErrorCode.DOCUMENT_NOT_FOUND,
-          `Entry with id ${id} not found in feed ${collection}`,
-          { collection, id },
-        );
-      }
-      const existingEntry =
-        existingEntryIndex !== -1 ? entries[existingEntryIndex].payload.value : null;
-      // Prepare document with update
-      const document = {
-        id,
-        ...prepareDocument<T>(
-          collection,
-          data as unknown as Omit<T, 'createdAt' | 'updatedAt'>,
-          existingEntry,
-        ),
-        // Add reference to the previous entry if it exists
-        previousEntryHash: existingEntryIndex !== -1 ? entries[existingEntryIndex].hash : undefined,
-      };
-      // Add to feed (append new entry)
-      const hash = await db.add(document);
-      // Emit change event
-      events.emit('document:updated', { collection, id, document, previous: existingEntry });
-      logger.info(`Updated entry in feed ${collection} with id ${id} (new hash: ${hash})`);
-      return { id, hash };
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error updating entry in feed ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to update entry in feed ${collection}`,
-        error,
-      );
-    }
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.FEED, options);
+        // Find the latest entry with the given id
+        const entries = await db.iterator({ limit: -1 }).collect();
+        const existingEntryIndex = entries.findIndex((e: any) => {
+          const value = e.payload.value;
+          return value && value.id === id;
+        });
+        if (existingEntryIndex === -1 && !options?.upsert) {
+          throw new DBError(
+            ErrorCode.DOCUMENT_NOT_FOUND,
+            `Entry with id ${id} not found in feed ${collection}`,
+            { collection, id }
+          );
+        }
+        const existingEntry = existingEntryIndex !== -1 ? entries[existingEntryIndex].payload.value : null;
+        // Prepare document with update
+        const document = {
+          id,
+          ...prepareDocument<T>(collection, data as unknown as Omit<T, "createdAt" | "updatedAt">, existingEntry),
+          // Add reference to the previous entry if it exists
+          previousEntryHash: existingEntryIndex !== -1 ? entries[existingEntryIndex].hash : undefined
+        };
+        // Add to feed (append new entry)
+        const hash = await db.add(document);
+        // Cache the new entry
+        const cacheKey = `${collection}:entry:${hash}`;
+        cache.set(cacheKey, document);
+        // Emit change event
+        events.emit('document:updated', { collection, id, document, previous: existingEntry });
+        logger.info(`Updated entry in feed ${collection} with id ${id} (new hash: ${hash})`);
+        return { id, hash };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error updating entry in feed ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update entry in feed ${collection}`, error);
+      }
+    });
   }

 /**
  * Delete is not supported in feed/eventlog stores since they're append-only
  * Instead, we add a "tombstone" entry that marks the entry as deleted
  */
-  async remove(collection: string, id: string, options?: StoreOptions): Promise<boolean> {
-    try {
-      const db = await openStore(collection, StoreType.FEED, options);
-      // Find the entry with the given id
-      const entries = await db.iterator({ limit: -1 }).collect();
-      const existingEntryIndex = entries.findIndex((e: any) => {
-        const value = e.payload.value;
-        return value && value.id === id;
-      });
-      if (existingEntryIndex === -1) {
-        throw new DBError(
-          ErrorCode.DOCUMENT_NOT_FOUND,
-          `Entry with id ${id} not found in feed ${collection}`,
-          { collection, id },
-        );
-      }
-      const existingEntry = entries[existingEntryIndex].payload.value;
-      const existingHash = entries[existingEntryIndex].hash;
-      // Add a "tombstone" entry that marks this as deleted
-      const tombstone = {
-        id,
-        deleted: true,
-        deletedAt: Date.now(),
-        previousEntryHash: existingHash,
-      };
-      await db.add(tombstone);
-      // Emit change event
-      events.emit('document:deleted', { collection, id, document: existingEntry });
-      logger.info(`Marked entry as deleted in feed ${collection} with id ${id}`);
-      return true;
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error marking entry as deleted in feed ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to mark entry as deleted in feed ${collection}`,
-        error,
-      );
-    }
+  async remove(
+    collection: string,
+    id: string,
+    options?: StoreOptions
+  ): Promise<boolean> {
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.FEED, options);
+        // Find the entry with the given id
+        const entries = await db.iterator({ limit: -1 }).collect();
+        const existingEntryIndex = entries.findIndex((e: any) => {
+          const value = e.payload.value;
+          return value && value.id === id;
+        });
+        if (existingEntryIndex === -1) {
+          throw new DBError(
+            ErrorCode.DOCUMENT_NOT_FOUND,
+            `Entry with id ${id} not found in feed ${collection}`,
+            { collection, id }
+          );
+        }
+        const existingEntry = entries[existingEntryIndex].payload.value;
+        const existingHash = entries[existingEntryIndex].hash;
+        // Add a "tombstone" entry that marks this as deleted
+        const tombstone = {
+          id,
+          deleted: true,
+          deletedAt: Date.now(),
+          previousEntryHash: existingHash
+        };
+        await db.add(tombstone);
+        // Emit change event
+        events.emit('document:deleted', { collection, id, document: existingEntry });
+        logger.info(`Marked entry as deleted in feed ${collection} with id ${id}`);
+        return true;
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error marking entry as deleted in feed ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to mark entry as deleted in feed ${collection}`, error);
+      }
+    });
   }

 /**
@@ -222,90 +227,87 @@ export class FeedStore implements BaseStore {
  */
   async list<T extends Record<string, any>>(
     collection: string,
-    options?: ListOptions,
+    options?: ListOptions
   ): Promise<PaginatedResult<T>> {
-    try {
-      const db = await openStore(collection, StoreType.FEED, options);
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.FEED, options);
       // Get all entries
       const entries = await db.iterator({ limit: -1 }).collect();
       // Group by ID and keep only the latest entry for each ID
       // Also filter out tombstone entries
       const latestEntries = new Map<string, any>();
       for (const entry of entries) {
         const value = entry.payload.value;
         if (!value || value.deleted) continue;
         const id = value.id;
         if (!id) continue;
         // If we already have an entry with this ID, check which is newer
         if (latestEntries.has(id)) {
           const existing = latestEntries.get(id);
           if (value.updatedAt > existing.value.updatedAt) {
             latestEntries.set(id, { hash: entry.hash, value });
           }
         } else {
           latestEntries.set(id, { hash: entry.hash, value });
         }
       }
-      // Convert to array of documents
-      let documents = Array.from(latestEntries.values()).map((entry) => ({
-        ...entry.value,
-      })) as T[];
-      // Sort if requested
-      if (options?.sort) {
-        const { field, order } = options.sort;
-        documents.sort((a, b) => {
-          const valueA = a[field];
-          const valueB = b[field];
-          // Handle different data types for sorting
-          if (typeof valueA === 'string' && typeof valueB === 'string') {
-            return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
-          } else if (typeof valueA === 'number' && typeof valueB === 'number') {
-            return order === 'asc' ? valueA - valueB : valueB - valueA;
-          } else if (valueA instanceof Date && valueB instanceof Date) {
-            return order === 'asc'
-              ? valueA.getTime() - valueB.getTime()
-              : valueB.getTime() - valueA.getTime();
-          }
-          // Default comparison for other types
-          return order === 'asc'
-            ? String(valueA).localeCompare(String(valueB))
-            : String(valueB).localeCompare(String(valueA));
-        });
-      }
-      // Apply pagination
-      const total = documents.length;
-      const offset = options?.offset || 0;
-      const limit = options?.limit || total;
-      const paginatedDocuments = documents.slice(offset, offset + limit);
-      const hasMore = offset + limit < total;
-      return {
-        documents: paginatedDocuments,
-        total,
-        hasMore,
-      };
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error listing entries in feed ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to list entries in feed ${collection}`,
-        error,
-      );
-    }
+        // Convert to array of documents
+        let documents = Array.from(latestEntries.values()).map(entry => ({
+          ...entry.value
+        })) as T[];
+        // Sort if requested
+        if (options?.sort) {
+          const { field, order } = options.sort;
+          documents.sort((a, b) => {
+            const valueA = a[field];
+            const valueB = b[field];
+            // Handle different data types for sorting
+            if (typeof valueA === 'string' && typeof valueB === 'string') {
+              return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
+            } else if (typeof valueA === 'number' && typeof valueB === 'number') {
+              return order === 'asc' ? valueA - valueB : valueB - valueA;
+            } else if (valueA instanceof Date && valueB instanceof Date) {
+              return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime();
+            }
+            // Default comparison for other types
+            return order === 'asc' ?
              String(valueA).localeCompare(String(valueB)) :
              String(valueB).localeCompare(String(valueA));
+          });
+        }
+        const total = documents.length;
+        // Apply pagination
+        const offset = options?.offset || 0;
+        const limit = options?.limit || total;
+        const paginatedDocuments = documents.slice(offset, offset + limit);
+        const hasMore = offset + limit < total;
+        return {
+          documents: paginatedDocuments,
+          total,
+          hasMore
+        };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error listing entries in feed ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list entries in feed ${collection}`, error);
+      }
+    });
   }

 /**
@@ -315,101 +317,100 @@ export class FeedStore implements BaseStore {
   async query<T extends Record<string, any>>(
     collection: string,
     filter: (doc: T) => boolean,
-    options?: QueryOptions,
+    options?: QueryOptions
   ): Promise<PaginatedResult<T>> {
-    try {
-      const db = await openStore(collection, StoreType.FEED, options);
+    return measurePerformance(async () => {
+      try {
+        const db = await openStore(collection, StoreType.FEED, options);
       // Get all entries
       const entries = await db.iterator({ limit: -1 }).collect();
       // Group by ID and keep only the latest entry for each ID
       // Also filter out tombstone entries
       const latestEntries = new Map<string, any>();
       for (const entry of entries) {
         const value = entry.payload.value;
         if (!value || value.deleted) continue;
         const id = value.id;
         if (!id) continue;
         // If we already have an entry with this ID, check which is newer
         if (latestEntries.has(id)) {
           const existing = latestEntries.get(id);
           if (value.updatedAt > existing.value.updatedAt) {
             latestEntries.set(id, { hash: entry.hash, value });
           }
         } else {
           latestEntries.set(id, { hash: entry.hash, value });
         }
       }
-      // Convert to array of documents and apply filter
-      let filtered = Array.from(latestEntries.values())
-        .filter((entry) => filter(entry.value as T))
-        .map((entry) => ({
-          ...entry.value,
-        })) as T[];
-      // Sort if requested
-      if (options?.sort) {
-        const { field, order } = options.sort;
-        filtered.sort((a, b) => {
-          const valueA = a[field];
-          const valueB = b[field];
-          // Handle different data types for sorting
-          if (typeof valueA === 'string' && typeof valueB === 'string') {
-            return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
-          } else if (typeof valueA === 'number' && typeof valueB === 'number') {
-            return order === 'asc' ? valueA - valueB : valueB - valueA;
-          } else if (valueA instanceof Date && valueB instanceof Date) {
-            return order === 'asc'
-              ? valueA.getTime() - valueB.getTime()
-              : valueB.getTime() - valueA.getTime();
-          }
-          // Default comparison for other types
-          return order === 'asc'
-            ? String(valueA).localeCompare(String(valueB))
-            : String(valueB).localeCompare(String(valueA));
-        });
-      }
-      // Apply pagination
-      const total = filtered.length;
-      const offset = options?.offset || 0;
-      const limit = options?.limit || total;
-      const paginatedDocuments = filtered.slice(offset, offset + limit);
-      const hasMore = offset + limit < total;
-      return {
-        documents: paginatedDocuments,
-        total,
-        hasMore,
-      };
-    } catch (error: unknown) {
-      if (error instanceof DBError) {
-        throw error;
-      }
-      logger.error(`Error querying entries in feed ${collection}:`, error);
-      throw new DBError(
-        ErrorCode.OPERATION_FAILED,
-        `Failed to query entries in feed ${collection}`,
-        error,
-      );
-    }
+        // Convert to array of documents and apply filter
+        let filtered = Array.from(latestEntries.values())
+          .filter(entry => filter(entry.value as T))
+          .map(entry => ({
+            ...entry.value
+          })) as T[];
+        // Sort if requested
+        if (options?.sort) {
+          const { field, order } = options.sort;
+          filtered.sort((a, b) => {
+            const valueA = a[field];
+            const valueB = b[field];
+            // Handle different data types for sorting
+            if (typeof valueA === 'string' && typeof valueB === 'string') {
+              return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
+            } else if (typeof valueA === 'number' && typeof valueB === 'number') {
+              return order === 'asc' ? valueA - valueB : valueB - valueA;
+            } else if (valueA instanceof Date && valueB instanceof Date) {
+              return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime();
+            }
+            // Default comparison for other types
+            return order === 'asc' ?
              String(valueA).localeCompare(String(valueB)) :
              String(valueB).localeCompare(String(valueA));
+          });
+        }
+        const total = filtered.length;
+        // Apply pagination
+        const offset = options?.offset || 0;
+        const limit = options?.limit || total;
+        const paginatedDocuments = filtered.slice(offset, offset + limit);
+        const hasMore = offset + limit < total;
+        return {
+          documents: paginatedDocuments,
+          total,
+          hasMore
+        };
+      } catch (error) {
+        if (error instanceof DBError) {
+          throw error;
+        }
+        logger.error(`Error querying entries in feed ${collection}:`, error);
+        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query entries in feed ${collection}`, error);
+      }
+    });
   }

 /**
  * Create an index for a collection - not supported for feeds
  */
-  async createIndex(collection: string, _field: string, _options?: StoreOptions): Promise<boolean> {
-    logger.warn(
-      `Index creation not supported for feed collections, ignoring request for ${collection}`,
-    );
+  async createIndex(
+    collection: string,
+    field: string,
+    options?: StoreOptions
+  ): Promise<boolean> {
+    logger.warn(`Index creation not supported for feed collections, ignoring request for ${collection}`);
    return false;
  }
 }
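On both sides of this diff, feeds never mutate or drop entries: `update` appends a new entry pointing at the previous one via `previousEntryHash`, and `remove` appends a tombstone that `list`/`query` then filter out. A sketch of the resulting log (ids and fields hypothetical):

```typescript
const feed = new FeedStore();

await feed.create('events', 'evt1', { type: 'login' });  // entry 1
await feed.update('events', 'evt1', { type: 'logout' }); // entry 2, previousEntryHash -> entry 1
await feed.remove('events', 'evt1');                     // entry 3: { id: 'evt1', deleted: true, ... }

// list() groups by id, keeps the newest entry, and skips tombstoned ids:
const { total } = await feed.list('events'); // total === 0 here

// Note: feed.get() looks up a single entry by its hash, not by the logical id.
```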

View File

@@ -1,13 +1,13 @@
import { createServiceLogger } from '../../utils/logger';
import { ErrorCode, StoreType, FileUploadResult, FileResult } from '../types';
import { DBError } from '../core/error';
import { openStore } from './baseStore';
import { getHelia } from '../../ipfs/ipfsService';
import { CreateResult, StoreOptions } from '../types';

// Helper function to convert AsyncIterable to Buffer
async function readAsyncIterableToBuffer(
  asyncIterable: AsyncIterable<Uint8Array>,
): Promise<Buffer> {
  const chunks: Uint8Array[] = [];
  for await (const chunk of asyncIterable) {
    chunks.push(chunk);
@@ -26,143 +26,124 @@ export const uploadFile = async (
    filename?: string;
    connectionId?: string;
    metadata?: Record<string, any>;
  },
): Promise<FileUploadResult> => {
  try {
    const ipfs = getHelia();
    if (!ipfs) {
      throw new DBError(ErrorCode.OPERATION_FAILED, 'IPFS instance not available');
    }

    // Add to IPFS
    const unixfs = await import('@helia/unixfs');
    const fs = unixfs.unixfs(ipfs);
    const cid = await fs.addBytes(fileData);
    const cidStr = cid.toString();

    // Store metadata
    const filesDb = await openStore('_files', StoreType.KEYVALUE);
    await filesDb.put(cidStr, {
      filename: options?.filename,
      size: fileData.length,
      uploadedAt: Date.now(),
      ...options?.metadata,
    });

    logger.info(`Uploaded file with CID: ${cidStr}`);
    return { cid: cidStr };
  } catch (error: unknown) {
    if (error instanceof DBError) {
      throw error;
    }
    logger.error('Error uploading file:', error);
    throw new DBError(ErrorCode.OPERATION_FAILED, 'Failed to upload file', error);
  }
};
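For context, the Helia UnixFS calls used above round-trip like this. A minimal sketch assuming a standalone Helia node; the metadata store and error handling from the real implementation are omitted:

```typescript
import { createHelia } from 'helia';
import { unixfs } from '@helia/unixfs';

async function roundTrip() {
  const helia = await createHelia();
  const fs = unixfs(helia);

  // addBytes stores a Uint8Array and returns its CID
  const cid = await fs.addBytes(new TextEncoder().encode('hello debros'));

  // cat returns an AsyncIterable<Uint8Array>; collect it into one buffer,
  // the same job readAsyncIterableToBuffer does above
  const chunks: Uint8Array[] = [];
  for await (const chunk of fs.cat(cid)) {
    chunks.push(chunk);
  }
  console.log(cid.toString(), Buffer.concat(chunks).toString());

  await helia.stop();
}

roundTrip().catch(console.error);
```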
/**
 * Get a file from IPFS by CID
 */
export const getFile = async (cid: string): Promise<FileResult> => {
  try {
    const ipfs = getHelia();
    if (!ipfs) {
      throw new DBError(ErrorCode.OPERATION_FAILED, 'IPFS instance not available');
    }

    // Get from IPFS
    const unixfs = await import('@helia/unixfs');
    const fs = unixfs.unixfs(ipfs);
    const { CID } = await import('multiformats/cid');
    const resolvedCid = CID.parse(cid);

    try {
      // Convert AsyncIterable to Buffer
      const bytes = await readAsyncIterableToBuffer(fs.cat(resolvedCid));

      // Get metadata if available
      let metadata = null;
      try {
        const filesDb = await openStore('_files', StoreType.KEYVALUE);
        metadata = await filesDb.get(cid);
      } catch (_err) {
        // Metadata might not exist, continue without it
      }

      return { data: bytes, metadata };
    } catch (error) {
      throw new DBError(ErrorCode.FILE_NOT_FOUND, `File with CID ${cid} not found`, error);
    }
  } catch (error: unknown) {
    if (error instanceof DBError) {
      throw error;
    }
    logger.error(`Error getting file with CID ${cid}:`, error);
    throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get file with CID ${cid}`, error);
  }
};
/**
 * Delete a file from IPFS by CID
 */
export const deleteFile = async (cid: string): Promise<boolean> => {
  try {
    // Delete metadata
    try {
      const filesDb = await openStore('_files', StoreType.KEYVALUE);
      await filesDb.del(cid);
    } catch (_err) {
      // Ignore if metadata doesn't exist
    }

    // In IPFS we can't really delete files, but we can remove them from our local blockstore
    // and they will eventually be garbage collected if no one else has pinned them
    logger.info(`Deleted file with CID: ${cid}`);
    return true;
  } catch (error: unknown) {
    if (error instanceof DBError) {
      throw error;
    }
    logger.error(`Error deleting file with CID ${cid}:`, error);
    throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to delete file with CID ${cid}`, error);
  }
};

export const create = async <T extends Record<string, any>>(
  collection: string,
  id: string,
  data: Omit<T, 'createdAt' | 'updatedAt'>,
  options?: StoreOptions,
): Promise<CreateResult> => {
  try {
    const db = await openStore(collection, StoreType.KEYVALUE, options);

    // Prepare document for storage with ID
    // const document = {
    //   id,
    //   ...prepareDocument<T>(collection, data)
    // };
    const document = { id, ...data };

    // Add to database
    const hash = await db.add(document);

    // Emit change event
    // events.emit('document:created', { collection, id, document, hash });

    logger.info(`Created entry in file ${collection} with id ${id} and hash ${hash}`);
    return { id, hash };
  } catch (error: unknown) {
    if (error instanceof DBError) {
      throw error;
    }
    logger.error(`Error creating entry in file ${collection}:`, error);
    throw new DBError(
      ErrorCode.OPERATION_FAILED,
      `Failed to create entry in file ${collection}`,
      error,
    );
  }
};
@@ -1,45 +1,179 @@
import { createServiceLogger } from '../../utils/logger';
import {
  ErrorCode,
  StoreType,
  StoreOptions,
  CreateResult,
  UpdateResult,
  PaginatedResult,
  QueryOptions,
  ListOptions,
} from '../types';
import { DBError } from '../core/error';
import { BaseStore, openStore, prepareDocument } from './baseStore';
import * as cache from '../cache/cacheService';
import * as events from '../events/eventService';
import { measurePerformance } from '../metrics/metricsService';

const logger = createServiceLogger('KEYVALUE_STORE');

/**
 * KeyValue Store implementation
 */
export class KeyValueStore implements BaseStore {
  /**
   * Create a new document in the specified collection
   */
  async create<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Omit<T, 'createdAt' | 'updatedAt'>,
    options?: StoreOptions,
  ): Promise<CreateResult> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.KEYVALUE, options);

        // Prepare document for storage
        const document = prepareDocument<T>(collection, data);

        // Add to database
        const hash = await db.put(id, document);

        // Add to cache
        const cacheKey = `${collection}:${id}`;
        cache.set(cacheKey, document);

        // Emit change event
        events.emit('document:created', { collection, id, document });

        logger.info(`Created document in ${collection} with id ${id}`);
        return { id, hash };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error creating document in ${collection}:`, error);
        throw new DBError(
          ErrorCode.OPERATION_FAILED,
          `Failed to create document in ${collection}`,
          error,
        );
      }
    });
  }

  /**
   * Get a document by ID from a collection
   */
  async get<T extends Record<string, any>>(
    collection: string,
    id: string,
    options?: StoreOptions & { skipCache?: boolean },
  ): Promise<T | null> {
    return measurePerformance(async () => {
      try {
        // Check cache first if not skipped
        const cacheKey = `${collection}:${id}`;
        if (!options?.skipCache) {
          const cachedDocument = cache.get<T>(cacheKey);
          if (cachedDocument) {
            return cachedDocument;
          }
        }

        const db = await openStore(collection, StoreType.KEYVALUE, options);
        const document = (await db.get(id)) as T | null;

        // Update cache if document exists
        if (document) {
          cache.set(cacheKey, document);
        }

        return document;
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error getting document ${id} from ${collection}:`, error);
        throw new DBError(
          ErrorCode.OPERATION_FAILED,
          `Failed to get document ${id} from ${collection}`,
          error,
        );
      }
    });
  }

  /**
   * Update a document in a collection
   */
  async update<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
    options?: StoreOptions & { upsert?: boolean },
  ): Promise<UpdateResult> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.KEYVALUE, options);
        const existing = (await db.get(id)) as T | null;

        if (!existing && !options?.upsert) {
          throw new DBError(
            ErrorCode.DOCUMENT_NOT_FOUND,
            `Document ${id} not found in ${collection}`,
            { collection, id },
          );
        }

        // Prepare document for update
        const document = prepareDocument<T>(
          collection,
          data as unknown as Omit<T, 'createdAt' | 'updatedAt'>,
          existing || undefined,
        );

        // Update in database
        const hash = await db.put(id, document);

        // Update cache
        const cacheKey = `${collection}:${id}`;
        cache.set(cacheKey, document);

        // Emit change event
        events.emit('document:updated', { collection, id, document, previous: existing });

        logger.info(`Updated document in ${collection} with id ${id}`);
        return { id, hash };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error updating document in ${collection}:`, error);
        throw new DBError(
          ErrorCode.OPERATION_FAILED,
          `Failed to update document in ${collection}`,
          error,
        );
      }
    });
  }

  /**
   * Delete a document from a collection
   */
  async remove(collection: string, id: string, options?: StoreOptions): Promise<boolean> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.KEYVALUE, options);

        // Get the document before deleting for the event
        const document = await db.get(id);

        // Delete from database
        await db.del(id);

        // Remove from cache
        const cacheKey = `${collection}:${id}`;
        cache.del(cacheKey);

        // Emit change event
        events.emit('document:deleted', { collection, id, document });

        logger.info(`Deleted document in ${collection} with id ${id}`);
        return true;
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error deleting document in ${collection}:`, error);
        throw new DBError(
          ErrorCode.OPERATION_FAILED,
          `Failed to delete document in ${collection}`,
          error,
        );
      }
    });
  }
  /**
@@ -47,26 +181,64 @@ export class KeyValueStore extends AbstractStore {
   */
  async list<T extends Record<string, any>>(
    collection: string,
    options?: ListOptions,
  ): Promise<PaginatedResult<T>> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.KEYVALUE, options);
        const all = await db.all();

        // Convert the key-value pairs to an array of documents with IDs
        let documents = Object.entries(all).map(([key, value]) => ({
          id: key,
          ...(value as any),
        })) as T[];

        // Sort if requested
        if (options?.sort) {
          const { field, order } = options.sort;
          documents.sort((a, b) => {
            const valueA = a[field];
            const valueB = b[field];

            // Handle different data types for sorting
            if (typeof valueA === 'string' && typeof valueB === 'string') {
              return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
            } else if (typeof valueA === 'number' && typeof valueB === 'number') {
              return order === 'asc' ? valueA - valueB : valueB - valueA;
            } else if (valueA instanceof Date && valueB instanceof Date) {
              return order === 'asc'
                ? valueA.getTime() - valueB.getTime()
                : valueB.getTime() - valueA.getTime();
            }

            // Default comparison for other types
            return order === 'asc'
              ? String(valueA).localeCompare(String(valueB))
              : String(valueB).localeCompare(String(valueA));
          });
        }

        const total = documents.length;

        // Apply pagination
        const offset = options?.offset || 0;
        const limit = options?.limit || total;
        const paginatedDocuments = documents.slice(offset, offset + limit);
        const hasMore = offset + limit < total;

        return {
          documents: paginatedDocuments,
          total,
          hasMore,
        };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error listing documents in ${collection}:`, error);
        throw new DBError(
          ErrorCode.OPERATION_FAILED,
          `Failed to list documents in ${collection}`,
          error,
        );
      }
    });
  }
  /**
@@ -75,62 +247,89 @@
  async query<T extends Record<string, any>>(
    collection: string,
    filter: (doc: T) => boolean,
    options?: QueryOptions,
  ): Promise<PaginatedResult<T>> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.KEYVALUE, options);
        const all = await db.all();

        // Apply filter
        let filtered = Object.entries(all)
          .filter(([_, value]) => filter(value as T))
          .map(([key, value]) => ({
            id: key,
            ...(value as any),
          })) as T[];

        // Sort if requested
        if (options?.sort) {
          const { field, order } = options.sort;
          filtered.sort((a, b) => {
            const valueA = a[field];
            const valueB = b[field];

            // Handle different data types for sorting
            if (typeof valueA === 'string' && typeof valueB === 'string') {
              return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
            } else if (typeof valueA === 'number' && typeof valueB === 'number') {
              return order === 'asc' ? valueA - valueB : valueB - valueA;
            } else if (valueA instanceof Date && valueB instanceof Date) {
              return order === 'asc'
                ? valueA.getTime() - valueB.getTime()
                : valueB.getTime() - valueA.getTime();
            }

            // Default comparison for other types
            return order === 'asc'
              ? String(valueA).localeCompare(String(valueB))
              : String(valueB).localeCompare(String(valueA));
          });
        }

        const total = filtered.length;

        // Apply pagination
        const offset = options?.offset || 0;
        const limit = options?.limit || total;
        const paginatedDocuments = filtered.slice(offset, offset + limit);
        const hasMore = offset + limit < total;

        return {
          documents: paginatedDocuments,
          total,
          hasMore,
        };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error querying documents in ${collection}:`, error);
        throw new DBError(
          ErrorCode.OPERATION_FAILED,
          `Failed to query documents in ${collection}`,
          error,
        );
      }
    });
  }
  /**
   * Create an index for a collection to speed up queries
   */
  async createIndex(collection: string, field: string, options?: StoreOptions): Promise<boolean> {
    try {
      // In a real implementation, this would register the index with OrbitDB
      // or create a specialized data structure. For now, we'll just log the request.
      logger.info(`Index created on ${field} for collection ${collection}`);
      return true;
    } catch (error) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error creating index for ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to create index for ${collection}`,
        error,
      );
    }
  }
}
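The get/update pair above follows a cache-aside pattern: read through the cache, repopulate on a miss, and overwrite the entry on every write. The pattern in isolation, as a generic sketch; the Map-backed cache and the load/save callbacks are stand-ins, not the package's cacheService:

```typescript
const docCache = new Map<string, unknown>();

// Read-through: return a cached value on a hit, otherwise load from the
// backing store and populate the cache (mirrors get() with skipCache).
async function cachedGet<T>(
  key: string,
  load: () => Promise<T | null>,
  skipCache = false,
): Promise<T | null> {
  if (!skipCache && docCache.has(key)) {
    return docCache.get(key) as T;
  }
  const value = await load();
  if (value !== null) {
    docCache.set(key, value);
  }
  return value;
}

// Write path: update the store first, then overwrite the cache entry,
// so readers never see a cached value newer than the store.
async function cachedPut<T>(key: string, value: T, save: (v: T) => Promise<void>): Promise<void> {
  await save(value);
  docCache.set(key, value);
}
```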
@@ -9,39 +9,45 @@ import { CounterStore } from './counterStore';

const logger = createServiceLogger('STORE_FACTORY');

// Initialize instances for each store type - singleton pattern
const storeInstances = new Map<StoreType, BaseStore>();

// Store type mapping to implementations
const storeImplementations = {
  [StoreType.KEYVALUE]: KeyValueStore,
  [StoreType.DOCSTORE]: DocStore,
  [StoreType.FEED]: FeedStore,
  [StoreType.EVENTLOG]: FeedStore, // Alias for feed
  [StoreType.COUNTER]: CounterStore,
};

/**
 * Get a store instance by type (factory and singleton pattern)
 */
export function getStore(type: StoreType): BaseStore {
  // Return cached instance if available (singleton pattern)
  if (storeInstances.has(type)) {
    return storeInstances.get(type)!;
  }

  // Get the store implementation class
  const StoreClass = storeImplementations[type];
  if (!StoreClass) {
    logger.error(`Unsupported store type: ${type}`);
    throw new DBError(ErrorCode.STORE_TYPE_ERROR, `Unsupported store type: ${type}`);
  }

  // Create a new instance of the store
  const store = new StoreClass();

  // Cache the instance for future use
  storeInstances.set(type, store);
  return store;
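The map-based factory above replaces a switch statement and caches one instance per store type. Assuming the module exports getStore as shown, a call site would look roughly like this (illustrative only):

```typescript
import { getStore } from './storeFactory';
import { StoreType } from '../types';

// Repeated calls return the same cached KeyValueStore instance.
const kv1 = getStore(StoreType.KEYVALUE);
const kv2 = getStore(StoreType.KEYVALUE);
console.log(kv1 === kv2); // true

// EVENTLOG is aliased to the feed implementation in the map,
// so it resolves to a FeedStore.
const log = getStore(StoreType.EVENTLOG);
```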
@@ -1,3 +1,9 @@
import { createServiceLogger } from '../../utils/logger';
import { ErrorCode } from '../types';
import { DBError } from '../core/error';

const logger = createServiceLogger('DB_TRANSACTION');

// Transaction operation type
interface TransactionOperation {
  type: 'create' | 'update' | 'delete';
@@ -25,7 +31,7 @@ export class Transaction {
      type: 'create',
      collection,
      id,
      data,
    });
    return this;
  }
@@ -38,7 +44,7 @@ export class Transaction {
      type: 'update',
      collection,
      id,
      data,
    });
    return this;
  }
@@ -50,7 +56,7 @@ export class Transaction {
    this.operations.push({
      type: 'delete',
      collection,
      id,
    });
    return this;
  }
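Since each operation method pushes onto this.operations and returns this, transactions compose by chaining. A usage sketch, assuming the builder methods are named create, update, and delete; the commit step falls outside the hunks shown here and is only described in the comment:

```typescript
import { Transaction } from './transactionService';

// Queue three operations against different collections in one transaction.
const tx = new Transaction()
  .create('users', 'user123', { username: 'johndoe' })
  .update('stats', 'visits', { value: 42 })
  .delete('sessions', 'expired-session');

// The queued operations would then be executed by whatever commit
// mechanism the service exposes (not shown in these hunks).
```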
@@ -4,32 +4,13 @@ import { Transaction } from '../transactions/transactionService';
export type { Transaction };

// Resource locking for concurrent operations
const locks = new Map<string, boolean>();

export const acquireLock = (resourceId: string): boolean => {
  if (locks.has(resourceId)) {
    return false;
  }
  locks.set(resourceId, true);
  return true;
};

export const releaseLock = (resourceId: string): void => {
  locks.delete(resourceId);
};

export const isLocked = (resourceId: string): boolean => {
  return locks.has(resourceId);
};

// Database Types
export enum StoreType {
  KEYVALUE = 'keyvalue',
  DOCSTORE = 'docstore',
  FEED = 'feed',
  EVENTLOG = 'eventlog',
  COUNTER = 'counter',
}

// Common result types
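The lock helpers above implement a simple non-blocking mutex per resource id: acquireLock returns false instead of waiting when the resource is taken. A sketch of guarding a critical section with them; the doWrite callback is illustrative:

```typescript
import { acquireLock, releaseLock, isLocked } from './types';

async function safeWrite(resourceId: string, doWrite: () => Promise<void>): Promise<boolean> {
  // Non-blocking: bail out immediately if another caller holds the lock.
  if (!acquireLock(resourceId)) {
    console.warn(`${resourceId} is busy (locked: ${isLocked(resourceId)})`);
    return false;
  }
  try {
    await doWrite();
    return true;
  } finally {
    // Always release, even if the write throws.
    releaseLock(resourceId);
  }
}
```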
@@ -19,6 +19,8 @@ import {
  getFile,
  deleteFile,
  defineSchema,
  getMetrics,
  resetMetrics,
  closeConnection,
  stop as stopDB,
} from './db/dbService';
@@ -79,6 +81,8 @@ export {
  getFile,
  deleteFile,
  defineSchema,
  getMetrics,
  resetMetrics,
  closeConnection,
  stopDB,
  ErrorCode,
@@ -131,6 +135,8 @@ export default {
  getFile,
  deleteFile,
  defineSchema,
  getMetrics,
  resetMetrics,
  closeConnection,
  stop: stopDB,
  ErrorCode,
@@ -8,12 +8,7 @@ import {
  getProxyAgentInstance,
} from './services/ipfsCoreService';
import {
  getConnectedPeers,
  getOptimalPeer,
  updateNodeLoad,
  logPeersStatus,
} from './services/discoveryService';
import { createServiceLogger } from '../utils/logger';

// Create logger for IPFS service
@@ -1,80 +1,102 @@
// Load balancer service - Implements load balancing strategies for distributing connections
import * as ipfsService from './ipfsService';
import { config } from '../config';
import { createServiceLogger } from '../utils/logger';

const logger = createServiceLogger('LOAD_BALANCER');

// Track last peer chosen for round-robin strategy
let lastPeerIndex = -1;

// Type definitions
export interface PeerInfo {
  peerId: string;
  load: number;
  publicAddress: string;
}

export interface PeerStatus extends PeerInfo {
  lastSeen: number;
}

export interface NodeStatus {
  fingerprint: string;
  peerCount: number;
  isHealthy: boolean;
}

type LoadBalancerStrategy = 'leastLoaded' | 'roundRobin' | 'random';

/**
 * Strategies for peer selection
 */
const strategies = {
  leastLoaded: (peers: PeerStatus[]): PeerStatus => {
    return peers.reduce((min, current) => (current.load < min.load ? current : min), peers[0]);
  },
  roundRobin: (peers: PeerStatus[]): PeerStatus => {
    lastPeerIndex = (lastPeerIndex + 1) % peers.length;
    return peers[lastPeerIndex];
  },
  random: (peers: PeerStatus[]): PeerStatus => {
    const randomIndex = Math.floor(Math.random() * peers.length);
    return peers[randomIndex];
  },
};

/**
 * Get the optimal peer based on the configured load balancing strategy
 * @returns Object containing the selected peer information or null if no peers available
 */
export const getOptimalPeer = (): PeerInfo | null => {
  // Get all available peers
  const connectedPeers = ipfsService.getConnectedPeers();

  // If no peers are available, return null
  if (connectedPeers.size === 0) {
    logger.info('No peers available for load balancing');
    return null;
  }

  // Convert Map to Array for easier manipulation
  const peersArray = Array.from(connectedPeers.entries()).map(([peerId, data]) => ({
    peerId,
    load: data.load,
    lastSeen: data.lastSeen,
    publicAddress: data.publicAddress,
  }));

  // Apply the selected load balancing strategy
  const strategy = config.loadBalancer.strategy as LoadBalancerStrategy;

  // Select strategy function or default to least loaded
  const strategyFn = strategies[strategy] || strategies.leastLoaded;
  const selectedPeer = strategyFn(peersArray);

  logger.info(
    `Selected peer (${strategy}): ${selectedPeer.peerId.substring(0, 15)}... with load ${selectedPeer.load}%`,
  );

  return {
    peerId: selectedPeer.peerId,
@@ -85,22 +107,27 @@ export const getOptimalPeer = (): PeerInfo | null => {

/**
 * Get all available peers with their load information
 * @returns Array of peer information objects
 */
export const getAllPeers = (): PeerStatus[] => {
  const connectedPeers = ipfsService.getConnectedPeers();

  return Array.from(connectedPeers.entries()).map(([peerId, data]) => ({
    peerId,
    load: data.load,
    lastSeen: data.lastSeen,
    publicAddress: data.publicAddress,
  }));
};

/**
 * Get information about the current node's load
 */
export const getNodeStatus = (): NodeStatus => {
  const connectedPeers = ipfsService.getConnectedPeers();

  return {
    fingerprint: config.env.fingerprint,
@@ -109,4 +136,8 @@ export const getNodeStatus = (): NodeStatus => {
  };
};

export default { getOptimalPeer, getAllPeers, getNodeStatus };
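Because the strategies object's keys double as the config values ('leastLoaded', 'roundRobin', 'random'), adding a policy is one more entry plus a widened LoadBalancerStrategy type. A hypothetical weighted-random variant, for illustration only (not in the package):

```typescript
interface Peer {
  peerId: string;
  load: number;
}

// Pick peers with probability inversely proportional to their load,
// so lightly loaded nodes are chosen more often but none starve entirely.
function weightedRandom(peers: Peer[]): Peer {
  const weights = peers.map((p) => 1 / (1 + p.load));
  const totalWeight = weights.reduce((sum, w) => sum + w, 0);
  let r = Math.random() * totalWeight;
  for (let i = 0; i < peers.length; i++) {
    r -= weights[i];
    if (r <= 0) return peers[i];
  }
  return peers[peers.length - 1];
}

console.log(weightedRandom([{ peerId: 'a', load: 10 }, { peerId: 'b', load: 90 }]));
```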
@@ -10,10 +10,8 @@ const heartbeatLogger = createServiceLogger('HEARTBEAT');

// Node metadata
const fingerprint = config.env.fingerprint;
const connectedPeers: Map<
  string,
  { lastSeen: number; load: number; publicAddress: string; fingerprint: string }
> = new Map();

const SERVICE_DISCOVERY_TOPIC = ipfsConfig.serviceDiscovery.topic;
const HEARTBEAT_INTERVAL = ipfsConfig.serviceDiscovery.heartbeatInterval;
let heartbeatInterval: NodeJS.Timeout;
@@ -39,13 +37,9 @@ export const setupServiceDiscovery = async (pubsub: PubSub) => {
      });

      if (!existingPeer) {
        discoveryLogger.info(
          `New peer discovered: ${peerId} (fingerprint=${message.fingerprint})`,
        );
      }

      heartbeatLogger.info(
        `Received from ${peerId}: load=${message.load}, addr=${message.publicAddress}`,
      );
    }
  } catch (err) {
    discoveryLogger.error(`Error processing message:`, err);
@@ -64,22 +58,15 @@ export const setupServiceDiscovery = async (pubsub: PubSub) => {
      publicAddress: ipfsConfig.serviceDiscovery.publicAddress,
    };

    await pubsub.publish(
      SERVICE_DISCOVERY_TOPIC,
      new TextEncoder().encode(JSON.stringify(heartbeatMsg)),
    );

    heartbeatLogger.info(
      `Sent: fingerprint=${fingerprint}, load=${nodeLoad}, addr=${heartbeatMsg.publicAddress}`,
    );

    const now = Date.now();
    const staleTime = ipfsConfig.serviceDiscovery.staleTimeout;

    for (const [peerId, peerData] of connectedPeers.entries()) {
      if (now - peerData.lastSeen > staleTime) {
        discoveryLogger.info(
          `Peer ${peerId.substring(0, 15)}... is stale, removing from load balancer`,
        );
        connectedPeers.delete(peerId);
      }
    }
@@ -115,9 +102,7 @@ export const logPeersStatus = () => {
  if (peerCount > 0) {
    discoveryLogger.info('Peer status:');
    connectedPeers.forEach((data, peerId) => {
      discoveryLogger.debug(
        ` - ${peerId} Load: ${data.load}% Last seen: ${new Date(data.lastSeen).toISOString()}`,
      );
    });
  }
};
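The heartbeat loop above both publishes this node's load and evicts peers whose lastSeen exceeds staleTimeout. The eviction rule in isolation, as a small self-contained sketch:

```typescript
interface PeerRecord {
  lastSeen: number;
  load: number;
}

// Remove peers that have not sent a heartbeat within staleTime milliseconds.
function evictStalePeers(peers: Map<string, PeerRecord>, staleTime: number, now = Date.now()): void {
  for (const [peerId, record] of peers.entries()) {
    if (now - record.lastSeen > staleTime) {
      peers.delete(peerId);
    }
  }
}

const peers = new Map<string, PeerRecord>([
  ['peerA', { lastSeen: Date.now(), load: 10 }],
  ['peerB', { lastSeen: Date.now() - 60_000, load: 5 }],
]);
evictStalePeers(peers, 30_000); // peerB is dropped, peerA stays
```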
@@ -29,18 +29,9 @@ export const initIpfsNode = async (externalProxyAgent: any = null) => {
  proxyAgent = externalProxyAgent;
  const blockstorePath = ipfsConfig.blockstorePath;

  try {
    if (!fs.existsSync(blockstorePath)) {
      fs.mkdirSync(blockstorePath, { recursive: true, mode: 0o755 });
      logger.info(`Created blockstore directory: ${blockstorePath}`);
    }

    // Check write permissions
    fs.accessSync(blockstorePath, fs.constants.W_OK);
    logger.info(`Verified write permissions for blockstore directory: ${blockstorePath}`);
  } catch (permError: any) {
    logger.error(`Permission error with blockstore directory: ${blockstorePath}`, permError);
    throw new Error(`Cannot access or write to blockstore directory: ${permError.message}`);
  }

  const blockstore = new FsBlockstore(blockstorePath);
@@ -86,7 +77,7 @@ export const initIpfsNode = async (externalProxyAgent: any = null) => {
    `Listening on: ${libp2pNode
      .getMultiaddrs()
      .map((addr: any) => addr.toString())
      .join(', ')}`,
  );

  helia = await createHelia({
@@ -127,9 +118,7 @@ function setupPeerEventListeners(node: Libp2p) {
    node.peerStore
      .get(event.detail)
      .then((peerInfo) => {
        const multiaddrs = peerInfo?.addresses.map((addr) => addr.multiaddr.toString()) || [
          'unknown',
        ];
        logger.info(`Peer multiaddrs: ${multiaddrs.join(', ')}`);
      })
      .catch((error) => {
@@ -148,9 +137,7 @@ function setupPeerEventListeners(node: Libp2p) {
    node.peerStore
      .get(event.detail)
      .then((peerInfo) => {
        const multiaddrs = peerInfo?.addresses.map((addr) => addr.multiaddr.toString()) || [
          'unknown',
        ];
        logger.error(`Peer multiaddrs: ${multiaddrs.join(', ')}`);
      })
      .catch((error) => {
@@ -216,8 +203,7 @@ async function attemptPeerConnections(node: Libp2p) {
  try {
    // Get peer info including addresses
    const peerInfo = await node.peerStore.get(peerId);
    const addresses =
      peerInfo?.addresses.map((addr) => addr.multiaddr.toString()).join(', ') || 'unknown';
    logger.info(` - Connected to peer: ${peerId.toString()}`);
    logger.info(`   Addresses: ${addresses}`);
  } catch (_error) {
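The blockstore preflight above (create the directory, then fs.accessSync with W_OK) fails fast with a descriptive error instead of letting a later open fail obscurely. The same check reduced to a reusable helper, as a sketch:

```typescript
import fs from 'fs';

// Ensure a directory exists and is writable, throwing a descriptive
// error up front rather than deep inside a later store open.
function ensureWritableDir(dir: string): void {
  try {
    if (!fs.existsSync(dir)) {
      fs.mkdirSync(dir, { recursive: true, mode: 0o755 });
    }
    fs.accessSync(dir, fs.constants.W_OK);
  } catch (err: any) {
    throw new Error(`Cannot access or write to directory ${dir}: ${err.message}`);
  }
}

ensureWritableDir('./blockstore');
```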
@@ -14,67 +14,30 @@ let orbitdb: any;

export const getOrbitDBDir = (): string => {
  const baseDir = config.orbitdb.directory;
  const fingerprint = config.env.fingerprint;
  // Use path.join for proper cross-platform path handling
  return path.join(baseDir, `debros-${fingerprint}`);
};

const ORBITDB_DIR = getOrbitDBDir();
const ADDRESS_DIR = path.join(ORBITDB_DIR, 'addresses');

export const getDBAddress = (name: string): string | null => {
  try {
    const addressFile = path.join(ADDRESS_DIR, `${name}.address`);
    if (fs.existsSync(addressFile)) {
      return fs.readFileSync(addressFile, 'utf-8').trim();
    }
  } catch (error) {
    logger.error(`Error reading DB address for ${name}:`, error);
  }
  return null;
};

export const saveDBAddress = (name: string, address: string): boolean => {
  try {
    // Ensure the address directory exists
    if (!fs.existsSync(ADDRESS_DIR)) {
      fs.mkdirSync(ADDRESS_DIR, { recursive: true, mode: 0o755 });
    }

    const addressFile = path.join(ADDRESS_DIR, `${name}.address`);
    fs.writeFileSync(addressFile, address, { mode: 0o644 });
    logger.info(`Saved DB address for ${name} at ${addressFile}`);
    return true;
  } catch (error) {
    logger.error(`Failed to save DB address for ${name}:`, error);
    return false;
  }
};

export const init = async () => {
  try {
    // Create directory with proper permissions if it doesn't exist
    try {
      if (!fs.existsSync(ORBITDB_DIR)) {
        fs.mkdirSync(ORBITDB_DIR, { recursive: true, mode: 0o755 });
        logger.info(`Created OrbitDB directory: ${ORBITDB_DIR}`);
      }

      // Check write permissions
      fs.accessSync(ORBITDB_DIR, fs.constants.W_OK);
    } catch (permError: any) {
      logger.error(`Permission error with OrbitDB directory: ${ORBITDB_DIR}`, permError);
      throw new Error(`Cannot access or write to OrbitDB directory: ${permError.message}`);
    }

    // Create the addresses directory
    try {
      if (!fs.existsSync(ADDRESS_DIR)) {
        fs.mkdirSync(ADDRESS_DIR, { recursive: true, mode: 0o755 });
        logger.info(`Created OrbitDB addresses directory: ${ADDRESS_DIR}`);
      }
    } catch (dirError) {
      logger.error(`Error creating addresses directory: ${ADDRESS_DIR}`, dirError);
      // Continue anyway, we'll handle failures when saving addresses
    }

    registerFeed();
@@ -93,9 +56,9 @@ export const init = async () => {

    logger.info('OrbitDB initialized successfully.');
    return orbitdb;
  } catch (e: any) {
    logger.error('Failed to initialize OrbitDB:', e);
    throw new Error(`OrbitDB initialization failed: ${e.message}`);
  }
};
@@ -111,9 +74,7 @@ export const openDB = async (name: string, type: string) => {
  const dbOptions = {
    type,
    overwrite: false,
    AccessController: IPFSAccessController({
      write: ['*'],
    }),
  };

  if (existingAddress) {
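Persisting each database's OrbitDB address lets a restarted node reopen the same replicated store instead of creating a fresh one; getDBAddress/saveDBAddress above implement that as one file per logical name. The intended flow around openDB, sketched with the helpers defined above in scope; the exact orbitdb.open call shape is an assumption, since openDB's body is cut off by the hunk:

```typescript
// Reopen-or-create flow (sketch):
// 1. Look for a saved address for this logical name.
// 2. If present, open by address so every node converges on one store.
// 3. If absent, create the store and persist its address for next time.
async function openOrCreate(name: string, type: string, orbitdb: any) {
  const existingAddress = getDBAddress(name);
  const db = existingAddress
    ? await orbitdb.open(existingAddress)
    : await orbitdb.open(name, { type });
  if (!existingAddress) {
    saveDBAddress(name, db.address.toString());
  }
  return db;
}
```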
@@ -55,8 +55,7 @@ export function createDebrosLogger(options: LoggerOptions = {}) {
  } else {
    try {
      message = JSON.stringify(message, null, 2);
    } catch (e: any) {
      console.error(e);
      message = '[Object]';
    }
  }
@@ -78,8 +77,7 @@ export function createDebrosLogger(options: LoggerOptions = {}) {
  } else {
    try {
      message = JSON.stringify(message);
    } catch (e: any) {
      console.error(e);
      message = '[Object]';
    }
  }
@@ -96,11 +94,8 @@ export function createDebrosLogger(options: LoggerOptions = {}) {
  if (!options.disableConsole) {
    loggerTransports.push(
      new transports.Console({
        format: format.combine(
          format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
          customConsoleFormat,
        ),
      }),
    );
  }
@@ -110,20 +105,14 @@ export function createDebrosLogger(options: LoggerOptions = {}) {
    // Combined log file
    new transports.File({
      filename: path.join(logsDir, 'app.log'),
      format: format.combine(
        format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
        customFileFormat,
      ),
    }),
    // Error log file
    new transports.File({
      filename: path.join(logsDir, 'error.log'),
      level: 'error',
      format: format.combine(
        format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
        customFileFormat,
      ),
    }),
  );
}
@@ -138,14 +127,10 @@ export function createDebrosLogger(options: LoggerOptions = {}) {
// Helper function to create a logger for a specific service
const createServiceLogger = (serviceName: string) => {
  return {
    error: (message: any, ...meta: any[]) =>
      logger.error(message, { service: serviceName, ...meta }),
    warn: (message: any, ...meta: any[]) =>
      logger.warn(message, { service: serviceName, ...meta }),
    info: (message: any, ...meta: any[]) =>
      logger.info(message, { service: serviceName, ...meta }),
    debug: (message: any, ...meta: any[]) =>
      logger.debug(message, { service: serviceName, ...meta }),
  };
};
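Each service gets a child-style logger that stamps its name into the log metadata, so the shared app.log can be filtered per service. A usage sketch, assuming the module exports createServiceLogger under the path shown elsewhere in this diff:

```typescript
import { createServiceLogger } from '../utils/logger';

const logger = createServiceLogger('EXAMPLE_SERVICE');

// Every entry carries { service: 'EXAMPLE_SERVICE' } alongside the message.
logger.info('starting up');
logger.debug('config loaded', { retries: 3 });
logger.error('something failed');
```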
types.d.ts
@@ -230,41 +230,11 @@ declare module '@debros/network' {
    options?: { connectionId?: string; storeType?: StoreType },
  ): Promise<boolean>;

  // Event Subscription API
  export interface DocumentCreatedEvent {
    collection: string;
    id: string;
    document: any;
  }

  export interface DocumentUpdatedEvent {
    collection: string;
    id: string;
    document: any;
    previous: any;
  }

  export interface DocumentDeletedEvent {
    collection: string;
    id: string;
    document: any;
  }

  export type DBEventType = 'document:created' | 'document:updated' | 'document:deleted';

  export function subscribe(
    event: 'document:created',
    callback: (data: DocumentCreatedEvent) => void,
  ): () => void;
  export function subscribe(
    event: 'document:updated',
    callback: (data: DocumentUpdatedEvent) => void,
  ): () => void;
  export function subscribe(
    event: 'document:deleted',
    callback: (data: DocumentDeletedEvent) => void,
  ): () => void;
  export function subscribe(event: DBEventType, callback: (data: any) => void): () => void;

  // File operations
  export function uploadFile(
@@ -278,6 +248,9 @@ declare module '@debros/network' {
  export function closeConnection(connectionId: string): Promise<boolean>;

  // Metrics
  export function getMetrics(): Metrics;
  export function resetMetrics(): void;

  // Stop
  export function stopDB(): Promise<void>;
@@ -331,6 +304,8 @@ declare module '@debros/network' {
    getFile: typeof getFile;
    deleteFile: typeof deleteFile;
    defineSchema: typeof defineSchema;
    getMetrics: typeof getMetrics;
    resetMetrics: typeof resetMetrics;
    closeConnection: typeof closeConnection;
    stop: typeof stopDB;
    ErrorCode: typeof ErrorCode;
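With the typed overloads above, each event name narrows its callback's payload, so consumers get collection/id/document without casts. A consumer sketch:

```typescript
import { subscribe } from '@debros/network';

// The 'document:created' overload types `data` as DocumentCreatedEvent.
const unsubscribe = subscribe('document:created', (data) => {
  console.log(`created ${data.id} in ${data.collection}`, data.document);
});

// Later: detach the listener via the returned function.
unsubscribe();
```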