Merge pull request 'bug fixes' (#1) from alpha-0.0.23 into main
All checks were successful
Publish Alpha Package to npm / publish (push) Successful in 49s

Reviewed-on: #1
anonpenguin 2025-05-16 08:57:43 +00:00
commit e3dfa052f8
34 changed files with 6439 additions and 3918 deletions

README.md

@ -26,45 +26,45 @@ npm install @debros/network
## Basic Usage
```typescript
import { initDB, create, get, query, uploadFile, logger } from "@debros/network";
import { initDB, create, get, query, uploadFile, logger } from '@debros/network';
// Initialize the database service
async function startApp() {
try {
// Initialize with default configuration
await initDB();
logger.info("Database initialized successfully");
logger.info('Database initialized successfully');
// Create a new user document
const userId = 'user123';
const user = {
username: 'johndoe',
walletAddress: '0x1234567890',
avatar: null
avatar: null,
};
const result = await create('users', userId, user);
logger.info(`Created user with ID: ${result.id}`);
// Get a user by ID
const retrievedUser = await get('users', userId);
logger.info('User:', retrievedUser);
// Query users with filtering
const activeUsers = await query('users',
user => user.isActive === true,
{ limit: 10, sort: { field: 'createdAt', order: 'desc' } }
);
const activeUsers = await query('users', (user) => user.isActive === true, {
limit: 10,
sort: { field: 'createdAt', order: 'desc' },
});
logger.info(`Found ${activeUsers.total} active users`);
// Upload a file
const fileData = Buffer.from('File content');
const fileUpload = await uploadFile(fileData, { filename: 'document.txt' });
logger.info(`Uploaded file with CID: ${fileUpload.cid}`);
return true;
} catch (error) {
logger.error("Failed to start app:", error);
logger.error('Failed to start app:', error);
throw error;
}
}
@ -77,30 +77,27 @@ startApp();
The library supports multiple OrbitDB store types, each optimized for different use cases:
```typescript
import { create, get, update, StoreType } from "@debros/network";
import { create, get, update, StoreType } from '@debros/network';
// Default KeyValue store (for general use)
await create('users', 'user1', { name: 'Alice' });
// Document store (better for complex documents with indexing)
await create('posts', 'post1', { title: 'Hello', content: '...' },
{ storeType: StoreType.DOCSTORE }
await create(
'posts',
'post1',
{ title: 'Hello', content: '...' },
{ storeType: StoreType.DOCSTORE },
);
// Feed/EventLog store (append-only, good for immutable logs)
await create('events', 'evt1', { type: 'login', user: 'alice' },
{ storeType: StoreType.FEED }
);
await create('events', 'evt1', { type: 'login', user: 'alice' }, { storeType: StoreType.FEED });
// Counter store (for numeric counters)
await create('stats', 'visits', { value: 0 },
{ storeType: StoreType.COUNTER }
);
await create('stats', 'visits', { value: 0 }, { storeType: StoreType.COUNTER });
// Increment a counter
await update('stats', 'visits', { increment: 1 },
{ storeType: StoreType.COUNTER }
);
await update('stats', 'visits', { increment: 1 }, { storeType: StoreType.COUNTER });
// Get counter value
const stats = await get('stats', 'visits', { storeType: StoreType.COUNTER });
@ -112,7 +109,7 @@ console.log(`Visit count: ${stats.value}`);
### Schema Validation
```typescript
import { defineSchema, create } from "@debros/network";
import { defineSchema, create } from '@debros/network';
// Define a schema
defineSchema('users', {
@ -121,32 +118,32 @@ defineSchema('users', {
type: 'string',
required: true,
min: 3,
max: 20
max: 20,
},
email: {
type: 'string',
pattern: '^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$'
pattern: '^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$',
},
age: {
type: 'number',
min: 18
}
min: 18,
},
},
required: ['username']
required: ['username'],
});
// Document creation will be validated against the schema
await create('users', 'user1', {
username: 'alice',
email: 'alice@example.com',
age: 25
age: 25,
});
```
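Invalid documents are rejected with a `DBError` whose `code` is `ErrorCode.INVALID_SCHEMA` (see the validator changes later in this diff). A minimal sketch, assuming `ErrorCode` is exported from the package as in the bundled examples:

```typescript
import { create, ErrorCode } from '@debros/network';

try {
  // 'al' violates the username minimum length of 3 defined above
  await create('users', 'user2', { username: 'al' });
} catch (error: any) {
  if (error.code === ErrorCode.INVALID_SCHEMA) {
    console.error('Validation failed:', error.message);
  }
}
```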
### Transactions
```typescript
import { createTransaction, commitTransaction } from "@debros/network";
import { createTransaction, commitTransaction } from '@debros/network';
// Create a transaction
const transaction = createTransaction();
@ -162,37 +159,39 @@ const result = await commitTransaction(transaction);
console.log(`Transaction completed with ${result.results.length} operations`);
```
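This hunk elides the middle of the example; for reference, operations are chained onto the transaction before committing. A sketch based on the Transaction API listed below and the advanced example later in this diff:

```typescript
// Queue operations; each method returns the transaction for chaining
transaction
  .create('users', 'user1', { username: 'alice' })
  .update('users', 'user2', { age: 30 })
  .delete('users', 'user3');
```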
### Subscriptions
### Event Subscriptions
```typescript
import { subscribe } from "@debros/network";
import { subscribe } from '@debros/network';
// Subscribe to document changes
const unsubscribe = subscribe('document:created', (data) => {
console.log(`New document created in ${data.collection}:`, data.id);
console.log('Document data:', data.document);
});
// Later, unsubscribe
// Other event types
// subscribe('document:updated', (data) => { ... });
// subscribe('document:deleted', (data) => { ... });
// Later, unsubscribe when done
unsubscribe();
```
### Pagination and Sorting
```typescript
import { list, query } from "@debros/network";
import { list, query } from '@debros/network';
// List with pagination and sorting
const page1 = await list('users', {
limit: 10,
offset: 0,
sort: { field: 'createdAt', order: 'desc' }
sort: { field: 'createdAt', order: 'desc' },
});
// Query with pagination
const results = await query('users',
(user) => user.age > 21,
{ limit: 10, offset: 20 }
);
const results = await query('users', (user) => user.age > 21, { limit: 10, offset: 20 });
console.log(`Found ${results.total} matches, showing ${results.documents.length}`);
console.log(`Has more pages: ${results.hasMore}`);
@ -201,7 +200,7 @@ console.log(`Has more pages: ${results.hasMore}`);
### TypeScript Support
```typescript
import { get, update, query } from "@debros/network";
import { get, update, query } from '@debros/network';
interface User {
username: string;
@ -216,15 +215,13 @@ const user = await get<User>('users', 'user1');
await update<User>('users', 'user1', { age: 26 });
const results = await query<User>('users',
(user) => user.age > 21
);
const results = await query<User>('users', (user) => user.age > 21);
```
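Since `query<User>` returns a `PaginatedResult<User>`, the `documents` array is fully typed:

```typescript
results.documents.forEach((user) => {
  console.log(user.username); // `user` is typed as User
});
```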
### Connection Management
```typescript
import { initDB, closeConnection } from "@debros/network";
import { initDB, closeConnection } from '@debros/network';
// Create multiple connections
const conn1 = await initDB('connection1');
@ -237,21 +234,6 @@ await create('users', 'user1', { name: 'Alice' }, { connectionId: conn1 });
await closeConnection(conn1);
```
### Performance Metrics
```typescript
import { getMetrics, resetMetrics } from "@debros/network";
// Get performance metrics
const metrics = getMetrics();
console.log('Operations:', metrics.operations);
console.log('Avg operation time:', metrics.performance.averageOperationTime, 'ms');
console.log('Cache hits/misses:', metrics.cacheStats);
// Reset metrics (e.g., after deployment)
resetMetrics();
```
## API Reference
### Core Database Operations
@ -285,9 +267,10 @@ resetMetrics();
- `Transaction.update<T>(collection, id, data): Transaction` - Add an update operation
- `Transaction.delete(collection, id): Transaction` - Add a delete operation
### Subscriptions
### Event Subscriptions
- `subscribe(event, callback): () => void` - Subscribe to events, returns unsubscribe function
- `subscribe(event, callback): () => void` - Subscribe to database events, returns unsubscribe function
- Event types: 'document:created', 'document:updated', 'document:deleted'
### File Operations
@ -302,20 +285,18 @@ resetMetrics();
### Indexes and Performance
- `createIndex(collection, field, options?): Promise<boolean>` - Create an index
- `getMetrics(): Metrics` - Get performance metrics
- `resetMetrics(): void` - Reset performance metrics
## Configuration
```typescript
import { config, initDB } from "@debros/network";
import { config, initDB } from '@debros/network';
// Configure (optional)
config.env.fingerprint = "my-unique-app-id";
config.env.fingerprint = 'my-unique-app-id';
config.env.port = 9000;
config.ipfs.blockstorePath = "./custom-path/blockstore";
config.orbitdb.directory = "./custom-path/orbitdb";
config.ipfs.blockstorePath = './custom-path/blockstore';
config.orbitdb.directory = './custom-path/orbitdb';
// Initialize with configuration
await initDB();
```
```


@ -1,73 +0,0 @@
import { initDB, create, get, update, remove, list, query, uploadFile, getFile, deleteFile, stopDB, logger } from '../src';
// Alternative import method
// import debros from '../src';
// const { db } = debros;
async function databaseExample() {
try {
logger.info('Starting database example...');
// Initialize the database service (abstracts away IPFS and OrbitDB)
await initDB();
logger.info('Database service initialized');
// Create a new user document
const userId = 'user123';
const userData = {
username: 'johndoe',
walletAddress: '0x1234567890',
avatar: null
};
const createResult = await create('users', userId, userData);
logger.info(`Created user with ID: ${createResult.id} and hash: ${createResult.hash}`);
// Retrieve the user
const user = await get('users', userId);
logger.info('Retrieved user:', user);
// Update the user
const updateResult = await update('users', userId, {
avatar: 'profile.jpg',
bio: 'Software developer'
});
logger.info(`Updated user with hash: ${updateResult.hash}`);
// Query users
const filteredUsers = await query('users', (user) => user.username === 'johndoe');
logger.info(`Found ${filteredUsers.length} matching users`);
// List all users
const allUsers = await list('users', { limit: 10 });
logger.info(`Retrieved ${allUsers.length} users`);
// Upload a file
const fileData = Buffer.from('This is a test file content');
const fileUpload = await uploadFile(fileData, { filename: 'test.txt' });
logger.info(`Uploaded file with CID: ${fileUpload.cid}`);
// Retrieve the file
const file = await getFile(fileUpload.cid);
logger.info('Retrieved file:', {
content: file.data.toString(),
metadata: file.metadata
});
// Delete the file
await deleteFile(fileUpload.cid);
logger.info('File deleted');
// Delete the user
await remove('users', userId);
logger.info('User deleted');
// Stop the database service
await stopDB();
logger.info('Database service stopped');
} catch (error) {
logger.error('Error in database example:', error);
}
}
databaseExample();


@ -1,266 +0,0 @@
import {
initDB,
create,
get,
update,
remove,
list,
query,
uploadFile,
getFile,
createTransaction,
commitTransaction,
subscribe,
defineSchema,
getMetrics,
createIndex,
ErrorCode,
StoreType,
logger
} from '../src';
// Define a user schema
const userSchema = {
properties: {
username: {
type: 'string',
required: true,
min: 3,
max: 20,
},
email: {
type: 'string',
pattern: '^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$',
},
age: {
type: 'number',
min: 18,
max: 120,
},
roles: {
type: 'array',
items: {
type: 'string',
},
},
isActive: {
type: 'boolean',
},
},
required: ['username'],
};
async function advancedDatabaseExample() {
try {
logger.info('Starting advanced database example...');
// Initialize the database service with a specific connection ID
const connectionId = await initDB('example-connection');
logger.info(`Database service initialized with connection ID: ${connectionId}`);
// Define schema for validation
defineSchema('users', userSchema);
logger.info('User schema defined');
// Create index for faster queries (works with docstore)
await createIndex('users', 'username', { storeType: StoreType.DOCSTORE });
logger.info('Created index on username field');
// Set up subscription for real-time updates
const unsubscribe = subscribe('document:created', (data) => {
logger.info('Document created:', data.collection, data.id);
});
// Create multiple users using a transaction
const transaction = createTransaction(connectionId);
transaction
.create('users', 'user1', {
username: 'alice',
email: 'alice@example.com',
age: 28,
roles: ['admin', 'user'],
isActive: true,
})
.create('users', 'user2', {
username: 'bob',
email: 'bob@example.com',
age: 34,
roles: ['user'],
isActive: true,
})
.create('users', 'user3', {
username: 'charlie',
email: 'charlie@example.com',
age: 45,
roles: ['moderator', 'user'],
isActive: false,
});
const txResult = await commitTransaction(transaction);
logger.info(`Transaction committed with ${txResult.results.length} operations`);
// Get a specific user with type safety
interface User {
username: string;
email: string;
age: number;
roles: string[];
isActive: boolean;
createdAt: number;
updatedAt: number;
}
// Using KeyValue store (default)
const user = await get<User>('users', 'user1');
logger.info('Retrieved user from KeyValue store:', user?.username, user?.email);
// Using DocStore
await create<User>(
'users_docstore',
'user1',
{
username: 'alice_doc',
email: 'alice_doc@example.com',
age: 28,
roles: ['admin', 'user'],
isActive: true,
},
{ storeType: StoreType.DOCSTORE }
);
const docStoreUser = await get<User>('users_docstore', 'user1', { storeType: StoreType.DOCSTORE });
logger.info('Retrieved user from DocStore:', docStoreUser?.username, docStoreUser?.email);
// Using Feed/EventLog store
await create<User>(
'users_feed',
'user1',
{
username: 'alice_feed',
email: 'alice_feed@example.com',
age: 28,
roles: ['admin', 'user'],
isActive: true,
},
{ storeType: StoreType.FEED }
);
// Update the feed entry (creates a new entry with the same ID)
await update<User>(
'users_feed',
'user1',
{
roles: ['admin', 'user', 'tester'],
},
{ storeType: StoreType.FEED }
);
// List all entries in the feed
const feedUsers = await list<User>('users_feed', { storeType: StoreType.FEED });
logger.info(`Found ${feedUsers.total} feed entries:`);
feedUsers.documents.forEach(user => {
logger.info(`- ${user.username} (${user.email})`);
});
// Using Counter store
await create(
'counters',
'visitors',
{ value: 100 },
{ storeType: StoreType.COUNTER }
);
// Increment the counter
await update(
'counters',
'visitors',
{ increment: 5 },
{ storeType: StoreType.COUNTER }
);
// Get the counter value
const counter = await get('counters', 'visitors', { storeType: StoreType.COUNTER });
logger.info(`Counter value: ${counter?.value}`);
// Update a user in KeyValue store
await update<User>('users', 'user1', {
roles: ['admin', 'user', 'tester'],
});
// Query users from KeyValue store
const result = await query<User>(
'users',
(user) => user.isActive === true,
{
limit: 10,
offset: 0,
sort: { field: 'username', order: 'asc' },
}
);
logger.info(`Found ${result.total} active users:`);
result.documents.forEach(user => {
logger.info(`- ${user.username} (${user.email})`);
});
// List all users from KeyValue store with pagination
const allUsers = await list<User>('users', {
limit: 10,
offset: 0,
sort: { field: 'age', order: 'desc' },
});
logger.info(`Listed ${allUsers.total} users sorted by age (desc):`);
allUsers.documents.forEach(user => {
logger.info(`- ${user.username}: ${user.age} years old`);
});
// Upload a file with metadata
const fileData = Buffer.from('This is a test file with advanced features.');
const fileUpload = await uploadFile(fileData, {
filename: 'advanced-test.txt',
metadata: {
contentType: 'text/plain',
tags: ['example', 'test'],
owner: 'user1',
}
});
logger.info(`Uploaded file with CID: ${fileUpload.cid}`);
// Retrieve the file
const file = await getFile(fileUpload.cid);
logger.info('Retrieved file content:', file.data.toString());
logger.info('File metadata:', file.metadata);
// Get performance metrics
const metrics = getMetrics();
logger.info('Database metrics:', {
operations: metrics.operations,
performance: {
averageOperationTime: `${metrics.performance.averageOperationTime?.toFixed(2)}ms`,
},
cache: metrics.cacheStats,
});
// Unsubscribe from events
unsubscribe();
logger.info('Unsubscribed from events');
// Delete a user
await remove('users', 'user3');
logger.info('Deleted user user3');
return true;
} catch (error) {
if (error && typeof error === 'object' && 'code' in error) {
logger.error(`Database error (${error.code}):`, error.message);
} else {
logger.error('Error in advanced database example:', error);
}
return false;
}
}
advancedDatabaseExample();


@ -1,49 +0,0 @@
import { init as initIpfs, stop as stopIpfs } from '../src/ipfs/ipfsService';
import { init as initOrbitDB } from '../src/orbit/orbitDBService';
import { createServiceLogger } from '../src/utils/logger';
const appLogger = createServiceLogger('APP');
async function startNode() {
try {
appLogger.info('Starting Debros node...');
// Initialize IPFS
const ipfs = await initIpfs();
appLogger.info('IPFS node initialized');
// Initialize OrbitDB
const orbitdb = await initOrbitDB();
appLogger.info('OrbitDB initialized');
// Create a test database
const db = await orbitdb.open('test-db', {
type: 'feed',
overwrite: false,
});
appLogger.info(`Database opened: ${db.address.toString()}`);
// Add some data
const hash = await db.add('Hello from Debros Network!');
appLogger.info(`Added entry: ${hash}`);
// Query data
const allEntries = db.iterator({ limit: 10 }).collect();
appLogger.info('Database entries:', allEntries);
// Keep the process running
appLogger.info('Node is running. Press Ctrl+C to stop.');
// Handle shutdown
process.on('SIGINT', async () => {
appLogger.info('Shutting down...');
await orbitdb.stop();
await stopIpfs();
process.exit(0);
});
} catch (error) {
appLogger.error('Failed to start node:', error);
}
}
startNode();


@ -1,6 +1,6 @@
{
"name": "@debros/network",
"version": "0.0.22-alpha",
"version": "0.0.23-alpha",
"description": "Debros network core functionality for IPFS, libp2p and OrbitDB",
"type": "module",
"main": "dist/index.js",

pnpm-lock.yaml generated

File diff suppressed because it is too large.


@ -57,7 +57,8 @@ export const defaultConfig: DebrosConfig = {
heartbeatInterval: parseInt(process.env.HEARTBEAT_INTERVAL || '5000'),
staleTimeout: parseInt(process.env.STALE_PEER_TIMEOUT || '30000'),
logInterval: parseInt(process.env.PEER_LOG_INTERVAL || '60000'),
publicAddress: process.env.NODE_PUBLIC_ADDRESS || `http://localhost:${process.env.PORT || 7777}`,
publicAddress:
process.env.NODE_PUBLIC_ADDRESS || `http://localhost:${process.env.PORT || 7777}`,
},
},
orbitdb: {


@ -1,62 +0,0 @@
import NodeCache from 'node-cache';
import { createServiceLogger } from '../../utils/logger';
const logger = createServiceLogger('DB_CACHE');
// Cache for frequently accessed documents
const cache = new NodeCache({
stdTTL: 300, // 5 minutes default TTL
checkperiod: 60, // Check for expired items every 60 seconds
useClones: false, // Don't clone objects (for performance)
});
// Cache statistics
export const cacheStats = {
hits: 0,
misses: 0,
};
/**
* Get an item from cache
*/
export const get = <T>(key: string): T | undefined => {
const value = cache.get<T>(key);
if (value !== undefined) {
cacheStats.hits++;
return value;
}
cacheStats.misses++;
return undefined;
};
/**
* Set an item in cache
*/
export const set = <T>(key: string, value: T, ttl?: number): boolean => {
if (ttl === undefined) {
return cache.set(key, value);
}
return cache.set(key, value, ttl);
};
/**
* Delete an item from cache
*/
export const del = (key: string | string[]): number => {
return cache.del(key);
};
/**
* Flush the entire cache
*/
export const flushAll = (): void => {
cache.flushAll();
};
/**
* Reset cache statistics
*/
export const resetStats = (): void => {
cacheStats.hits = 0;
cacheStats.misses = 0;
};


@ -9,41 +9,93 @@ const logger = createServiceLogger('DB_CONNECTION');
// Connection pool of database instances
const connections = new Map<string, DBConnection>();
let defaultConnectionId: string | null = null;
let cleanupInterval: NodeJS.Timeout | null = null;
// Configuration
const CONNECTION_TIMEOUT = 3600000; // 1 hour in milliseconds
const CLEANUP_INTERVAL = 300000; // 5 minutes in milliseconds
const MAX_RETRY_ATTEMPTS = 3;
const RETRY_DELAY = 2000; // 2 seconds
/**
* Initialize the database service
* This abstracts away OrbitDB and IPFS from the end user
*/
export const init = async (connectionId?: string): Promise<string> => {
try {
const connId = connectionId || `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
logger.info(`Initializing DB service with connection ID: ${connId}`);
// Initialize IPFS
const ipfsInstance = await initIpfs();
// Initialize OrbitDB
const orbitdbInstance = await initOrbitDB();
// Store connection in pool
connections.set(connId, {
ipfs: ipfsInstance,
orbitdb: orbitdbInstance,
timestamp: Date.now(),
isActive: true,
});
// Set as default if no default exists
if (!defaultConnectionId) {
defaultConnectionId = connId;
}
logger.info(`DB service initialized successfully with connection ID: ${connId}`);
return connId;
} catch (error) {
logger.error('Failed to initialize DB service:', error);
throw new DBError(ErrorCode.INITIALIZATION_FAILED, 'Failed to initialize database service', error);
// Start connection cleanup interval if not already running
if (!cleanupInterval) {
cleanupInterval = setInterval(cleanupStaleConnections, CLEANUP_INTERVAL);
logger.info(`Connection cleanup scheduled every ${CLEANUP_INTERVAL / 60000} minutes`);
}
const connId = connectionId || `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
logger.info(`Initializing DB service with connection ID: ${connId}`);
let attempts = 0;
let lastError: any = null;
// Retry initialization with exponential backoff
while (attempts < MAX_RETRY_ATTEMPTS) {
try {
// Initialize IPFS with retry logic
const ipfsInstance = await initIpfs().catch((error) => {
logger.error(
`IPFS initialization failed (attempt ${attempts + 1}/${MAX_RETRY_ATTEMPTS}):`,
error,
);
throw error;
});
// Initialize OrbitDB
const orbitdbInstance = await initOrbitDB().catch((error) => {
logger.error(
`OrbitDB initialization failed (attempt ${attempts + 1}/${MAX_RETRY_ATTEMPTS}):`,
error,
);
throw error;
});
// Store connection in pool
connections.set(connId, {
ipfs: ipfsInstance,
orbitdb: orbitdbInstance,
timestamp: Date.now(),
isActive: true,
});
// Set as default if no default exists
if (!defaultConnectionId) {
defaultConnectionId = connId;
}
logger.info(`DB service initialized successfully with connection ID: ${connId}`);
return connId;
} catch (error) {
lastError = error;
attempts++;
if (attempts >= MAX_RETRY_ATTEMPTS) {
logger.error(
`Failed to initialize DB service after ${MAX_RETRY_ATTEMPTS} attempts:`,
error,
);
break;
}
// Wait before retrying with exponential backoff
const delay = RETRY_DELAY * Math.pow(2, attempts - 1);
logger.info(
`Retrying initialization in ${delay}ms (attempt ${attempts + 1}/${MAX_RETRY_ATTEMPTS})...`,
);
await new Promise((resolve) => setTimeout(resolve, delay));
}
}
throw new DBError(
ErrorCode.INITIALIZATION_FAILED,
`Failed to initialize database service after ${MAX_RETRY_ATTEMPTS} attempts`,
lastError,
);
};
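With `RETRY_DELAY = 2000` and `MAX_RETRY_ATTEMPTS = 3`, the exponential backoff above waits 2 s before the second attempt and 4 s before the third; a quick check of the schedule:

```typescript
// delay = RETRY_DELAY * 2^(attempts - 1), computed after attempts++
const RETRY_DELAY = 2000;
for (let attempts = 1; attempts < 3; attempts++) {
  console.log(`attempt ${attempts + 1} after ${RETRY_DELAY * Math.pow(2, attempts - 1)} ms`);
}
// -> attempt 2 after 2000 ms, attempt 3 after 4000 ms
```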
/**
@ -51,26 +103,70 @@ export const init = async (connectionId?: string): Promise<string> => {
*/
export const getConnection = (connectionId?: string): DBConnection => {
const connId = connectionId || defaultConnectionId;
if (!connId || !connections.has(connId)) {
throw new DBError(
ErrorCode.NOT_INITIALIZED,
`No active database connection found${connectionId ? ` for ID: ${connectionId}` : ''}`
ErrorCode.NOT_INITIALIZED,
`No active database connection found${connectionId ? ` for ID: ${connectionId}` : ''}`,
);
}
const connection = connections.get(connId)!;
if (!connection.isActive) {
throw new DBError(
ErrorCode.CONNECTION_ERROR,
`Connection ${connId} is no longer active`
);
throw new DBError(ErrorCode.CONNECTION_ERROR, `Connection ${connId} is no longer active`);
}
// Update the timestamp to mark connection as recently used
connection.timestamp = Date.now();
return connection;
};
/**
* Cleanup stale connections to prevent memory leaks
*/
export const cleanupStaleConnections = (): void => {
try {
const now = Date.now();
let removedCount = 0;
// Identify stale connections (older than CONNECTION_TIMEOUT)
for (const [id, connection] of connections.entries()) {
if (connection.isActive && now - connection.timestamp > CONNECTION_TIMEOUT) {
logger.info(
`Closing stale connection: ${id} (inactive for ${(now - connection.timestamp) / 60000} minutes)`,
);
// Close connection asynchronously (don't await to avoid blocking)
closeConnection(id)
.then((success) => {
if (success) {
logger.info(`Successfully closed stale connection: ${id}`);
} else {
logger.warn(`Failed to close stale connection: ${id}`);
}
})
.catch((error) => {
logger.error(`Error closing stale connection ${id}:`, error);
});
removedCount++;
} else if (!connection.isActive) {
// Remove inactive connections from the map
connections.delete(id);
removedCount++;
}
}
if (removedCount > 0) {
logger.info(`Cleaned up ${removedCount} stale or inactive connections`);
}
} catch (error) {
logger.error('Error during connection cleanup:', error);
}
};
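Because `getConnection` refreshes `connection.timestamp` on every use, only connections untouched for `CONNECTION_TIMEOUT` (one hour) are swept, and the sweep runs every `CLEANUP_INTERVAL` (five minutes), so an idle connection is closed at most about 65 minutes after its last use. A quick sketch of the bound:

```typescript
const CONNECTION_TIMEOUT = 3600000; // 1 hour
const CLEANUP_INTERVAL = 300000; // 5 minutes
// Worst case: the last use happens just after a sweep completes
const maxIdleBeforeClose = CONNECTION_TIMEOUT + CLEANUP_INTERVAL; // 3,900,000 ms ≈ 65 min
```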
/**
* Close a specific database connection
*/
@ -78,22 +174,22 @@ export const closeConnection = async (connectionId: string): Promise<boolean> =>
if (!connections.has(connectionId)) {
return false;
}
try {
const connection = connections.get(connectionId)!;
// Stop OrbitDB
if (connection.orbitdb) {
await connection.orbitdb.stop();
}
// Mark connection as inactive
connection.isActive = false;
// If this was the default connection, clear it
if (defaultConnectionId === connectionId) {
defaultConnectionId = null;
// Try to find another active connection to be the default
for (const [id, conn] of connections.entries()) {
if (conn.isActive) {
@ -102,7 +198,10 @@ export const closeConnection = async (connectionId: string): Promise<boolean> =>
}
}
}
// Remove the connection from the pool
connections.delete(connectionId);
logger.info(`Closed database connection: ${connectionId}`);
return true;
} catch (error) {
@ -116,23 +215,36 @@ export const closeConnection = async (connectionId: string): Promise<boolean> =>
*/
export const stop = async (): Promise<void> => {
try {
// Stop the cleanup interval
if (cleanupInterval) {
clearInterval(cleanupInterval);
cleanupInterval = null;
}
// Close all connections
const promises: Promise<boolean>[] = [];
for (const [id, connection] of connections.entries()) {
if (connection.isActive) {
await closeConnection(id);
promises.push(closeConnection(id));
}
}
// Wait for all connections to close
await Promise.allSettled(promises);
// Stop IPFS if needed
const ipfs = connections.get(defaultConnectionId || '')?.ipfs;
if (ipfs) {
await stopIpfs();
}
// Clear all connections
connections.clear();
defaultConnectionId = null;
logger.info('All DB connections stopped successfully');
} catch (error) {
} catch (error: any) {
logger.error('Error stopping DB connections:', error);
throw error;
throw new Error(`Failed to stop database connections: ${error.message}`);
}
};
};
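A usage sketch of the shutdown path — one failing connection no longer aborts the rest, since the closes are awaited with `Promise.allSettled`:

```typescript
// Assumes init/stop are the exported functions above
const connA = await init('conn-a');
const connB = await init('conn-b');

// Closes conn-a and conn-b, stops IPFS, and clears the pool
await stop();
```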


@ -3,7 +3,6 @@ import { ErrorCode } from '../types';
// Re-export error code for easier access
export { ErrorCode };
// Custom error class with error codes
export class DBError extends Error {
code: ErrorCode;
@ -15,4 +14,4 @@ export class DBError extends Error {
this.code = code;
this.details = details;
}
}
}


@ -1,17 +1,23 @@
import { createServiceLogger } from '../utils/logger';
import { init, getConnection, closeConnection, stop } from './core/connection';
import { defineSchema, validateDocument } from './schema/validator';
import * as cache from './cache/cacheService';
import { init, closeConnection, stop } from './core/connection';
import { defineSchema } from './schema/validator';
import * as events from './events/eventService';
import { getMetrics, resetMetrics } from './metrics/metricsService';
import { Transaction } from './transactions/transactionService';
import { StoreType, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions, ErrorCode } from './types';
import {
StoreType,
CreateResult,
UpdateResult,
PaginatedResult,
QueryOptions,
ListOptions,
ErrorCode,
} from './types';
import { DBError } from './core/error';
import { getStore } from './stores/storeFactory';
import { uploadFile, getFile, deleteFile } from './stores/fileStore';
// Re-export imported functions
export { init, closeConnection, stop, defineSchema, getMetrics, resetMetrics, uploadFile, getFile, deleteFile };
export { init, closeConnection, stop, defineSchema, uploadFile, getFile, deleteFile };
const logger = createServiceLogger('DB_SERVICE');
@ -25,52 +31,44 @@ export const createTransaction = (connectionId?: string): Transaction => {
/**
* Execute all operations in a transaction
*/
export const commitTransaction = async (transaction: Transaction): Promise<{ success: boolean; results: any[] }> => {
export const commitTransaction = async (
transaction: Transaction,
): Promise<{ success: boolean; results: any[] }> => {
try {
// Validate that we have operations
const operations = transaction.getOperations();
if (operations.length === 0) {
return { success: true, results: [] };
}
const connectionId = transaction.getConnectionId();
const results = [];
// Execute all operations
for (const operation of operations) {
let result;
switch (operation.type) {
case 'create':
result = await create(
operation.collection,
operation.id,
operation.data,
{ connectionId }
);
result = await create(operation.collection, operation.id, operation.data, {
connectionId,
});
break;
case 'update':
result = await update(
operation.collection,
operation.id,
operation.data,
{ connectionId }
);
result = await update(operation.collection, operation.id, operation.data, {
connectionId,
});
break;
case 'delete':
result = await remove(
operation.collection,
operation.id,
{ connectionId }
);
result = await remove(operation.collection, operation.id, { connectionId });
break;
}
results.push(result);
}
return { success: true, results };
} catch (error) {
logger.error('Transaction failed:', error);
@ -82,10 +80,10 @@ export const commitTransaction = async (transaction: Transaction): Promise<{ suc
* Create a new document in the specified collection using the appropriate store
*/
export const create = async <T extends Record<string, any>>(
collection: string,
id: string,
data: Omit<T, 'createdAt' | 'updatedAt'>,
options?: { connectionId?: string, storeType?: StoreType }
collection: string,
id: string,
data: Omit<T, 'createdAt' | 'updatedAt'>,
options?: { connectionId?: string; storeType?: StoreType },
): Promise<CreateResult> => {
const storeType = options?.storeType || StoreType.KEYVALUE;
const store = getStore(storeType);
@ -96,9 +94,9 @@ export const create = async <T extends Record<string, any>>(
* Get a document by ID from a collection
*/
export const get = async <T extends Record<string, any>>(
collection: string,
id: string,
options?: { connectionId?: string; skipCache?: boolean, storeType?: StoreType }
collection: string,
id: string,
options?: { connectionId?: string; skipCache?: boolean; storeType?: StoreType },
): Promise<T | null> => {
const storeType = options?.storeType || StoreType.KEYVALUE;
const store = getStore(storeType);
@ -109,10 +107,10 @@ export const get = async <T extends Record<string, any>>(
* Update a document in a collection
*/
export const update = async <T extends Record<string, any>>(
collection: string,
id: string,
data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
options?: { connectionId?: string; upsert?: boolean, storeType?: StoreType }
collection: string,
id: string,
data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
options?: { connectionId?: string; upsert?: boolean; storeType?: StoreType },
): Promise<UpdateResult> => {
const storeType = options?.storeType || StoreType.KEYVALUE;
const store = getStore(storeType);
@ -123,9 +121,9 @@ export const update = async <T extends Record<string, any>>(
* Delete a document from a collection
*/
export const remove = async (
collection: string,
id: string,
options?: { connectionId?: string, storeType?: StoreType }
collection: string,
id: string,
options?: { connectionId?: string; storeType?: StoreType },
): Promise<boolean> => {
const storeType = options?.storeType || StoreType.KEYVALUE;
const store = getStore(storeType);
@ -136,12 +134,12 @@ export const remove = async (
* List all documents in a collection with pagination
*/
export const list = async <T extends Record<string, any>>(
collection: string,
options?: ListOptions & { storeType?: StoreType }
collection: string,
options?: ListOptions & { storeType?: StoreType },
): Promise<PaginatedResult<T>> => {
const storeType = options?.storeType || StoreType.KEYVALUE;
const store = getStore(storeType);
// Remove storeType from options
const { storeType: _, ...storeOptions } = options || {};
return store.list(collection, storeOptions);
@ -151,13 +149,13 @@ export const list = async <T extends Record<string, any>>(
* Query documents in a collection with filtering and pagination
*/
export const query = async <T extends Record<string, any>>(
collection: string,
collection: string,
filter: (doc: T) => boolean,
options?: QueryOptions & { storeType?: StoreType }
options?: QueryOptions & { storeType?: StoreType },
): Promise<PaginatedResult<T>> => {
const storeType = options?.storeType || StoreType.KEYVALUE;
const store = getStore(storeType);
// Remove storeType from options
const { storeType: _, ...storeOptions } = options || {};
return store.query(collection, filter, storeOptions);
@ -169,7 +167,7 @@ export const query = async <T extends Record<string, any>>(
export const createIndex = async (
collection: string,
field: string,
options?: { connectionId?: string, storeType?: StoreType }
options?: { connectionId?: string; storeType?: StoreType },
): Promise<boolean> => {
const storeType = options?.storeType || StoreType.KEYVALUE;
const store = getStore(storeType);
@ -204,9 +202,7 @@ export default {
getFile,
deleteFile,
defineSchema,
getMetrics,
resetMetrics,
closeConnection,
stop,
StoreType
};
StoreType,
};


@ -6,12 +6,9 @@ type DBEventType = 'document:created' | 'document:updated' | 'document:deleted';
/**
* Subscribe to database events
*/
export const subscribe = (
event: DBEventType,
callback: (data: any) => void
): () => void => {
export const subscribe = (event: DBEventType, callback: (data: any) => void): (() => void) => {
dbEvents.on(event, callback);
// Return unsubscribe function
return () => {
dbEvents.off(event, callback);
@ -21,10 +18,7 @@ export const subscribe = (
/**
* Emit an event
*/
export const emit = (
event: DBEventType,
data: any
): void => {
export const emit = (event: DBEventType, data: any): void => {
dbEvents.emit(event, data);
};
@ -33,4 +27,4 @@ export const emit = (
*/
export const removeAllListeners = (): void => {
dbEvents.removeAllListeners();
};
};
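For context on how this module is driven: the store layer calls `emit` after each successful write with a `{ collection, id, document }` payload (see `AbstractStore` later in this diff), and subscribers receive it directly — a minimal round trip:

```typescript
const unsubscribe = subscribe('document:created', (data) => {
  console.log(`created ${data.collection}/${data.id}`);
});

// What AbstractStore.create() emits after a successful write
emit('document:created', { collection: 'users', id: 'user1', document: { username: 'alice' } });

unsubscribe();
```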


@ -1,101 +0,0 @@
import { Metrics, ErrorCode } from '../types';
import { DBError } from '../core/error';
import * as cacheService from '../cache/cacheService';
// Metrics tracking
const metrics: Metrics = {
operations: {
creates: 0,
reads: 0,
updates: 0,
deletes: 0,
queries: 0,
fileUploads: 0,
fileDownloads: 0,
},
performance: {
totalOperationTime: 0,
operationCount: 0,
},
errors: {
count: 0,
byCode: {},
},
cacheStats: {
hits: 0,
misses: 0,
},
startTime: Date.now(),
};
/**
* Measure performance of a database operation
*/
export async function measurePerformance<T>(operation: () => Promise<T>): Promise<T> {
const startTime = performance.now();
try {
const result = await operation();
const endTime = performance.now();
metrics.performance.totalOperationTime += (endTime - startTime);
metrics.performance.operationCount++;
return result;
} catch (error) {
const endTime = performance.now();
metrics.performance.totalOperationTime += (endTime - startTime);
metrics.performance.operationCount++;
// Track error metrics
metrics.errors.count++;
if (error instanceof DBError) {
metrics.errors.byCode[error.code] = (metrics.errors.byCode[error.code] || 0) + 1;
}
throw error;
}
}
/**
* Get database metrics
*/
export const getMetrics = (): Metrics => {
return {
...metrics,
// Sync cache stats
cacheStats: cacheService.cacheStats,
// Calculate some derived metrics
performance: {
...metrics.performance,
averageOperationTime: metrics.performance.operationCount > 0 ?
metrics.performance.totalOperationTime / metrics.performance.operationCount :
0
}
};
};
/**
* Reset metrics (useful for testing)
*/
export const resetMetrics = (): void => {
metrics.operations = {
creates: 0,
reads: 0,
updates: 0,
deletes: 0,
queries: 0,
fileUploads: 0,
fileDownloads: 0,
};
metrics.performance = {
totalOperationTime: 0,
operationCount: 0,
};
metrics.errors = {
count: 0,
byCode: {},
};
// Reset cache stats too
cacheService.resetStats();
metrics.startTime = Date.now();
};


@ -20,146 +20,144 @@ export const defineSchema = (collection: string, schema: CollectionSchema): void
*/
export const validateDocument = (collection: string, document: any): boolean => {
const schema = schemas.get(collection);
if (!schema) {
return true; // No schema defined, so validation passes
}
// Check required fields
if (schema.required) {
for (const field of schema.required) {
if (document[field] === undefined) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Required field '${field}' is missing`,
{ collection, document }
);
throw new DBError(ErrorCode.INVALID_SCHEMA, `Required field '${field}' is missing`, {
collection,
document,
});
}
}
}
// Validate properties
for (const [field, definition] of Object.entries(schema.properties)) {
const value = document[field];
// Skip undefined optional fields
if (value === undefined) {
if (definition.required) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Required field '${field}' is missing`,
{ collection, document }
);
throw new DBError(ErrorCode.INVALID_SCHEMA, `Required field '${field}' is missing`, {
collection,
document,
});
}
continue;
}
// Type validation
switch (definition.type) {
case 'string':
if (typeof value !== 'string') {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must be a string`,
{ collection, field, value }
);
throw new DBError(ErrorCode.INVALID_SCHEMA, `Field '${field}' must be a string`, {
collection,
field,
value,
});
}
// Pattern validation
if (definition.pattern && !new RegExp(definition.pattern).test(value)) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' does not match pattern: ${definition.pattern}`,
{ collection, field, value }
{ collection, field, value },
);
}
// Length validation
if (definition.min !== undefined && value.length < definition.min) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must have at least ${definition.min} characters`,
{ collection, field, value }
{ collection, field, value },
);
}
if (definition.max !== undefined && value.length > definition.max) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must have at most ${definition.max} characters`,
{ collection, field, value }
{ collection, field, value },
);
}
break;
case 'number':
if (typeof value !== 'number') {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must be a number`,
{ collection, field, value }
);
throw new DBError(ErrorCode.INVALID_SCHEMA, `Field '${field}' must be a number`, {
collection,
field,
value,
});
}
// Range validation
if (definition.min !== undefined && value < definition.min) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must be at least ${definition.min}`,
{ collection, field, value }
{ collection, field, value },
);
}
if (definition.max !== undefined && value > definition.max) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must be at most ${definition.max}`,
{ collection, field, value }
{ collection, field, value },
);
}
break;
case 'boolean':
if (typeof value !== 'boolean') {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must be a boolean`,
{ collection, field, value }
);
throw new DBError(ErrorCode.INVALID_SCHEMA, `Field '${field}' must be a boolean`, {
collection,
field,
value,
});
}
break;
case 'array':
if (!Array.isArray(value)) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must be an array`,
{ collection, field, value }
);
throw new DBError(ErrorCode.INVALID_SCHEMA, `Field '${field}' must be an array`, {
collection,
field,
value,
});
}
// Length validation
if (definition.min !== undefined && value.length < definition.min) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must have at least ${definition.min} items`,
{ collection, field, value }
{ collection, field, value },
);
}
if (definition.max !== undefined && value.length > definition.max) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must have at most ${definition.max} items`,
{ collection, field, value }
{ collection, field, value },
);
}
// Validate array items if item schema is defined
if (definition.items && value.length > 0) {
for (let i = 0; i < value.length; i++) {
const item = value[i];
// This is a simplified item validation
// In a real implementation, this would recursively validate complex objects
switch (definition.items.type) {
@ -168,27 +166,27 @@ export const validateDocument = (collection: string, document: any): boolean =>
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Item at index ${i} in field '${field}' must be a string`,
{ collection, field, item }
{ collection, field, item },
);
}
break;
case 'number':
if (typeof item !== 'number') {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Item at index ${i} in field '${field}' must be a number`,
{ collection, field, item }
{ collection, field, item },
);
}
break;
case 'boolean':
if (typeof item !== 'boolean') {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Item at index ${i} in field '${field}' must be a boolean`,
{ collection, field, item }
{ collection, field, item },
);
}
break;
@ -196,30 +194,30 @@ export const validateDocument = (collection: string, document: any): boolean =>
}
}
break;
case 'object':
if (typeof value !== 'object' || value === null || Array.isArray(value)) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must be an object`,
{ collection, field, value }
);
throw new DBError(ErrorCode.INVALID_SCHEMA, `Field '${field}' must be an object`, {
collection,
field,
value,
});
}
// Nested object validation would go here in a real implementation
break;
case 'enum':
if (definition.enum && !definition.enum.includes(value)) {
throw new DBError(
ErrorCode.INVALID_SCHEMA,
`Field '${field}' must be one of: ${definition.enum.join(', ')}`,
{ collection, field, value }
{ collection, field, value },
);
}
break;
}
}
return true;
};
};
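The validator also accepts `enum` fields, which the README examples do not currently show — a sketch (collection and values hypothetical):

```typescript
defineSchema('orders', {
  properties: {
    status: {
      type: 'enum',
      enum: ['pending', 'shipped', 'delivered'],
    },
  },
  required: ['status'],
});

// create('orders', 'order1', { status: 'lost' }) would throw:
// "Field 'status' must be one of: pending, shipped, delivered"
```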


@ -0,0 +1,413 @@
import { createServiceLogger } from '../../utils/logger';
import {
ErrorCode,
StoreType,
StoreOptions,
CreateResult,
UpdateResult,
PaginatedResult,
QueryOptions,
ListOptions,
acquireLock,
releaseLock,
isLocked,
} from '../types';
import { DBError } from '../core/error';
import { BaseStore, openStore, prepareDocument } from './baseStore';
import * as events from '../events/eventService';
/**
* Abstract store implementation with common CRUD operations
* Specific store types extend this class and customize only what's different
*/
export abstract class AbstractStore implements BaseStore {
protected logger = createServiceLogger(this.getLoggerName());
protected storeType: StoreType;
constructor(storeType: StoreType) {
this.storeType = storeType;
}
/**
* Must be implemented by subclasses to provide the logger name
*/
protected abstract getLoggerName(): string;
/**
* Create a new document in the specified collection
*/
async create<T extends Record<string, any>>(
collection: string,
id: string,
data: Omit<T, 'createdAt' | 'updatedAt'>,
options?: StoreOptions,
): Promise<CreateResult> {
// Create a lock ID for this resource to prevent concurrent operations
const lockId = `${collection}:${id}:create`;
// Try to acquire a lock
if (!acquireLock(lockId)) {
this.logger.warn(
`Concurrent operation detected on ${collection}/${id}, waiting for completion`,
);
// Wait until the lock is released (poll every 100ms for max 5 seconds)
let attempts = 0;
while (isLocked(lockId) && attempts < 50) {
await new Promise((resolve) => setTimeout(resolve, 100));
attempts++;
}
if (isLocked(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Timed out waiting for lock on ${collection}/${id}`,
);
}
// Try to acquire lock again
if (!acquireLock(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to acquire lock on ${collection}/${id}`,
);
}
}
try {
const db = await openStore(collection, this.storeType, options);
// Prepare document for storage with validation
const document = this.prepareCreateDocument<T>(collection, id, data);
// Add to database - this will be overridden by specific implementations if needed
const hash = await this.performCreate(db, id, document);
// Emit change event
events.emit('document:created', { collection, id, document });
this.logger.info(`Created document in ${collection} with id ${id}`);
return { id, hash };
} catch (error: unknown) {
if (error instanceof DBError) {
throw error;
}
this.logger.error(`Error creating document in ${collection}:`, error);
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to create document in ${collection}: ${error instanceof Error ? error.message : String(error)}`,
error,
);
} finally {
// Always release the lock when done
releaseLock(lockId);
}
}
/**
* Prepare a document for creation - can be overridden by subclasses
*/
protected prepareCreateDocument<T extends Record<string, any>>(
collection: string,
id: string,
data: Omit<T, 'createdAt' | 'updatedAt'>,
): any {
return prepareDocument<T>(collection, data);
}
/**
* Perform the actual create operation - should be implemented by subclasses
*/
protected abstract performCreate(db: any, id: string, document: any): Promise<string>;
/**
* Get a document by ID from a collection
*/
async get<T extends Record<string, any>>(
collection: string,
id: string,
options?: StoreOptions & { skipCache?: boolean },
): Promise<T | null> {
try {
const db = await openStore(collection, this.storeType, options);
const document = await this.performGet<T>(db, id);
return document;
} catch (error: unknown) {
if (error instanceof DBError) {
throw error;
}
this.logger.error(`Error getting document ${id} from ${collection}:`, error);
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to get document ${id} from ${collection}: ${error instanceof Error ? error.message : String(error)}`,
error,
);
}
}
/**
* Perform the actual get operation - should be implemented by subclasses
*/
protected abstract performGet<T>(db: any, id: string): Promise<T | null>;
/**
* Update a document in a collection
*/
async update<T extends Record<string, any>>(
collection: string,
id: string,
data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
options?: StoreOptions & { upsert?: boolean },
): Promise<UpdateResult> {
// Create a lock ID for this resource to prevent concurrent operations
const lockId = `${collection}:${id}:update`;
// Try to acquire a lock
if (!acquireLock(lockId)) {
this.logger.warn(
`Concurrent operation detected on ${collection}/${id}, waiting for completion`,
);
// Wait until the lock is released (poll every 100ms for max 5 seconds)
let attempts = 0;
while (isLocked(lockId) && attempts < 50) {
await new Promise((resolve) => setTimeout(resolve, 100));
attempts++;
}
if (isLocked(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Timed out waiting for lock on ${collection}/${id}`,
);
}
// Try to acquire lock again
if (!acquireLock(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to acquire lock on ${collection}/${id}`,
);
}
}
try {
const db = await openStore(collection, this.storeType, options);
const existing = await this.performGet<T>(db, id);
if (!existing && !options?.upsert) {
throw new DBError(
ErrorCode.DOCUMENT_NOT_FOUND,
`Document ${id} not found in ${collection}`,
{ collection, id },
);
}
// Prepare document for update with validation
const document = this.prepareUpdateDocument<T>(collection, id, data, existing || undefined);
// Update in database
const hash = await this.performUpdate(db, id, document);
// Emit change event
events.emit('document:updated', { collection, id, document, previous: existing });
this.logger.info(`Updated document in ${collection} with id ${id}`);
return { id, hash };
} catch (error: unknown) {
if (error instanceof DBError) {
throw error;
}
this.logger.error(`Error updating document in ${collection}:`, error);
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to update document in ${collection}: ${error instanceof Error ? error.message : String(error)}`,
error,
);
} finally {
// Always release the lock when done
releaseLock(lockId);
}
}
/**
* Prepare a document for update - can be overridden by subclasses
*/
protected prepareUpdateDocument<T extends Record<string, any>>(
collection: string,
id: string,
data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
existing?: T,
): any {
return prepareDocument<T>(
collection,
data as unknown as Omit<T, 'createdAt' | 'updatedAt'>,
existing,
);
}
/**
* Perform the actual update operation - should be implemented by subclasses
*/
protected abstract performUpdate(db: any, id: string, document: any): Promise<string>;
/**
* Delete a document from a collection
*/
async remove(collection: string, id: string, options?: StoreOptions): Promise<boolean> {
// Create a lock ID for this resource to prevent concurrent operations
const lockId = `${collection}:${id}:remove`;
// Try to acquire a lock
if (!acquireLock(lockId)) {
this.logger.warn(
`Concurrent operation detected on ${collection}/${id}, waiting for completion`,
);
// Wait until the lock is released (poll every 100ms for max 5 seconds)
let attempts = 0;
while (isLocked(lockId) && attempts < 50) {
await new Promise((resolve) => setTimeout(resolve, 100));
attempts++;
}
if (isLocked(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Timed out waiting for lock on ${collection}/${id}`,
);
}
// Try to acquire lock again
if (!acquireLock(lockId)) {
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to acquire lock on ${collection}/${id}`,
);
}
}
try {
const db = await openStore(collection, this.storeType, options);
// Get the document before deleting for the event
const document = await this.performGet(db, id);
if (!document) {
this.logger.warn(`Document ${id} not found in ${collection} for deletion`);
return false;
}
// Delete from database
await this.performRemove(db, id);
// Emit change event
events.emit('document:deleted', { collection, id, document });
this.logger.info(`Deleted document in ${collection} with id ${id}`);
return true;
} catch (error: unknown) {
if (error instanceof DBError) {
throw error;
}
this.logger.error(`Error deleting document in ${collection}:`, error);
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to delete document in ${collection}: ${error instanceof Error ? error.message : String(error)}`,
error,
);
} finally {
// Always release the lock when done
releaseLock(lockId);
}
}
/**
* Perform the actual remove operation - should be implemented by subclasses
*/
protected abstract performRemove(db: any, id: string): Promise<void>;
/**
* Apply sorting to a list of documents
*/
protected applySorting<T extends Record<string, any>>(
documents: T[],
options?: ListOptions | QueryOptions,
): T[] {
if (!options?.sort) {
return documents;
}
const { field, order } = options.sort;
return [...documents].sort((a, b) => {
const valueA = a[field];
const valueB = b[field];
// Handle different data types for sorting
if (typeof valueA === 'string' && typeof valueB === 'string') {
return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
} else if (typeof valueA === 'number' && typeof valueB === 'number') {
return order === 'asc' ? valueA - valueB : valueB - valueA;
} else if (valueA instanceof Date && valueB instanceof Date) {
return order === 'asc'
? valueA.getTime() - valueB.getTime()
: valueB.getTime() - valueA.getTime();
}
// Default comparison for other types
return order === 'asc'
? String(valueA).localeCompare(String(valueB))
: String(valueB).localeCompare(String(valueA));
});
}
/**
* Apply pagination to a list of documents
*/
protected applyPagination<T>(
documents: T[],
options?: ListOptions | QueryOptions,
): {
documents: T[];
total: number;
hasMore: boolean;
} {
const total = documents.length;
const offset = options?.offset || 0;
const limit = options?.limit || total;
const paginatedDocuments = documents.slice(offset, offset + limit);
const hasMore = offset + limit < total;
return {
documents: paginatedDocuments,
total,
hasMore,
};
}
/**
* List all documents in a collection with pagination
*/
abstract list<T extends Record<string, any>>(
collection: string,
options?: ListOptions,
): Promise<PaginatedResult<T>>;
/**
* Query documents in a collection with filtering and pagination
*/
abstract query<T extends Record<string, any>>(
collection: string,
filter: (doc: T) => boolean,
options?: QueryOptions,
): Promise<PaginatedResult<T>>;
/**
* Create an index for a collection to speed up queries
*/
abstract createIndex(collection: string, field: string, options?: StoreOptions): Promise<boolean>;
}
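To illustrate the template-method layout, a hypothetical subclass fills in only the abstract hooks; locking, event emission, and error wrapping all come from the base class. (Declared `abstract` here because `list`/`query`/`createIndex` are left out of the sketch; the db handle is assumed to expose keyvalue-style `put`/`get`/`del`.)

```typescript
abstract class SketchKeyValueStore extends AbstractStore {
  constructor() {
    super(StoreType.KEYVALUE);
  }

  protected getLoggerName(): string {
    return 'SKETCH_KV_STORE';
  }

  protected async performCreate(db: any, id: string, document: any): Promise<string> {
    return db.put(id, document); // assumed to resolve to the entry hash
  }

  protected async performGet<T>(db: any, id: string): Promise<T | null> {
    return (await db.get(id)) ?? null;
  }

  protected async performUpdate(db: any, id: string, document: any): Promise<string> {
    return db.put(id, document);
  }

  protected async performRemove(db: any, id: string): Promise<void> {
    await db.del(id);
  }
}
```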


@ -1,6 +1,5 @@
import { createServiceLogger } from '../../utils/logger';
import { openDB } from '../../orbit/orbitDBService';
import { getConnection } from '../core/connection';
import { validateDocument } from '../schema/validator';
import {
ErrorCode,
@ -20,9 +19,6 @@ const logger = createServiceLogger('DB_STORE');
* Base Store interface that all store implementations should extend
*/
export interface BaseStore {
/**
* Create a new document
*/
create<T extends Record<string, any>>(
collection: string,
id: string,
@ -30,18 +26,12 @@ export interface BaseStore {
options?: StoreOptions,
): Promise<CreateResult>;
/**
* Get a document by ID
*/
get<T extends Record<string, any>>(
collection: string,
id: string,
options?: StoreOptions & { skipCache?: boolean },
): Promise<T | null>;
/**
* Update a document
*/
update<T extends Record<string, any>>(
collection: string,
id: string,
@ -49,31 +39,19 @@ export interface BaseStore {
options?: StoreOptions & { upsert?: boolean },
): Promise<UpdateResult>;
/**
* Delete a document
*/
remove(collection: string, id: string, options?: StoreOptions): Promise<boolean>;
/**
* List all documents in a collection with pagination
*/
list<T extends Record<string, any>>(
collection: string,
options?: ListOptions,
): Promise<PaginatedResult<T>>;
/**
* Query documents in a collection with filtering and pagination
*/
query<T extends Record<string, any>>(
collection: string,
filter: (doc: T) => boolean,
options?: QueryOptions,
): Promise<PaginatedResult<T>>;
/**
* Create an index for a collection to speed up queries
*/
createIndex(collection: string, field: string, options?: StoreOptions): Promise<boolean>;
}
@ -86,16 +64,22 @@ export async function openStore(
options?: StoreOptions,
): Promise<any> {
try {
const connection = getConnection(options?.connectionId);
logger.info(`Connection for ${collection}:`, connection);
// Log minimal connection info to avoid leaking sensitive data
logger.info(
`Opening ${storeType} store for collection: ${collection} (connection ID: ${options?.connectionId || 'default'})`,
);
return await openDB(collection, storeType).catch((err) => {
throw new Error(`OrbitDB openDB failed: ${err.message}`);
});
} catch (error) {
logger.error(`Error opening ${storeType} store for collection ${collection}:`, error);
// Add more context to the error for improved debugging
const errorMessage = error instanceof Error ? error.message : String(error);
throw new DBError(
ErrorCode.OPERATION_FAILED,
`Failed to open ${storeType} store for collection ${collection}`,
`Failed to open ${storeType} store for collection ${collection}: ${errorMessage}`,
error,
);
}
@ -116,27 +100,28 @@ export function prepareDocument<T extends Record<string, any>>(
Object.entries(data).map(([key, value]) => [key, value === undefined ? null : value]),
) as Omit<T, 'createdAt' | 'updatedAt'>;
// Prepare document for validation
let docToValidate: T;
// If it's an update to an existing document
if (existingDoc) {
const doc = {
docToValidate = {
...existingDoc,
...sanitizedData,
updatedAt: timestamp,
} as T;
// Validate the document against its schema
validateDocument(collection, doc);
return doc;
} else {
// Otherwise it's a new document
docToValidate = {
...sanitizedData,
createdAt: timestamp,
updatedAt: timestamp,
} as unknown as T;
}
// Otherwise it's a new document
const doc = {
...sanitizedData,
createdAt: timestamp,
updatedAt: timestamp,
} as unknown as T;
// Validate the document BEFORE processing
validateDocument(collection, docToValidate);
// Validate the document against its schema
validateDocument(collection, doc);
return doc;
// Return the validated document
return docToValidate;
}
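A worked example of the two paths (timestamps illustrative):

```typescript
// New document: undefined values are sanitized to null, timestamps added,
// and the result is validated before being returned.
const doc = prepareDocument('users', { username: 'alice', avatar: undefined });
// -> { username: 'alice', avatar: null, createdAt: T, updatedAt: T }

// Update: merged over the existing document; only updatedAt moves forward.
const next = prepareDocument('users', { avatar: 'a.png' } as any, doc);
// -> { username: 'alice', avatar: 'a.png', createdAt: T, updatedAt: T2 }
```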


@ -1,10 +1,17 @@
import { createServiceLogger } from '../../utils/logger';
import {
  ErrorCode,
  StoreType,
  StoreOptions,
  CreateResult,
  UpdateResult,
  PaginatedResult,
  QueryOptions,
  ListOptions,
} from '../types';
import { DBError } from '../core/error';
import { BaseStore, openStore } from './baseStore';
import * as cache from '../cache/cacheService';
import * as events from '../events/eventService';
import { measurePerformance } from '../metrics/metricsService';
const logger = createServiceLogger('COUNTER_STORE');
@@ -17,315 +24,297 @@ export class CounterStore implements BaseStore {
* Create or set counter value
*/
  async create<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Omit<T, 'createdAt' | 'updatedAt'>,
    options?: StoreOptions,
  ): Promise<CreateResult> {
    try {
      const db = await openStore(collection, StoreType.COUNTER, options);
      // Extract value from data, default to 0
      const value =
        typeof data === 'object' && data !== null && 'value' in data ? Number(data.value) : 0;
      // Set the counter value
      const hash = await db.set(value);
      // Construct document representation
      const document = {
        id,
        value,
        createdAt: Date.now(),
        updatedAt: Date.now(),
      };
      // Emit change event
      events.emit('document:created', { collection, id, document });
      logger.info(`Set counter in ${collection} to ${value}`);
      return { id, hash };
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error setting counter in ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to set counter in ${collection}`,
        error,
      );
    }
  }
/**
* Get counter value
*/
  async get<T extends Record<string, any>>(
    collection: string,
    id: string,
    options?: StoreOptions & { skipCache?: boolean },
  ): Promise<T | null> {
    try {
      // Note: for counters, id is not used in the underlying store (there's only one counter per db)
      // but we use it for consistency with the API
      const db = await openStore(collection, StoreType.COUNTER, options);
      // Get the counter value
      const value = await db.value();
      // Construct document representation
      const document = {
        id,
        value,
        updatedAt: Date.now(),
      } as unknown as T;
      return document;
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error getting counter from ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to get counter from ${collection}`,
        error,
      );
    }
  }
/**
* Update counter (increment/decrement)
*/
  async update<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
    options?: StoreOptions & { upsert?: boolean },
  ): Promise<UpdateResult> {
    try {
      const db = await openStore(collection, StoreType.COUNTER, options);
      // Get current value before update
      const currentValue = await db.value();
      // Extract value from data
      let value: number;
      let operation: 'increment' | 'decrement' | 'set' = 'set';
      // Check what kind of operation we're doing
      if (typeof data === 'object' && data !== null) {
        if ('increment' in data) {
          value = Number(data.increment);
          operation = 'increment';
        } else if ('decrement' in data) {
          value = Number(data.decrement);
          operation = 'decrement';
        } else if ('value' in data) {
          value = Number(data.value);
          operation = 'set';
        } else {
          value = 0;
          operation = 'set';
        }
      } else {
        value = 0;
        operation = 'set';
      }
      // Update the counter
      let hash;
      let newValue;
      switch (operation) {
        case 'increment':
          hash = await db.inc(value);
          newValue = currentValue + value;
          break;
        case 'decrement':
          hash = await db.inc(-value); // Counter store uses inc with negative value
          newValue = currentValue - value;
          break;
        case 'set':
          hash = await db.set(value);
          newValue = value;
          break;
      }
      // Construct document representation
      const document = {
        id,
        value: newValue,
        updatedAt: Date.now(),
      };
      // Emit change event
      events.emit('document:updated', {
        collection,
        id,
        document,
        previous: { id, value: currentValue },
      });
      logger.info(`Updated counter in ${collection} from ${currentValue} to ${newValue}`);
      return { id, hash };
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error updating counter in ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to update counter in ${collection}`,
        error,
      );
    }
  }
/**
* Delete/reset counter
*/
  async remove(collection: string, id: string, options?: StoreOptions): Promise<boolean> {
    try {
      const db = await openStore(collection, StoreType.COUNTER, options);
      // Get the current value for the event
      const currentValue = await db.value();
      // Reset the counter to 0 (counters can't be truly deleted)
      await db.set(0);
      // Emit change event
      events.emit('document:deleted', {
        collection,
        id,
        document: { id, value: currentValue },
      });
      logger.info(`Reset counter in ${collection} from ${currentValue} to 0`);
      return true;
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error resetting counter in ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to reset counter in ${collection}`,
        error,
      );
    }
  }
/**
* List all counters (for counter stores, there's only one counter per db)
*/
  async list<T extends Record<string, any>>(
    collection: string,
    options?: ListOptions,
  ): Promise<PaginatedResult<T>> {
    try {
      const db = await openStore(collection, StoreType.COUNTER, options);
      const value = await db.value();
      // For counter stores, we just return one document with the counter value
      const document = {
        id: '0', // Default ID since counters don't have IDs
        value,
        updatedAt: Date.now(),
      } as unknown as T;
      return {
        documents: [document],
        total: 1,
        hasMore: false,
      };
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error listing counter in ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to list counter in ${collection}`,
        error,
      );
    }
  }
/**
* Query is not applicable for counter stores, but we implement for API consistency
*/
  async query<T extends Record<string, any>>(
    collection: string,
    filter: (doc: T) => boolean,
    options?: QueryOptions,
  ): Promise<PaginatedResult<T>> {
    try {
      const db = await openStore(collection, StoreType.COUNTER, options);
      const value = await db.value();
      // Create document
      const document = {
        id: '0', // Default ID since counters don't have IDs
        value,
        updatedAt: Date.now(),
      } as unknown as T;
      // Apply filter
      const documents = filter(document) ? [document] : [];
      return {
        documents,
        total: documents.length,
        hasMore: false,
      };
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error querying counter in ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to query counter in ${collection}`,
        error,
      );
    }
  }
/**
* Create an index - not applicable for counter stores
*/
  async createIndex(collection: string, _field: string, _options?: StoreOptions): Promise<boolean> {
    logger.warn(
      `Index creation not supported for counter collections, ignoring request for ${collection}`,
    );
    return false;
  }
}
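
A usage sketch of the counter semantics implemented above; `'stats'`/`'visits'` are illustrative names and the import path is an assumption:

```typescript
import { CounterStore } from './counterStore'; // path assumed

async function demoCounters() {
  const counters = new CounterStore();

  await counters.create('stats', 'visits', { value: 10 }); // db.set(10)
  await counters.update('stats', 'visits', { increment: 5 }); // db.inc(5)  -> 15
  await counters.update('stats', 'visits', { decrement: 3 }); // db.inc(-3) -> 12
  await counters.update('stats', 'visits', { value: 7 }); // db.set(7)  -> 7

  const doc = await counters.get<{ value: number }>('stats', 'visits');
  console.log(doc?.value); // 7 — one counter per collection; `id` exists only for API symmetry
}
```
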


@@ -1,350 +1,181 @@
import { StoreType, StoreOptions, PaginatedResult, QueryOptions, ListOptions } from '../types';
import { AbstractStore } from './abstractStore';
import { prepareDocument } from './baseStore';
/**
* DocStore implementation
* Uses OrbitDB's document store which allows for more complex document storage with indices
*/
export class DocStore extends AbstractStore {
  constructor() {
    super(StoreType.DOCSTORE);
  }
  protected getLoggerName(): string {
    return 'DOCSTORE';
  }
  /**
   * Prepare a document for creation - override to add _id which is required for docstore
   */
  protected prepareCreateDocument<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Omit<T, 'createdAt' | 'updatedAt'>,
  ): any {
    return {
      _id: id,
      ...prepareDocument<T>(collection, data),
    };
  }
  /**
   * Prepare a document for update - override to add _id which is required for docstore
   */
  protected prepareUpdateDocument<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
    existing?: T,
  ): any {
    return {
      _id: id,
      ...prepareDocument<T>(
        collection,
        data as unknown as Omit<T, 'createdAt' | 'updatedAt'>,
        existing,
      ),
    };
  }
/**
* Implementation for the DocStore create operation
*/
protected async performCreate(db: any, id: string, document: any): Promise<string> {
return await db.put(document);
}
/**
* Implementation for the DocStore get operation
*/
protected async performGet<T>(db: any, id: string): Promise<T | null> {
return (await db.get(id)) as T | null;
}
/**
* Implementation for the DocStore update operation
*/
protected async performUpdate(db: any, id: string, document: any): Promise<string> {
return await db.put(document);
}
/**
* Implementation for the DocStore remove operation
*/
protected async performRemove(db: any, id: string): Promise<void> {
await db.del(id);
}
/**
* List all documents in a collection with pagination
*/
  async list<T extends Record<string, any>>(
    collection: string,
    options?: ListOptions,
  ): Promise<PaginatedResult<T>> {
    try {
      const db = await this.openStore(collection, options);
      const allDocs = await db.query((_doc: any) => true);
      // Map the documents to include id
      let documents = allDocs.map((doc: any) => ({
        id: doc._id,
        ...doc,
      })) as T[];
      // Apply sorting
      documents = this.applySorting(documents, options);
      // Apply pagination
      return this.applyPagination(documents, options);
    } catch (error) {
      this.handleError(`Error listing documents in ${collection}`, error);
    }
  }
/**
* Query documents in a collection with filtering and pagination
*/
  async query<T extends Record<string, any>>(
    collection: string,
    filter: (doc: T) => boolean,
    options?: QueryOptions,
  ): Promise<PaginatedResult<T>> {
    try {
      const db = await this.openStore(collection, options);
      // Apply filter using docstore's query capability
      const filtered = await db.query((doc: any) => filter(doc as T));
      // Map the documents to include id
      let documents = filtered.map((doc: any) => ({
        id: doc._id,
        ...doc,
      })) as T[];
      // Apply sorting
      documents = this.applySorting(documents, options);
      // Apply pagination
      return this.applyPagination(documents, options);
    } catch (error) {
      this.handleError(`Error querying documents in ${collection}`, error);
    }
  }
/**
* Create an index for a collection to speed up queries
* DocStore has built-in indexing capabilities
*/
  async createIndex(collection: string, field: string, options?: StoreOptions): Promise<boolean> {
    try {
      const db = await this.openStore(collection, options);
      // DocStore supports indexing, so we create the index
      if (typeof db.createIndex === 'function') {
        await db.createIndex(field);
        this.logger.info(`Index created on ${field} for collection ${collection}`);
        return true;
      }
      this.logger.info(
        `Index creation not supported for this DB instance, but DocStore has built-in indices`,
      );
      return true;
    } catch (error) {
      this.handleError(`Error creating index for ${collection}`, error);
    }
  }
/**
* Helper to open a store of the correct type
*/
private async openStore(collection: string, options?: StoreOptions): Promise<any> {
const { openStore } = await import('./baseStore');
return await openStore(collection, this.storeType, options);
}
/**
* Helper to handle errors consistently
*/
private handleError(message: string, error: any): never {
const { DBError, ErrorCode } = require('../core/error');
if (error instanceof DBError) {
throw error;
}
this.logger.error(`${message}:`, error);
throw new DBError(ErrorCode.OPERATION_FAILED, `${message}: ${error.message}`, error);
}
}
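
`AbstractStore` itself is not part of this diff; the outline below is a hedged sketch of its assumed shape, inferred only from how `DocStore` uses it (template-method `perform*` hooks plus shared `applySorting`/`applyPagination`/`handleError` helpers). It should not be read as the actual abstractStore.ts:

```typescript
import { createServiceLogger } from '../../utils/logger';
import { StoreType } from '../types';

// Assumed shape only — inferred from the subclass contract above.
abstract class AbstractStoreOutline {
  protected readonly logger: ReturnType<typeof createServiceLogger>;

  constructor(protected readonly storeType: StoreType) {
    this.logger = createServiceLogger(this.getLoggerName());
  }

  protected abstract getLoggerName(): string;
  protected abstract performCreate(db: any, id: string, document: any): Promise<string>;
  protected abstract performGet<T>(db: any, id: string): Promise<T | null>;
  protected abstract performUpdate(db: any, id: string, document: any): Promise<string>;
  protected abstract performRemove(db: any, id: string): Promise<void>;

  // Shared plumbing: open the right store type, delegate the store-specific
  // call to a perform* hook, and keep error handling in one place.
  async get<T extends Record<string, any>>(collection: string, id: string): Promise<T | null> {
    const { openStore } = await import('./baseStore');
    const db = await openStore(collection, this.storeType);
    return this.performGet<T>(db, id);
  }
}
```
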


@@ -1,10 +1,17 @@
import { createServiceLogger } from '../../utils/logger';
import {
  ErrorCode,
  StoreType,
  StoreOptions,
  CreateResult,
  UpdateResult,
  PaginatedResult,
  QueryOptions,
  ListOptions,
} from '../types';
import { DBError } from '../core/error';
import { BaseStore, openStore, prepareDocument } from './baseStore';
import * as cache from '../cache/cacheService';
import * as events from '../events/eventService';
import { measurePerformance } from '../metrics/metricsService';
const logger = createServiceLogger('FEED_STORE');
@@ -18,399 +25,391 @@ export class FeedStore implements BaseStore {
* For feeds, this appends a new entry
*/
  async create<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Omit<T, 'createdAt' | 'updatedAt'>,
    options?: StoreOptions,
  ): Promise<CreateResult> {
    try {
      const db = await openStore(collection, StoreType.FEED, options);
      // Prepare document for storage with ID
      const document = {
        id,
        ...prepareDocument<T>(collection, data),
      };
      // Add to database
      const hash = await db.add(document);
      // Emit change event
      events.emit('document:created', { collection, id, document, hash });
      logger.info(`Created entry in feed ${collection} with id ${id} and hash ${hash}`);
      return { id, hash };
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error creating entry in feed ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to create entry in feed ${collection}`,
        error,
      );
    }
  }
/**
* Get a specific entry in a feed - note this works differently than other stores
* as feeds are append-only logs identified by hash
*/
  async get<T extends Record<string, any>>(
    collection: string,
    hash: string,
    options?: StoreOptions & { skipCache?: boolean },
  ): Promise<T | null> {
    try {
      const db = await openStore(collection, StoreType.FEED, options);
      // Get the specific entry by hash
      const entry = await db.get(hash);
      if (!entry) {
        return null;
      }
      const document = entry.payload.value as T;
      return document;
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error getting entry ${hash} from feed ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to get entry ${hash} from feed ${collection}`,
        error,
      );
    }
  }
/**
* Update an entry in a feed
* Note: Feeds are append-only, so we can't actually update existing entries
* Instead, we append a new entry with the updated data and link it to the original
*/
  async update<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
    options?: StoreOptions & { upsert?: boolean },
  ): Promise<UpdateResult> {
    try {
      const db = await openStore(collection, StoreType.FEED, options);
      // Find the latest entry with the given id
      const entries = await db.iterator({ limit: -1 }).collect();
      const existingEntryIndex = entries.findIndex((e: any) => {
        const value = e.payload.value;
        return value && value.id === id;
      });
      if (existingEntryIndex === -1 && !options?.upsert) {
        throw new DBError(
          ErrorCode.DOCUMENT_NOT_FOUND,
          `Entry with id ${id} not found in feed ${collection}`,
          { collection, id },
        );
      }
      const existingEntry =
        existingEntryIndex !== -1 ? entries[existingEntryIndex].payload.value : null;
      // Prepare document with update
      const document = {
        id,
        ...prepareDocument<T>(
          collection,
          data as unknown as Omit<T, 'createdAt' | 'updatedAt'>,
          existingEntry,
        ),
        // Add reference to the previous entry if it exists
        previousEntryHash: existingEntryIndex !== -1 ? entries[existingEntryIndex].hash : undefined,
      };
      // Add to feed (append new entry)
      const hash = await db.add(document);
      // Emit change event
      events.emit('document:updated', { collection, id, document, previous: existingEntry });
      logger.info(`Updated entry in feed ${collection} with id ${id} (new hash: ${hash})`);
      return { id, hash };
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error updating entry in feed ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to update entry in feed ${collection}`,
        error,
      );
    }
  }
/**
* Delete is not supported in feed/eventlog stores since they're append-only
* Instead, we add a "tombstone" entry that marks the entry as deleted
*/
  async remove(collection: string, id: string, options?: StoreOptions): Promise<boolean> {
    try {
      const db = await openStore(collection, StoreType.FEED, options);
      // Find the entry with the given id
      const entries = await db.iterator({ limit: -1 }).collect();
      const existingEntryIndex = entries.findIndex((e: any) => {
        const value = e.payload.value;
        return value && value.id === id;
      });
      if (existingEntryIndex === -1) {
        throw new DBError(
          ErrorCode.DOCUMENT_NOT_FOUND,
          `Entry with id ${id} not found in feed ${collection}`,
          { collection, id },
        );
      }
      const existingEntry = entries[existingEntryIndex].payload.value;
      const existingHash = entries[existingEntryIndex].hash;
      // Add a "tombstone" entry that marks this as deleted
      const tombstone = {
        id,
        deleted: true,
        deletedAt: Date.now(),
        previousEntryHash: existingHash,
      };
      await db.add(tombstone);
      // Emit change event
      events.emit('document:deleted', { collection, id, document: existingEntry });
      logger.info(`Marked entry as deleted in feed ${collection} with id ${id}`);
      return true;
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error marking entry as deleted in feed ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to mark entry as deleted in feed ${collection}`,
        error,
      );
    }
  }
/**
* List all entries in a feed with pagination
* Note: This will only return the latest entry for each unique ID
*/
  async list<T extends Record<string, any>>(
    collection: string,
    options?: ListOptions,
  ): Promise<PaginatedResult<T>> {
    try {
      const db = await openStore(collection, StoreType.FEED, options);
      // Get all entries
      const entries = await db.iterator({ limit: -1 }).collect();
      // Group by ID and keep only the latest entry for each ID
      // Also filter out tombstone entries
      const latestEntries = new Map<string, any>();
      for (const entry of entries) {
        const value = entry.payload.value;
        if (!value || value.deleted) continue;
        const id = value.id;
        if (!id) continue;
        // If we already have an entry with this ID, check which is newer
        if (latestEntries.has(id)) {
          const existing = latestEntries.get(id);
          if (value.updatedAt > existing.value.updatedAt) {
            latestEntries.set(id, { hash: entry.hash, value });
          }
        } else {
          latestEntries.set(id, { hash: entry.hash, value });
        }
      }
      // Convert to array of documents
      let documents = Array.from(latestEntries.values()).map((entry) => ({
        ...entry.value,
      })) as T[];
      // Sort if requested
      if (options?.sort) {
        const { field, order } = options.sort;
        documents.sort((a, b) => {
          const valueA = a[field];
          const valueB = b[field];
          // Handle different data types for sorting
          if (typeof valueA === 'string' && typeof valueB === 'string') {
            return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
          } else if (typeof valueA === 'number' && typeof valueB === 'number') {
            return order === 'asc' ? valueA - valueB : valueB - valueA;
          } else if (valueA instanceof Date && valueB instanceof Date) {
            return order === 'asc'
              ? valueA.getTime() - valueB.getTime()
              : valueB.getTime() - valueA.getTime();
          }
          // Default comparison for other types
          return order === 'asc'
            ? String(valueA).localeCompare(String(valueB))
            : String(valueB).localeCompare(String(valueA));
        });
      }
      // Apply pagination
      const total = documents.length;
      const offset = options?.offset || 0;
      const limit = options?.limit || total;
      const paginatedDocuments = documents.slice(offset, offset + limit);
      const hasMore = offset + limit < total;
      return {
        documents: paginatedDocuments,
        total,
        hasMore,
      };
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error listing entries in feed ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to list entries in feed ${collection}`,
        error,
      );
    }
  }
/**
* Query entries in a feed with filtering and pagination
* Note: This queries the latest entry for each unique ID
*/
  async query<T extends Record<string, any>>(
    collection: string,
    filter: (doc: T) => boolean,
    options?: QueryOptions,
  ): Promise<PaginatedResult<T>> {
    try {
      const db = await openStore(collection, StoreType.FEED, options);
      // Get all entries
      const entries = await db.iterator({ limit: -1 }).collect();
      // Group by ID and keep only the latest entry for each ID
      // Also filter out tombstone entries
      const latestEntries = new Map<string, any>();
      for (const entry of entries) {
        const value = entry.payload.value;
        if (!value || value.deleted) continue;
        const id = value.id;
        if (!id) continue;
        // If we already have an entry with this ID, check which is newer
        if (latestEntries.has(id)) {
          const existing = latestEntries.get(id);
          if (value.updatedAt > existing.value.updatedAt) {
            latestEntries.set(id, { hash: entry.hash, value });
          }
        } else {
          latestEntries.set(id, { hash: entry.hash, value });
        }
      }
      // Convert to array of documents and apply filter
      let filtered = Array.from(latestEntries.values())
        .filter((entry) => filter(entry.value as T))
        .map((entry) => ({
          ...entry.value,
        })) as T[];
      // Sort if requested
      if (options?.sort) {
        const { field, order } = options.sort;
        filtered.sort((a, b) => {
          const valueA = a[field];
          const valueB = b[field];
          // Handle different data types for sorting
          if (typeof valueA === 'string' && typeof valueB === 'string') {
            return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
          } else if (typeof valueA === 'number' && typeof valueB === 'number') {
            return order === 'asc' ? valueA - valueB : valueB - valueA;
          } else if (valueA instanceof Date && valueB instanceof Date) {
            return order === 'asc'
              ? valueA.getTime() - valueB.getTime()
              : valueB.getTime() - valueA.getTime();
          }
          // Default comparison for other types
          return order === 'asc'
            ? String(valueA).localeCompare(String(valueB))
            : String(valueB).localeCompare(String(valueA));
        });
      }
      // Apply pagination
      const total = filtered.length;
      const offset = options?.offset || 0;
      const limit = options?.limit || total;
      const paginatedDocuments = filtered.slice(offset, offset + limit);
      const hasMore = offset + limit < total;
      return {
        documents: paginatedDocuments,
        total,
        hasMore,
      };
    } catch (error: unknown) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error querying entries in feed ${collection}:`, error);
      throw new DBError(
        ErrorCode.OPERATION_FAILED,
        `Failed to query entries in feed ${collection}`,
        error,
      );
    }
  }
/**
* Create an index for a collection - not supported for feeds
*/
  async createIndex(collection: string, _field: string, _options?: StoreOptions): Promise<boolean> {
    logger.warn(
      `Index creation not supported for feed collections, ignoring request for ${collection}`,
    );
    return false;
  }
}
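
A usage sketch of the append-only semantics above. Note that `get` is hash-addressed while `update`/`remove` search by document `id`; `'events'` and the import path are illustrative assumptions:

```typescript
import { FeedStore } from './feedStore'; // path assumed

async function demoFeed() {
  const feed = new FeedStore();

  const { hash: h1 } = await feed.create('events', 'evt1', { type: 'login', user: 'alice' });

  // update() never rewrites h1; it appends a new entry that links back to it.
  const { hash: h2 } = await feed.update('events', 'evt1', { type: 'logout' });

  const latest = await feed.get<Record<string, any>>('events', h2);
  // => { id: 'evt1', type: 'logout', user: 'alice', previousEntryHash: h1, ... }
  console.log(latest?.previousEntryHash === h1); // true

  // list() walks the whole log, keeps the newest entry per id, and skips tombstones.
  const page = await feed.list('events');
  console.log(page.total); // 1
}
```
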


@@ -1,13 +1,13 @@
import { createServiceLogger } from '../../utils/logger';
import { ErrorCode, StoreType, FileUploadResult, FileResult } from '../types';
import { DBError } from '../core/error';
import { openStore } from './baseStore';
import { getHelia } from '../../ipfs/ipfsService';
import { CreateResult, StoreOptions } from '../types';
// Helper function to convert AsyncIterable to Buffer
async function readAsyncIterableToBuffer(
  asyncIterable: AsyncIterable<Uint8Array>,
): Promise<Buffer> {
const chunks: Uint8Array[] = [];
for await (const chunk of asyncIterable) {
chunks.push(chunk);
@@ -21,129 +21,148 @@ const logger = createServiceLogger('FILE_STORE');
* Upload a file to IPFS
*/
export const uploadFile = async (
  fileData: Buffer,
  options?: {
    filename?: string;
    connectionId?: string;
    metadata?: Record<string, any>;
  },
): Promise<FileUploadResult> => {
  try {
    const ipfs = getHelia();
    if (!ipfs) {
      throw new DBError(ErrorCode.OPERATION_FAILED, 'IPFS instance not available');
    }
    // Add to IPFS
    const unixfs = await import('@helia/unixfs');
    const fs = unixfs.unixfs(ipfs);
    const cid = await fs.addBytes(fileData);
    const cidStr = cid.toString();
    // Store metadata
    const filesDb = await openStore('_files', StoreType.KEYVALUE);
    await filesDb.put(cidStr, {
      filename: options?.filename,
      size: fileData.length,
      uploadedAt: Date.now(),
      ...options?.metadata,
    });
    logger.info(`Uploaded file with CID: ${cidStr}`);
    return { cid: cidStr };
  } catch (error: unknown) {
    if (error instanceof DBError) {
      throw error;
    }
    logger.error('Error uploading file:', error);
    throw new DBError(ErrorCode.OPERATION_FAILED, 'Failed to upload file', error);
  }
};
/**
* Get a file from IPFS by CID
*/
export const getFile = async (cid: string): Promise<FileResult> => {
  try {
    const ipfs = getHelia();
    if (!ipfs) {
      throw new DBError(ErrorCode.OPERATION_FAILED, 'IPFS instance not available');
    }
    // Get from IPFS
    const unixfs = await import('@helia/unixfs');
    const fs = unixfs.unixfs(ipfs);
    const { CID } = await import('multiformats/cid');
    const resolvedCid = CID.parse(cid);
    try {
      // Convert AsyncIterable to Buffer
      const bytes = await readAsyncIterableToBuffer(fs.cat(resolvedCid));
      // Get metadata if available
      let metadata = null;
      try {
        const filesDb = await openStore('_files', StoreType.KEYVALUE);
        metadata = await filesDb.get(cid);
      } catch (_err) {
        // Metadata might not exist, continue without it
      }
      return { data: bytes, metadata };
    } catch (error) {
      throw new DBError(ErrorCode.FILE_NOT_FOUND, `File with CID ${cid} not found`, error);
    }
  } catch (error: unknown) {
    if (error instanceof DBError) {
      throw error;
    }
    logger.error(`Error getting file with CID ${cid}:`, error);
    throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get file with CID ${cid}`, error);
  }
};
/**
* Delete a file from IPFS by CID
*/
export const deleteFile = async (cid: string): Promise<boolean> => {
  try {
    // Delete metadata
    try {
      const filesDb = await openStore('_files', StoreType.KEYVALUE);
      await filesDb.del(cid);
    } catch (_err) {
      // Ignore if metadata doesn't exist
    }
    // In IPFS we can't really delete files, but we can remove them from our local blockstore
    // and they will eventually be garbage collected if no one else has pinned them
    logger.info(`Deleted file with CID: ${cid}`);
    return true;
  } catch (error: unknown) {
    if (error instanceof DBError) {
      throw error;
    }
    logger.error(`Error deleting file with CID ${cid}:`, error);
    throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to delete file with CID ${cid}`, error);
  }
};
export const create = async <T extends Record<string, any>>(
collection: string,
id: string,
data: Omit<T, 'createdAt' | 'updatedAt'>,
options?: StoreOptions,
): Promise<CreateResult> => {
  try {
    const db = await openStore(collection, StoreType.KEYVALUE, options);
    // Prepare document for storage with ID
    // const document = {
    //   id,
    //   ...prepareDocument<T>(collection, data)
    // };
    const document = { id, ...data };
    // Add to database — keyvalue stores are keyed, so put by id rather than add
    const hash = await db.put(id, document);
    // Emit change event
    // events.emit('document:created', { collection, id, document, hash });
    logger.info(`Created entry in ${collection} with id ${id} and hash ${hash}`);
    return { id, hash };
  } catch (error: unknown) {
    if (error instanceof DBError) {
      throw error;
    }
    logger.error(`Error creating entry in ${collection}:`, error);
    throw new DBError(
      ErrorCode.OPERATION_FAILED,
      `Failed to create entry in ${collection}`,
      error,
    );
  }
};
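
A round-trip sketch for the file helpers above; the import path is an assumption:

```typescript
import { uploadFile, getFile, deleteFile } from './fileStore'; // path assumed

async function demoFiles() {
  const data = Buffer.from('File content');
  const { cid } = await uploadFile(data, { filename: 'document.txt' });

  const file = await getFile(cid);
  console.log(file.metadata?.filename); // 'document.txt'
  console.log(file.data.equals(data)); // true

  // deleteFile only removes the metadata entry; the blocks themselves are
  // left for IPFS garbage collection, as the comment above notes.
  await deleteFile(cid);
}
```
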


@@ -1,335 +1,136 @@
import { StoreType, StoreOptions, PaginatedResult, QueryOptions, ListOptions } from '../types';
import { AbstractStore } from './abstractStore';
import { DBError, ErrorCode } from '../core/error';
/**
 * KeyValue Store implementation using the AbstractStore base class
 */
export class KeyValueStore extends AbstractStore {
  constructor() {
    super(StoreType.KEYVALUE);
  }
  protected getLoggerName(): string {
    return 'KEYVALUE_STORE';
  }
/**
* Update a document in a collection
* Implementation for the KeyValue store create operation
*/
async update<T extends Record<string, any>>(
collection: string,
id: string,
data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
options?: StoreOptions & { upsert?: boolean }
): Promise<UpdateResult> {
return measurePerformance(async () => {
try {
const db = await openStore(collection, StoreType.KEYVALUE, options);
const existing = await db.get(id) as T | null;
if (!existing && !options?.upsert) {
throw new DBError(
ErrorCode.DOCUMENT_NOT_FOUND,
`Document ${id} not found in ${collection}`,
{ collection, id }
);
}
// Prepare document for update
const document = prepareDocument<T>(collection, data as unknown as Omit<T, "createdAt" | "updatedAt">, existing || undefined);
// Update in database
const hash = await db.put(id, document);
// Update cache
const cacheKey = `${collection}:${id}`;
cache.set(cacheKey, document);
// Emit change event
events.emit('document:updated', { collection, id, document, previous: existing });
logger.info(`Updated document in ${collection} with id ${id}`);
return { id, hash };
} catch (error) {
if (error instanceof DBError) {
throw error;
}
logger.error(`Error updating document in ${collection}:`, error);
throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update document in ${collection}`, error);
}
});
protected async performCreate(db: any, id: string, document: any): Promise<string> {
return await db.put(id, document);
}
/**
* Delete a document from a collection
* Implementation for the KeyValue store get operation
*/
async remove(
collection: string,
id: string,
options?: StoreOptions
): Promise<boolean> {
return measurePerformance(async () => {
try {
const db = await openStore(collection, StoreType.KEYVALUE, options);
// Get the document before deleting for the event
const document = await db.get(id);
// Delete from database
await db.del(id);
// Remove from cache
const cacheKey = `${collection}:${id}`;
cache.del(cacheKey);
// Emit change event
events.emit('document:deleted', { collection, id, document });
logger.info(`Deleted document in ${collection} with id ${id}`);
return true;
} catch (error) {
if (error instanceof DBError) {
throw error;
}
logger.error(`Error deleting document in ${collection}:`, error);
throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to delete document in ${collection}`, error);
}
});
protected async performGet<T>(db: any, id: string): Promise<T | null> {
return (await db.get(id)) as T | null;
}
/**
* Implementation for the KeyValue store update operation
*/
protected async performUpdate(db: any, id: string, document: any): Promise<string> {
return await db.put(id, document);
}
/**
* Implementation for the KeyValue store remove operation
*/
protected async performRemove(db: any, id: string): Promise<void> {
await db.del(id);
}
/**
* List all documents in a collection with pagination
*/
async list<T extends Record<string, any>>(
collection: string,
options?: ListOptions
collection: string,
options?: ListOptions,
): Promise<PaginatedResult<T>> {
return measurePerformance(async () => {
try {
const db = await openStore(collection, StoreType.KEYVALUE, options);
const all = await db.all();
let documents = Object.entries(all).map(([key, value]) => ({
id: key,
...(value as any)
})) as T[];
// Sort if requested
if (options?.sort) {
const { field, order } = options.sort;
documents.sort((a, b) => {
const valueA = a[field];
const valueB = b[field];
// Handle different data types for sorting
if (typeof valueA === 'string' && typeof valueB === 'string') {
return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
} else if (typeof valueA === 'number' && typeof valueB === 'number') {
return order === 'asc' ? valueA - valueB : valueB - valueA;
} else if (valueA instanceof Date && valueB instanceof Date) {
return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime();
}
// Default comparison for other types
return order === 'asc' ?
String(valueA).localeCompare(String(valueB)) :
String(valueB).localeCompare(String(valueA));
});
}
const total = documents.length;
// Apply pagination
const offset = options?.offset || 0;
const limit = options?.limit || total;
const paginatedDocuments = documents.slice(offset, offset + limit);
const hasMore = offset + limit < total;
return {
documents: paginatedDocuments,
total,
hasMore
};
} catch (error) {
if (error instanceof DBError) {
throw error;
}
logger.error(`Error listing documents in ${collection}:`, error);
throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list documents in ${collection}`, error);
}
});
try {
const db = await this.openStore(collection, options);
const all = await db.all();
// Convert the key-value pairs to an array of documents with IDs
let documents = Object.entries(all).map(([key, value]) => ({
id: key,
...(value as any),
})) as T[];
// Apply sorting
documents = this.applySorting(documents, options);
// Apply pagination
return this.applyPagination(documents, options);
} catch (error) {
this.handleError(`Error listing documents in ${collection}`, error);
}
}
/**
* Query documents in a collection with filtering and pagination
*/
async query<T extends Record<string, any>>(
collection: string,
collection: string,
filter: (doc: T) => boolean,
options?: QueryOptions
options?: QueryOptions,
): Promise<PaginatedResult<T>> {
return measurePerformance(async () => {
try {
const db = await openStore(collection, StoreType.KEYVALUE, options);
const all = await db.all();
// Apply filter
let filtered = Object.entries(all)
.filter(([_, value]) => filter(value as T))
.map(([key, value]) => ({
id: key,
...(value as any)
})) as T[];
// Sort if requested
if (options?.sort) {
const { field, order } = options.sort;
filtered.sort((a, b) => {
const valueA = a[field];
const valueB = b[field];
// Handle different data types for sorting
if (typeof valueA === 'string' && typeof valueB === 'string') {
return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
} else if (typeof valueA === 'number' && typeof valueB === 'number') {
return order === 'asc' ? valueA - valueB : valueB - valueA;
} else if (valueA instanceof Date && valueB instanceof Date) {
return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime();
}
// Default comparison for other types
return order === 'asc' ?
String(valueA).localeCompare(String(valueB)) :
String(valueB).localeCompare(String(valueA));
});
}
const total = filtered.length;
// Apply pagination
const offset = options?.offset || 0;
const limit = options?.limit || total;
const paginatedDocuments = filtered.slice(offset, offset + limit);
const hasMore = offset + limit < total;
return {
documents: paginatedDocuments,
total,
hasMore
};
} catch (error) {
if (error instanceof DBError) {
throw error;
}
logger.error(`Error querying documents in ${collection}:`, error);
throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query documents in ${collection}`, error);
}
});
try {
const db = await this.openStore(collection, options);
const all = await db.all();
// Apply filter
let filtered = Object.entries(all)
.filter(([_, value]) => filter(value as T))
.map(([key, value]) => ({
id: key,
...(value as any),
})) as T[];
// Apply sorting
filtered = this.applySorting(filtered, options);
// Apply pagination
return this.applyPagination(filtered, options);
} catch (error) {
this.handleError(`Error querying documents in ${collection}`, error);
}
}
/**
* Create an index for a collection to speed up queries
*/
async createIndex(
collection: string,
field: string,
options?: StoreOptions
): Promise<boolean> {
async createIndex(collection: string, field: string): Promise<boolean> {
try {
// In a real implementation, this would register the index with OrbitDB
// or create a specialized data structure. For now, we'll just log the request.
logger.info(`Index created on ${field} for collection ${collection}`);
// KeyValueStore doesn't support real indexing - this is just a placeholder
this.logger.info(
`Index created on ${field} for collection ${collection} (not supported in KeyValueStore)`,
);
return true;
} catch (error) {
if (error instanceof DBError) {
throw error;
}
logger.error(`Error creating index for ${collection}:`, error);
throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create index for ${collection}`, error);
this.handleError(`Error creating index for ${collection}`, error);
}
}
}
/**
* Helper to open a store of the correct type
*/
private async openStore(collection: string, options?: StoreOptions): Promise<any> {
const { openStore } = await import('./baseStore');
return await openStore(collection, this.storeType, options);
}
/**
* Helper to handle errors consistently
*/
private handleError(message: string, error: any): never {
if (error instanceof DBError) {
throw error;
}
this.logger.error(`${message}:`, error);
throw new DBError(ErrorCode.OPERATION_FAILED, `${message}: ${error.message}`, error);
}
}
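
For context, the `AbstractStore` base class that `KeyValueStore` now extends is not included in this diff; the following is a minimal sketch of the contract it appears to provide, inferred purely from the hooks used above (`getLoggerName`, `performCreate`/`performGet`/`performUpdate`/`performRemove`, `applySorting`, `applyPagination`). Names, signatures, and the caching/eventing/metrics wiring are assumptions, not the actual abstractStore.ts:

```typescript
// Hypothetical sketch of abstractStore.ts, inferred from the hooks above.
// The real base class presumably also wires in the caching, events, and
// metrics calls that moved out of keyValueStore.ts in this commit.
import { createServiceLogger } from '../../utils/logger';
import { StoreType, ListOptions, PaginatedResult } from '../types';

export abstract class AbstractStore {
  protected readonly logger: ReturnType<typeof createServiceLogger>;

  constructor(protected readonly storeType: StoreType) {
    this.logger = createServiceLogger(this.getLoggerName());
  }

  // Hooks each concrete store must implement
  protected abstract getLoggerName(): string;
  protected abstract performCreate(db: any, id: string, document: any): Promise<string>;
  protected abstract performGet<T>(db: any, id: string): Promise<T | null>;
  protected abstract performUpdate(db: any, id: string, document: any): Promise<string>;
  protected abstract performRemove(db: any, id: string): Promise<void>;

  // Shared helper: sort documents by a single field in either direction
  protected applySorting<T extends Record<string, any>>(docs: T[], options?: ListOptions): T[] {
    if (!options?.sort) return docs;
    const { field, order } = options.sort;
    return [...docs].sort((a, b) => {
      const cmp = String(a[field]).localeCompare(String(b[field]));
      return order === 'asc' ? cmp : -cmp;
    });
  }

  // Shared helper: slice documents into a PaginatedResult
  protected applyPagination<T>(docs: T[], options?: ListOptions): PaginatedResult<T> {
    const offset = options?.offset ?? 0;
    const limit = options?.limit ?? docs.length;
    return {
      documents: docs.slice(offset, offset + limit),
      total: docs.length,
      hasMore: offset + limit < docs.length,
    };
  }
}
```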

View File

@ -9,46 +9,40 @@ import { CounterStore } from './counterStore';
const logger = createServiceLogger('STORE_FACTORY');
// Initialize instances for each store type
// Initialize instances for each store type - singleton pattern
const storeInstances = new Map<StoreType, BaseStore>();
// Store type mapping to implementations
const storeImplementations = {
[StoreType.KEYVALUE]: KeyValueStore,
[StoreType.DOCSTORE]: DocStore,
[StoreType.FEED]: FeedStore,
[StoreType.EVENTLOG]: FeedStore, // Alias for feed
[StoreType.COUNTER]: CounterStore,
};
/**
* Get a store instance by type
* Get a store instance by type (factory and singleton pattern)
*/
export function getStore(type: StoreType): BaseStore {
// Check if we already have an instance
// Return cached instance if available (singleton pattern)
if (storeInstances.has(type)) {
return storeInstances.get(type)!;
}
// Create a new instance based on type
let store: BaseStore;
switch (type) {
case StoreType.KEYVALUE:
store = new KeyValueStore();
break;
case StoreType.DOCSTORE:
store = new DocStore();
break;
case StoreType.FEED:
case StoreType.EVENTLOG: // Alias for feed
store = new FeedStore();
break;
case StoreType.COUNTER:
store = new CounterStore();
break;
default:
logger.error(`Unsupported store type: ${type}`);
throw new DBError(ErrorCode.STORE_TYPE_ERROR, `Unsupported store type: ${type}`);
// Get the store implementation class
const StoreClass = storeImplementations[type];
if (!StoreClass) {
logger.error(`Unsupported store type: ${type}`);
throw new DBError(ErrorCode.STORE_TYPE_ERROR, `Unsupported store type: ${type}`);
}
// Cache the instance
// Create a new instance of the store
const store = new StoreClass();
// Cache the instance for future use
storeInstances.set(type, store);
return store;
}
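
A quick usage sketch of the refactored factory; the import paths, collection name, and payload below are illustrative:

```typescript
import { getStore } from './storeFactory'; // path assumed
import { StoreType } from '../types';

async function demo() {
  // Repeated calls hand back the same cached instance (singleton pattern)
  const kv = getStore(StoreType.KEYVALUE);
  console.log(kv === getStore(StoreType.KEYVALUE)); // true

  // EVENTLOG maps to the FeedStore implementation via the alias entry
  const feed = getStore(StoreType.EVENTLOG);
  await feed.create('events', 'evt1', { type: 'login' });
}
```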

View File

@ -1,9 +1,3 @@
import { createServiceLogger } from '../../utils/logger';
import { ErrorCode } from '../types';
import { DBError } from '../core/error';
const logger = createServiceLogger('DB_TRANSACTION');
// Transaction operation type
interface TransactionOperation {
type: 'create' | 'update' | 'delete';
@ -18,11 +12,11 @@ interface TransactionOperation {
export class Transaction {
private operations: TransactionOperation[] = [];
private connectionId?: string;
constructor(connectionId?: string) {
this.connectionId = connectionId;
}
/**
* Add a create operation to the transaction
*/
@ -31,11 +25,11 @@ export class Transaction {
type: 'create',
collection,
id,
data
data,
});
return this;
}
/**
* Add an update operation to the transaction
*/
@ -44,11 +38,11 @@ export class Transaction {
type: 'update',
collection,
id,
data
data,
});
return this;
}
/**
* Add a delete operation to the transaction
*/
@ -56,22 +50,22 @@ export class Transaction {
this.operations.push({
type: 'delete',
collection,
id
id,
});
return this;
}
/**
* Get all operations in this transaction
*/
getOperations(): TransactionOperation[] {
return [...this.operations];
}
/**
* Get connection ID for this transaction
*/
getConnectionId(): string | undefined {
return this.connectionId;
}
}
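
As a usage sketch, the builder chains operations for a later commit step; the executor that consumes `getOperations()` lives elsewhere, and the method names `create`/`update`/`delete` are inferred from the operation types pushed above:

```typescript
const tx = new Transaction('conn-1')
  .create('users', 'user1', { username: 'alice' })
  .update('stats', 'visits', { increment: 1 })
  .delete('sessions', 'expired-session');

// An executor would replay these in order against the stores
for (const op of tx.getOperations()) {
  console.log(op.type, op.collection, op.id);
}
```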

View File

@ -4,13 +4,32 @@ import { Transaction } from '../transactions/transactionService';
export type { Transaction };
// Resource locking for concurrent operations
const locks = new Map<string, boolean>();
export const acquireLock = (resourceId: string): boolean => {
if (locks.has(resourceId)) {
return false;
}
locks.set(resourceId, true);
return true;
};
export const releaseLock = (resourceId: string): void => {
locks.delete(resourceId);
};
export const isLocked = (resourceId: string): boolean => {
return locks.has(resourceId);
};
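
These locks are in-memory and advisory, so callers must pair acquire and release themselves. A minimal guard sketch (the `withLock` helper below is illustrative, not part of this file):

```typescript
const withLock = async <T>(resourceId: string, fn: () => Promise<T>): Promise<T> => {
  if (!acquireLock(resourceId)) {
    throw new Error(`Resource ${resourceId} is locked`);
  }
  try {
    return await fn();
  } finally {
    // Always release, even when fn throws
    releaseLock(resourceId);
  }
};
```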
// Database Types
export enum StoreType {
KEYVALUE = 'keyvalue',
DOCSTORE = 'docstore',
FEED = 'feed',
EVENTLOG = 'eventlog',
COUNTER = 'counter'
COUNTER = 'counter',
}
// Common result types
@ -129,4 +148,4 @@ export interface StoreOptions {
}
// Event bus for database events
export const dbEvents = new EventEmitter();

View File

@ -19,8 +19,6 @@ import {
getFile,
deleteFile,
defineSchema,
getMetrics,
resetMetrics,
closeConnection,
stop as stopDB,
} from './db/dbService';
@ -81,8 +79,6 @@ export {
getFile,
deleteFile,
defineSchema,
getMetrics,
resetMetrics,
closeConnection,
stopDB,
ErrorCode,
@ -135,8 +131,6 @@ export default {
getFile,
deleteFile,
defineSchema,
getMetrics,
resetMetrics,
closeConnection,
stop: stopDB,
ErrorCode,

View File

@ -41,4 +41,4 @@ export const validateConfig = (): ValidationResult => {
valid: errors.length === 0,
errors,
};
};
};

View File

@ -8,7 +8,12 @@ import {
getProxyAgentInstance,
} from './services/ipfsCoreService';
import { getConnectedPeers, getOptimalPeer, updateNodeLoad, logPeersStatus } from './services/discoveryService';
import {
getConnectedPeers,
getOptimalPeer,
updateNodeLoad,
logPeersStatus,
} from './services/discoveryService';
import { createServiceLogger } from '../utils/logger';
// Create logger for IPFS service

View File

@ -1,102 +1,80 @@
// Load balancer service - Implements load balancing strategies for distributing connections
import * as ipfsService from './ipfsService';
import { config } from '../config';
import { createServiceLogger } from '../utils/logger';
const logger = createServiceLogger('LOAD_BALANCER');
// Track last peer chosen for round-robin strategy
let lastPeerIndex = -1;
interface PeerInfo {
// Type definitions
export interface PeerInfo {
peerId: string;
load: number;
publicAddress: string;
}
interface PeerStatus extends PeerInfo {
export interface PeerStatus extends PeerInfo {
lastSeen: number;
}
interface NodeStatus {
export interface NodeStatus {
fingerprint: string;
peerCount: number;
isHealthy: boolean;
}
interface LoadBalancerServiceModule {
getOptimalPeer: () => PeerInfo | null;
getAllPeers: () => PeerStatus[];
getNodeStatus: () => NodeStatus;
}
type LoadBalancerStrategy = 'leastLoaded' | 'roundRobin' | 'random';
/**
* Strategies for peer selection
*/
const strategies = {
leastLoaded: (peers: PeerStatus[]): PeerStatus => {
return peers.reduce((min, current) => (current.load < min.load ? current : min), peers[0]);
},
roundRobin: (peers: PeerStatus[]): PeerStatus => {
lastPeerIndex = (lastPeerIndex + 1) % peers.length;
return peers[lastPeerIndex];
},
random: (peers: PeerStatus[]): PeerStatus => {
const randomIndex = Math.floor(Math.random() * peers.length);
return peers[randomIndex];
},
};
/**
* Get the optimal peer based on the configured load balancing strategy
* @returns Object containing the selected peer information, or null if no peers are available
*/
export const getOptimalPeer = (): { peerId: string; load: number; publicAddress: string } | null => {
// Get all available peers
export const getOptimalPeer = (): PeerInfo | null => {
const connectedPeers = ipfsService.getConnectedPeers();
// If no peers are available, return null
if (connectedPeers.size === 0) {
console.log('[LOAD-BALANCER] No peers available for load balancing');
logger.info('No peers available for load balancing');
return null;
}
// Convert Map to Array for easier manipulation
const peersArray = Array.from(connectedPeers.entries()).map(([peerId, data]) => {
return {
peerId,
load: data.load,
lastSeen: data.lastSeen,
publicAddress: data.publicAddress,
};
});
const peersArray = Array.from(connectedPeers.entries()).map(([peerId, data]) => ({
peerId,
load: data.load,
lastSeen: data.lastSeen,
publicAddress: data.publicAddress,
}));
// Apply the load balancing strategy
const strategy = config.loadBalancer.strategy;
// Apply the selected load balancing strategy
const strategy = config.loadBalancer.strategy as LoadBalancerStrategy;
let selectedPeer;
switch (strategy) {
case 'least-loaded':
// Find the peer with the lowest load
selectedPeer = peersArray.reduce((min, current) => (current.load < min.load ? current : min), peersArray[0]);
console.log(
`[LOAD-BALANCER] Selected least loaded peer: ${selectedPeer.peerId.substring(0, 15)}... with load ${
selectedPeer.load
}%`
);
break;
// Select strategy function or default to least loaded
const strategyFn = strategies[strategy] || strategies.leastLoaded;
selectedPeer = strategyFn(peersArray);
case 'round-robin':
// Simple round-robin strategy
lastPeerIndex = (lastPeerIndex + 1) % peersArray.length;
selectedPeer = peersArray[lastPeerIndex];
console.log(
`[LOAD-BALANCER] Selected round-robin peer: ${selectedPeer.peerId.substring(0, 15)}... with load ${
selectedPeer.load
}%`
);
break;
case 'random':
// Random selection
const randomIndex = Math.floor(Math.random() * peersArray.length);
selectedPeer = peersArray[randomIndex];
console.log(
`[LOAD-BALANCER] Selected random peer: ${selectedPeer.peerId.substring(0, 15)}... with load ${
selectedPeer.load
}%`
);
break;
default:
// Default to least-loaded if unknown strategy
selectedPeer = peersArray.reduce((min, current) => (current.load < min.load ? current : min), peersArray[0]);
console.log(
`[LOAD-BALANCER] Selected least loaded peer: ${selectedPeer.peerId.substring(0, 15)}... with load ${
selectedPeer.load
}%`
);
}
logger.info(
`Selected peer (${strategy}): ${selectedPeer.peerId.substring(0, 15)}... with load ${selectedPeer.load}%`,
);
return {
peerId: selectedPeer.peerId,
@ -107,27 +85,22 @@ export const getOptimalPeer = (): { peerId: string; load: number; publicAddress:
/**
* Get all available peers with their load information
* @returns Array of peer information objects
*/
export const getAllPeers = () => {
export const getAllPeers = (): PeerStatus[] => {
const connectedPeers = ipfsService.getConnectedPeers();
const result: any = [];
connectedPeers.forEach((data, peerId) => {
result.push({
peerId,
load: data.load,
lastSeen: data.lastSeen,
});
});
return result;
return Array.from(connectedPeers.entries()).map(([peerId, data]) => ({
peerId,
load: data.load,
lastSeen: data.lastSeen,
publicAddress: data.publicAddress,
}));
};
/**
* Get information about the current node's load
*/
export const getNodeStatus = () => {
export const getNodeStatus = (): NodeStatus => {
const connectedPeers = ipfsService.getConnectedPeers();
return {
fingerprint: config.env.fingerprint,
@ -136,8 +109,4 @@ export const getNodeStatus = () => {
};
};
export default {
getOptimalPeer,
getAllPeers,
getNodeStatus,
} as LoadBalancerServiceModule;
export default { getOptimalPeer, getAllPeers, getNodeStatus };
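
A usage sketch of the refactored module (import path assumed). One thing to watch: the new `strategies` map is keyed by camelCase names (`leastLoaded`, `roundRobin`, `random`), so a config value in the old hyphenated form such as `'least-loaded'` would silently fall through to the `leastLoaded` default:

```typescript
import loadBalancer from './loadBalancerService'; // path assumed

const peer = loadBalancer.getOptimalPeer();
if (peer) {
  console.log(`Routing to ${peer.peerId} (load ${peer.load}%) at ${peer.publicAddress}`);
} else {
  console.log('No peers available; handling the request locally');
}
```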

View File

@ -10,8 +10,10 @@ const heartbeatLogger = createServiceLogger('HEARTBEAT');
// Node metadata
const fingerprint = config.env.fingerprint;
const connectedPeers: Map<string, { lastSeen: number; load: number; publicAddress: string; fingerprint: string }> =
new Map();
const connectedPeers: Map<
string,
{ lastSeen: number; load: number; publicAddress: string; fingerprint: string }
> = new Map();
const SERVICE_DISCOVERY_TOPIC = ipfsConfig.serviceDiscovery.topic;
const HEARTBEAT_INTERVAL = ipfsConfig.serviceDiscovery.heartbeatInterval;
let heartbeatInterval: NodeJS.Timeout;
@ -37,9 +39,13 @@ export const setupServiceDiscovery = async (pubsub: PubSub) => {
});
if (!existingPeer) {
discoveryLogger.info(`New peer discovered: ${peerId} (fingerprint=${message.fingerprint})`);
discoveryLogger.info(
`New peer discovered: ${peerId} (fingerprint=${message.fingerprint})`,
);
}
heartbeatLogger.info(`Received from ${peerId}: load=${message.load}, addr=${message.publicAddress}`);
heartbeatLogger.info(
`Received from ${peerId}: load=${message.load}, addr=${message.publicAddress}`,
);
}
} catch (err) {
discoveryLogger.error(`Error processing message:`, err);
@ -58,15 +64,22 @@ export const setupServiceDiscovery = async (pubsub: PubSub) => {
publicAddress: ipfsConfig.serviceDiscovery.publicAddress,
};
await pubsub.publish(SERVICE_DISCOVERY_TOPIC, new TextEncoder().encode(JSON.stringify(heartbeatMsg)));
heartbeatLogger.info(`Sent: fingerprint=${fingerprint}, load=${nodeLoad}, addr=${heartbeatMsg.publicAddress}`);
await pubsub.publish(
SERVICE_DISCOVERY_TOPIC,
new TextEncoder().encode(JSON.stringify(heartbeatMsg)),
);
heartbeatLogger.info(
`Sent: fingerprint=${fingerprint}, load=${nodeLoad}, addr=${heartbeatMsg.publicAddress}`,
);
const now = Date.now();
const staleTime = ipfsConfig.serviceDiscovery.staleTimeout;
for (const [peerId, peerData] of connectedPeers.entries()) {
if (now - peerData.lastSeen > staleTime) {
discoveryLogger.info(`Peer ${peerId.substring(0, 15)}... is stale, removing from load balancer`);
discoveryLogger.info(
`Peer ${peerId.substring(0, 15)}... is stale, removing from load balancer`,
);
connectedPeers.delete(peerId);
}
}
@ -102,7 +115,9 @@ export const logPeersStatus = () => {
if (peerCount > 0) {
discoveryLogger.info('Peer status:');
connectedPeers.forEach((data, peerId) => {
discoveryLogger.debug(` - ${peerId} Load: ${data.load}% Last seen: ${new Date(data.lastSeen).toISOString()}`);
discoveryLogger.debug(
` - ${peerId} Load: ${data.load}% Last seen: ${new Date(data.lastSeen).toISOString()}`,
);
});
}
};
@ -144,4 +159,4 @@ export const stopDiscoveryService = async (pubsub: PubSub | null) => {
discoveryLogger.error(`Error unsubscribing from topic:`, err);
}
}
};

View File

@ -29,9 +29,18 @@ export const initIpfsNode = async (externalProxyAgent: any = null) => {
proxyAgent = externalProxyAgent;
const blockstorePath = ipfsConfig.blockstorePath;
if (!fs.existsSync(blockstorePath)) {
fs.mkdirSync(blockstorePath, { recursive: true });
logger.info(`Created blockstore directory: ${blockstorePath}`);
try {
if (!fs.existsSync(blockstorePath)) {
fs.mkdirSync(blockstorePath, { recursive: true, mode: 0o755 });
logger.info(`Created blockstore directory: ${blockstorePath}`);
}
// Check write permissions
fs.accessSync(blockstorePath, fs.constants.W_OK);
logger.info(`Verified write permissions for blockstore directory: ${blockstorePath}`);
} catch (permError: any) {
logger.error(`Permission error with blockstore directory: ${blockstorePath}`, permError);
throw new Error(`Cannot access or write to blockstore directory: ${permError.message}`);
}
const blockstore = new FsBlockstore(blockstorePath);
@ -77,7 +86,7 @@ export const initIpfsNode = async (externalProxyAgent: any = null) => {
`Listening on: ${libp2pNode
.getMultiaddrs()
.map((addr: any) => addr.toString())
.join(', ')}`
.join(', ')}`,
);
helia = await createHelia({
@ -118,7 +127,9 @@ function setupPeerEventListeners(node: Libp2p) {
node.peerStore
.get(event.detail)
.then((peerInfo) => {
const multiaddrs = peerInfo?.addresses.map((addr) => addr.multiaddr.toString()) || ['unknown'];
const multiaddrs = peerInfo?.addresses.map((addr) => addr.multiaddr.toString()) || [
'unknown',
];
logger.info(`Peer multiaddrs: ${multiaddrs.join(', ')}`);
})
.catch((error) => {
@ -137,7 +148,9 @@ function setupPeerEventListeners(node: Libp2p) {
node.peerStore
.get(event.detail)
.then((peerInfo) => {
const multiaddrs = peerInfo?.addresses.map((addr) => addr.multiaddr.toString()) || ['unknown'];
const multiaddrs = peerInfo?.addresses.map((addr) => addr.multiaddr.toString()) || [
'unknown',
];
logger.error(`Peer multiaddrs: ${multiaddrs.join(', ')}`);
})
.catch((error) => {
@ -203,7 +216,8 @@ async function attemptPeerConnections(node: Libp2p) {
try {
// Get peer info including addresses
const peerInfo = await node.peerStore.get(peerId);
const addresses = peerInfo?.addresses.map((addr) => addr.multiaddr.toString()).join(', ') || 'unknown';
const addresses =
peerInfo?.addresses.map((addr) => addr.multiaddr.toString()).join(', ') || 'unknown';
logger.info(` - Connected to peer: ${peerId.toString()}`);
logger.info(` Addresses: ${addresses}`);
} catch (_error) {

View File

@ -27,4 +27,4 @@ export const getPrivateKey = async () => {
logger.error('Error generating private key:', error);
throw error;
}
};
};

View File

@ -14,30 +14,67 @@ let orbitdb: any;
export const getOrbitDBDir = (): string => {
const baseDir = config.orbitdb.directory;
const fingerprint = config.env.fingerprint;
return `${baseDir}-${fingerprint}`;
// Use path.join for proper cross-platform path handling
return path.join(baseDir, `debros-${fingerprint}`);
};
const ORBITDB_DIR = getOrbitDBDir();
const ADDRESS_DIR = path.join(ORBITDB_DIR, 'addresses');
export const getDBAddress = (name: string): string | null => {
const addressFile = path.join(ORBITDB_DIR, `${name}.address`);
if (fs.existsSync(addressFile)) {
return fs.readFileSync(addressFile, 'utf-8').trim();
try {
const addressFile = path.join(ADDRESS_DIR, `${name}.address`);
if (fs.existsSync(addressFile)) {
return fs.readFileSync(addressFile, 'utf-8').trim();
}
} catch (error) {
logger.error(`Error reading DB address for ${name}:`, error);
}
return null;
};
export const saveDBAddress = (name: string, address: string) => {
const addressFile = path.join(ORBITDB_DIR, `${name}.address`);
fs.writeFileSync(addressFile, address);
export const saveDBAddress = (name: string, address: string): boolean => {
try {
// Ensure the address directory exists
if (!fs.existsSync(ADDRESS_DIR)) {
fs.mkdirSync(ADDRESS_DIR, { recursive: true, mode: 0o755 });
}
const addressFile = path.join(ADDRESS_DIR, `${name}.address`);
fs.writeFileSync(addressFile, address, { mode: 0o644 });
logger.info(`Saved DB address for ${name} at ${addressFile}`);
return true;
} catch (error) {
logger.error(`Failed to save DB address for ${name}:`, error);
return false;
}
};
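
A round-trip sketch of the two helpers; the database name and address below are placeholders:

```typescript
const name = 'users';
const address = '/orbitdb/zdpuExample/users'; // placeholder OrbitDB address

if (saveDBAddress(name, address)) {
  // Reads <ORBITDB_DIR>/addresses/users.address back from disk
  const restored = getDBAddress(name);
  console.log(restored === address); // true when the write succeeded
}
```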
export const init = async () => {
try {
// Create directory if it doesn't exist
if (!fs.existsSync(ORBITDB_DIR)) {
fs.mkdirSync(ORBITDB_DIR, { recursive: true });
logger.info(`Created OrbitDB directory: ${ORBITDB_DIR}`);
// Create directory with proper permissions if it doesn't exist
try {
if (!fs.existsSync(ORBITDB_DIR)) {
fs.mkdirSync(ORBITDB_DIR, { recursive: true, mode: 0o755 });
logger.info(`Created OrbitDB directory: ${ORBITDB_DIR}`);
}
// Check write permissions
fs.accessSync(ORBITDB_DIR, fs.constants.W_OK);
} catch (permError: any) {
logger.error(`Permission error with OrbitDB directory: ${ORBITDB_DIR}`, permError);
throw new Error(`Cannot access or write to OrbitDB directory: ${permError.message}`);
}
// Create the addresses directory
try {
if (!fs.existsSync(ADDRESS_DIR)) {
fs.mkdirSync(ADDRESS_DIR, { recursive: true, mode: 0o755 });
logger.info(`Created OrbitDB addresses directory: ${ADDRESS_DIR}`);
}
} catch (dirError) {
logger.error(`Error creating addresses directory: ${ADDRESS_DIR}`, dirError);
// Continue anyway; failures will be handled when saving addresses
}
registerFeed();
@ -56,9 +93,9 @@ export const init = async () => {
logger.info('OrbitDB initialized successfully.');
return orbitdb;
} catch (e) {
} catch (e: any) {
logger.error('Failed to initialize OrbitDB:', e);
throw e;
throw new Error(`OrbitDB initialization failed: ${e.message}`);
}
};
@ -74,7 +111,9 @@ export const openDB = async (name: string, type: string) => {
const dbOptions = {
type,
overwrite: false,
AccessController: IPFSAccessController({ write: ['*'] }),
AccessController: IPFSAccessController({
write: ['*'],
}),
};
if (existingAddress) {

View File

@ -31,7 +31,7 @@ export function createDebrosLogger(options: LoggerOptions = {}) {
// Set default options
const logsDir = options.logsDir || path.join(process.cwd(), 'logs');
const logLevel = options.level || process.env.LOG_LEVEL || 'info';
// Create logs directory if it doesn't exist
if (!fs.existsSync(logsDir) && !options.disableFile) {
fs.mkdirSync(logsDir, { recursive: true });
@ -55,7 +55,8 @@ export function createDebrosLogger(options: LoggerOptions = {}) {
} else {
try {
message = JSON.stringify(message, null, 2);
} catch (e) {
} catch (e: any) {
console.error(e);
message = '[Object]';
}
}
@ -77,7 +78,8 @@ export function createDebrosLogger(options: LoggerOptions = {}) {
} else {
try {
message = JSON.stringify(message);
} catch (e) {
} catch (e: any) {
console.error(e);
message = '[Object]';
}
}
@ -89,30 +91,39 @@ export function createDebrosLogger(options: LoggerOptions = {}) {
// Configure transports
const loggerTransports = [];
// Add console transport if not disabled
if (!options.disableConsole) {
loggerTransports.push(
new transports.Console({
format: format.combine(format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }), customConsoleFormat),
})
format: format.combine(
format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
customConsoleFormat,
),
}),
);
}
// Add file transports if not disabled
if (!options.disableFile) {
loggerTransports.push(
// Combined log file
new transports.File({
filename: path.join(logsDir, 'app.log'),
format: format.combine(format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }), customFileFormat),
format: format.combine(
format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
customFileFormat,
),
}),
// Error log file
new transports.File({
filename: path.join(logsDir, 'error.log'),
level: 'error',
format: format.combine(format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }), customFileFormat),
})
format: format.combine(
format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
customFileFormat,
),
}),
);
}
@ -127,10 +138,14 @@ export function createDebrosLogger(options: LoggerOptions = {}) {
// Helper function to create a logger for a specific service
const createServiceLogger = (serviceName: string) => {
return {
error: (message: any, ...meta: any[]) => logger.error(message, { service: serviceName, ...meta }),
warn: (message: any, ...meta: any[]) => logger.warn(message, { service: serviceName, ...meta }),
info: (message: any, ...meta: any[]) => logger.info(message, { service: serviceName, ...meta }),
debug: (message: any, ...meta: any[]) => logger.debug(message, { service: serviceName, ...meta }),
error: (message: any, ...meta: any[]) =>
logger.error(message, { service: serviceName, ...meta }),
warn: (message: any, ...meta: any[]) =>
logger.warn(message, { service: serviceName, ...meta }),
info: (message: any, ...meta: any[]) =>
logger.info(message, { service: serviceName, ...meta }),
debug: (message: any, ...meta: any[]) =>
logger.debug(message, { service: serviceName, ...meta }),
};
};
@ -144,4 +159,4 @@ export function createDebrosLogger(options: LoggerOptions = {}) {
const { logger, createServiceLogger } = createDebrosLogger();
export { logger, createServiceLogger };
export default logger;
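
Call sites are unchanged by the reformatting; a small sketch (import path assumed):

```typescript
import { createServiceLogger } from './utils/logger'; // path assumed

const log = createServiceLogger('EXAMPLE_SERVICE');
log.info('Service started');
log.error('Startup step failed');
```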

41
types.d.ts vendored
View File

@ -230,11 +230,41 @@ declare module '@debros/network' {
options?: { connectionId?: string; storeType?: StoreType },
): Promise<boolean>;
// Subscription API
// Event Subscription API
export interface DocumentCreatedEvent {
collection: string;
id: string;
document: any;
}
export interface DocumentUpdatedEvent {
collection: string;
id: string;
document: any;
previous: any;
}
export interface DocumentDeletedEvent {
collection: string;
id: string;
document: any;
}
export type DBEventType = 'document:created' | 'document:updated' | 'document:deleted';
export function subscribe(
event: 'document:created' | 'document:updated' | 'document:deleted',
callback: (data: any) => void,
event: 'document:created',
callback: (data: DocumentCreatedEvent) => void,
): () => void;
export function subscribe(
event: 'document:updated',
callback: (data: DocumentUpdatedEvent) => void,
): () => void;
export function subscribe(
event: 'document:deleted',
callback: (data: DocumentDeletedEvent) => void,
): () => void;
export function subscribe(event: DBEventType, callback: (data: any) => void): () => void;
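
With these overloads the callback payload narrows by event name; a usage sketch (the logged fields come from the event interfaces above):

```typescript
import { subscribe } from '@debros/network';

const unsubscribe = subscribe('document:created', (event) => {
  // event is typed as DocumentCreatedEvent
  console.log(`Created ${event.id} in ${event.collection}`);
});

// Later, stop listening
unsubscribe();
```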
// File operations
export function uploadFile(
@ -248,9 +278,6 @@ declare module '@debros/network' {
export function closeConnection(connectionId: string): Promise<boolean>;
// Metrics
export function getMetrics(): Metrics;
export function resetMetrics(): void;
// Stop
export function stopDB(): Promise<void>;
@ -304,8 +331,6 @@ declare module '@debros/network' {
getFile: typeof getFile;
deleteFile: typeof deleteFile;
defineSchema: typeof defineSchema;
getMetrics: typeof getMetrics;
resetMetrics: typeof resetMetrics;
closeConnection: typeof closeConnection;
stop: typeof stopDB;
ErrorCode: typeof ErrorCode;