Compare commits

...

10 Commits

Author SHA1 Message Date
12inchpenguin
be3763d988 fixed some type errors
Some checks failed
Publish Alpha Package to npm / publish (push) Has been cancelled
2025-04-08 14:20:13 +03:00
12inchpenguin
dd920d26ce updated version 2025-04-07 15:56:33 +03:00
12inchpenguin
44d19ad628 updates types for orbit and bug fixes, and sanitiza data on document prepare 2025-04-07 15:49:13 +03:00
12inchpenguin
d39b3c7003 added husky 2025-04-07 13:03:45 +03:00
12inchpenguin
e00280aea9 initialized version alpha 0.20.0 2025-04-07 12:58:23 +03:00
12inchpenguin
9e937231c8 updated version 2025-04-06 18:29:42 +03:00
12inchpenguin
2553483ef9 updated version 2025-04-06 18:19:33 +03:00
12inchpenguin
a667f98f25 added post build tool 2025-04-06 18:19:03 +03:00
12inchpenguin
ef4be50b5a updated readme and packagejson 2025-04-01 16:44:29 +03:00
12inchpenguin
a4cbda0fb9 updated github workflow 2025-04-01 16:41:38 +03:00
32 changed files with 4867 additions and 371 deletions

View File

@ -26,11 +26,6 @@ jobs:
- name: Install dependencies
run: pnpm install
- name: Publish to npm as alpha
run: npm publish --tag alpha --access public
env:
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
- name: Publish to npm as latest
run: npm publish --tag latest --access public
env:

4
.husky/pre-commit Executable file
View File

@ -0,0 +1,4 @@
#!/usr/bin/env sh
# Husky pre-commit hook: source husky's shim, then run lint-staged so only
# staged files are formatted/linted (per .lintstagedrc) before each commit.
. "$(dirname -- "$0")/_/husky.sh"
npx lint-staged

10
.lintstagedrc Normal file
View File

@ -0,0 +1,10 @@
{
"*.{js,ts}": [
"prettier --write",
"eslint --fix",
"npm run build"
],
"*.{json,md}": [
"prettier --write"
]
}

8
.prettierrc Normal file
View File

@ -0,0 +1,8 @@
{
"semi": true,
"singleQuote": true,
"trailingComma": "all",
"printWidth": 100,
"tabWidth": 2,
"endOfLine": "auto"
}

347
README.md
View File

@ -1,14 +1,21 @@
# @debros/netowrk
# @debros/network
Core networking functionality for the Debros decentralized network. This package provides essential IPFS, libp2p, and OrbitDB functionality to build decentralized applications on the Debros network.
Core networking functionality for the Debros decentralized network. This package provides a powerful database interface with advanced features built on IPFS and OrbitDB for decentralized applications.
## Features
- Pre-configured IPFS/libp2p node setup
- Service discovery for peer-to-peer communication
- OrbitDB integration for distributed databases
- Consistent logging across network components
- Secure key generation
- Rich database-like API with TypeScript support
- Multiple database store types (KeyValue, Document, Feed, Counter)
- Document operations with schema validation
- Advanced querying with pagination, sorting and filtering
- Transaction support for batch operations
- Built-in file storage with metadata
- Real-time subscriptions for data changes
- Memory caching for performance
- Connection pooling for managing multiple database instances
- Index creation for faster queries
- Comprehensive error handling with error codes
- Performance metrics and monitoring
## Installation
@ -19,80 +26,296 @@ npm install @debros/network
## Basic Usage
```typescript
import { initConfig, initIpfs, initOrbitDB, logger } from "@debros/network";
import { initDB, create, get, query, uploadFile, logger } from "@debros/network";
// Initialize with custom configuration (optional)
const config = initConfig({
env: {
fingerprint: "my-unique-node-id",
port: 8080,
},
ipfs: {
bootstrapNodes: "node1,node2,node3",
},
});
// Start the network node
async function startNode() {
// Initialize the database service
async function startApp() {
try {
// Initialize IPFS
const ipfs = await initIpfs();
// Initialize OrbitDB with the IPFS instance
const orbitdb = await initOrbitDB({ getHelia: () => ipfs });
// Create/open a database
const db = await orbitDB("myDatabase", "feed");
logger.info("Node started successfully");
return { ipfs, orbitdb, db };
// Initialize with default configuration
await initDB();
logger.info("Database initialized successfully");
// Create a new user document
const userId = 'user123';
const user = {
username: 'johndoe',
walletAddress: '0x1234567890',
avatar: null
};
const result = await create('users', userId, user);
logger.info(`Created user with ID: ${result.id}`);
// Get a user by ID
const retrievedUser = await get('users', userId);
logger.info('User:', retrievedUser);
// Query users with filtering
const activeUsers = await query('users',
user => user.isActive === true,
{ limit: 10, sort: { field: 'createdAt', order: 'desc' } }
);
logger.info(`Found ${activeUsers.total} active users`);
// Upload a file
const fileData = Buffer.from('File content');
const fileUpload = await uploadFile(fileData, { filename: 'document.txt' });
logger.info(`Uploaded file with CID: ${fileUpload.cid}`);
return true;
} catch (error) {
logger.error("Failed to start node:", error);
logger.error("Failed to start app:", error);
throw error;
}
}
startNode();
startApp();
```
## Configuration
## Database Store Types
The package provides sensible defaults but can be customized:
The library supports multiple OrbitDB store types, each optimized for different use cases:
```typescript
import { initConfig } from "@debros/network";
import { create, get, update, StoreType } from "@debros/network";
const customConfig = initConfig({
env: {
fingerprint: "unique-fingerprint",
port: 9000,
},
ipfs: {
blockstorePath: "./custom-blockstore",
serviceDiscovery: {
topic: "my-custom-topic",
heartbeatInterval: 10000,
// Default KeyValue store (for general use)
await create('users', 'user1', { name: 'Alice' });
// Document store (better for complex documents with indexing)
await create('posts', 'post1', { title: 'Hello', content: '...' },
{ storeType: StoreType.DOCSTORE }
);
// Feed/EventLog store (append-only, good for immutable logs)
await create('events', 'evt1', { type: 'login', user: 'alice' },
{ storeType: StoreType.FEED }
);
// Counter store (for numeric counters)
await create('stats', 'visits', { value: 0 },
{ storeType: StoreType.COUNTER }
);
// Increment a counter
await update('stats', 'visits', { increment: 1 },
{ storeType: StoreType.COUNTER }
);
// Get counter value
const stats = await get('stats', 'visits', { storeType: StoreType.COUNTER });
console.log(`Visit count: ${stats.value}`);
```
## Advanced Features
### Schema Validation
```typescript
import { defineSchema, create } from "@debros/network";
// Define a schema
defineSchema('users', {
properties: {
username: {
type: 'string',
required: true,
min: 3,
max: 20
},
email: {
type: 'string',
pattern: '^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$'
},
age: {
type: 'number',
min: 18
}
},
orbitdb: {
directory: "./custom-orbitdb",
},
required: ['username']
});
// Document creation will be validated against the schema
await create('users', 'user1', {
username: 'alice',
email: 'alice@example.com',
age: 25
});
```
## Documentation
### Transactions
### Core Modules
```typescript
import { createTransaction, commitTransaction } from "@debros/network";
- **IPFS Service**: Setup and manage IPFS nodes
- **OrbitDB Service**: Distributed database management
- **Config**: Network configuration
- **Logger**: Consistent logging
// Create a transaction
const transaction = createTransaction();
### Main Exports
// Add multiple operations
transaction
.create('posts', 'post1', { title: 'Hello World', content: '...' })
.update('users', 'user1', { postCount: 1 })
.delete('drafts', 'draft1');
- `initConfig`: Configure the network node
- `ipfsService`: IPFS node management
- `orbitDBService`: OrbitDB operations
- `logger`: Logging utilities
// Commit all operations
const result = await commitTransaction(transaction);
console.log(`Transaction completed with ${result.results.length} operations`);
```
### Subscriptions
```typescript
import { subscribe } from "@debros/network";
// Subscribe to document changes
const unsubscribe = subscribe('document:created', (data) => {
console.log(`New document created in ${data.collection}:`, data.id);
});
// Later, unsubscribe
unsubscribe();
```
### Pagination and Sorting
```typescript
import { list, query } from "@debros/network";
// List with pagination and sorting
const page1 = await list('users', {
limit: 10,
offset: 0,
sort: { field: 'createdAt', order: 'desc' }
});
// Query with pagination
const results = await query('users',
(user) => user.age > 21,
{ limit: 10, offset: 20 }
);
console.log(`Found ${results.total} matches, showing ${results.documents.length}`);
console.log(`Has more pages: ${results.hasMore}`);
```
### TypeScript Support
```typescript
import { get, update, query } from "@debros/network";
interface User {
username: string;
email: string;
age: number;
createdAt: number;
updatedAt: number;
}
// Type-safe operations
const user = await get<User>('users', 'user1');
await update<User>('users', 'user1', { age: 26 });
const results = await query<User>('users',
(user) => user.age > 21
);
```
### Connection Management
```typescript
import { initDB, closeConnection } from "@debros/network";
// Create multiple connections
const conn1 = await initDB('connection1');
const conn2 = await initDB('connection2');
// Use specific connection
await create('users', 'user1', { name: 'Alice' }, { connectionId: conn1 });
// Close a specific connection
await closeConnection(conn1);
```
### Performance Metrics
```typescript
import { getMetrics, resetMetrics } from "@debros/network";
// Get performance metrics
const metrics = getMetrics();
console.log('Operations:', metrics.operations);
console.log('Avg operation time:', metrics.performance.averageOperationTime, 'ms');
console.log('Cache hits/misses:', metrics.cacheStats);
// Reset metrics (e.g., after deployment)
resetMetrics();
```
## API Reference
### Core Database Operations
- `initDB(connectionId?: string): Promise<string>` - Initialize the database
- `create<T>(collection, id, data, options?): Promise<CreateResult>` - Create a document
- `get<T>(collection, id, options?): Promise<T | null>` - Get a document by ID
- `update<T>(collection, id, data, options?): Promise<UpdateResult>` - Update a document
- `remove(collection, id, options?): Promise<boolean>` - Delete a document
- `list<T>(collection, options?): Promise<PaginatedResult<T>>` - List documents with pagination
- `query<T>(collection, filter, options?): Promise<PaginatedResult<T>>` - Query documents
- `stopDB(): Promise<void>` - Stop the database service
### Store Types
- `StoreType.KEYVALUE` - Key-value pair storage (default)
- `StoreType.DOCSTORE` - Document storage with indexing
- `StoreType.FEED` - Append-only log
- `StoreType.EVENTLOG` - Alias for FEED
- `StoreType.COUNTER` - Numeric counter
### Schema Validation
- `defineSchema(collection, schema): void` - Define a schema for a collection
### Transactions
- `createTransaction(connectionId?): Transaction` - Create a new transaction
- `commitTransaction(transaction): Promise<{success, results}>` - Execute the transaction
- `Transaction.create<T>(collection, id, data): Transaction` - Add a create operation
- `Transaction.update<T>(collection, id, data): Transaction` - Add an update operation
- `Transaction.delete(collection, id): Transaction` - Add a delete operation
### Subscriptions
- `subscribe(event, callback): () => void` - Subscribe to events, returns unsubscribe function
### File Operations
- `uploadFile(fileData, options?): Promise<FileUploadResult>` - Upload a file
- `getFile(cid, options?): Promise<FileResult>` - Get a file by CID
- `deleteFile(cid, options?): Promise<boolean>` - Delete a file
### Connection Management
- `closeConnection(connectionId): Promise<boolean>` - Close a specific connection
### Indexes and Performance
- `createIndex(collection, field, options?): Promise<boolean>` - Create an index
- `getMetrics(): Metrics` - Get performance metrics
- `resetMetrics(): void` - Reset performance metrics
## Configuration
```typescript
import { config, initDB } from "@debros/network";
// Configure (optional)
config.env.fingerprint = "my-unique-app-id";
config.env.port = 9000;
config.ipfs.blockstorePath = "./custom-path/blockstore";
config.orbitdb.directory = "./custom-path/orbitdb";
// Initialize with configuration
await initDB();
```

View File

@ -0,0 +1,73 @@
import { initDB, create, get, update, remove, list, query, uploadFile, getFile, deleteFile, stopDB, logger } from '../src';
// Alternative import method
// import debros from '../src';
// const { db } = debros;
/**
 * End-to-end walkthrough of the basic @debros/network database API:
 * init → create → get → update → query → list → file upload/get/delete →
 * remove → stop. Errors are caught and logged, never rethrown.
 */
async function databaseExample() {
  try {
    logger.info('Starting database example...');

    // Initialize the database service (abstracts away IPFS and OrbitDB)
    await initDB();
    logger.info('Database service initialized');

    // Create a new user document
    const userId = 'user123';
    const userData = {
      username: 'johndoe',
      walletAddress: '0x1234567890',
      avatar: null
    };

    const createResult = await create('users', userId, userData);
    logger.info(`Created user with ID: ${createResult.id} and hash: ${createResult.hash}`);

    // Retrieve the user
    const user = await get('users', userId);
    logger.info('Retrieved user:', user);

    // Update the user
    const updateResult = await update('users', userId, {
      avatar: 'profile.jpg',
      bio: 'Software developer'
    });
    logger.info(`Updated user with hash: ${updateResult.hash}`);

    // Query users — FIX: query() returns a PaginatedResult, not an array, so the
    // match count lives on `.total` (the original logged `.length`, i.e. undefined).
    const filteredUsers = await query('users', (user) => user.username === 'johndoe');
    logger.info(`Found ${filteredUsers.total} matching users`);

    // List all users — FIX: list() is paginated as well; count the returned page.
    const allUsers = await list('users', { limit: 10 });
    logger.info(`Retrieved ${allUsers.documents.length} users`);

    // Upload a file
    const fileData = Buffer.from('This is a test file content');
    const fileUpload = await uploadFile(fileData, { filename: 'test.txt' });
    logger.info(`Uploaded file with CID: ${fileUpload.cid}`);

    // Retrieve the file
    const file = await getFile(fileUpload.cid);
    logger.info('Retrieved file:', {
      content: file.data.toString(),
      metadata: file.metadata
    });

    // Delete the file
    await deleteFile(fileUpload.cid);
    logger.info('File deleted');

    // Delete the user
    await remove('users', userId);
    logger.info('User deleted');

    // Stop the database service
    await stopDB();
    logger.info('Database service stopped');
  } catch (error) {
    logger.error('Error in database example:', error);
  }
}

databaseExample();

View File

@ -0,0 +1,266 @@
import {
initDB,
create,
get,
update,
remove,
list,
query,
uploadFile,
getFile,
createTransaction,
commitTransaction,
subscribe,
defineSchema,
getMetrics,
createIndex,
ErrorCode,
StoreType,
logger
} from '../src';
// Define a user schema
// Validation schema for the 'users' collection (consumed by defineSchema):
// per-field type/range/pattern constraints plus a top-level `required` list.
const userSchema = {
  properties: {
    username: {
      type: 'string',
      required: true, // field-level flag; also listed in `required` below
      min: 3,
      max: 20,
    },
    email: {
      type: 'string',
      // Double-escaped so the runtime string contains \w and \. for the regex engine.
      pattern: '^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$',
    },
    age: {
      type: 'number',
      min: 18,
      max: 120,
    },
    roles: {
      type: 'array',
      items: {
        type: 'string',
      },
    },
    isActive: {
      type: 'boolean',
    },
  },
  required: ['username'],
};
/**
 * Tour of the advanced @debros/network features: named connections, schema
 * validation, indexes, real-time subscriptions, transactions, all four store
 * types (KeyValue, DocStore, Feed, Counter), pagination/sorting, file uploads
 * with metadata, and performance metrics.
 * @returns true when every step succeeds, false otherwise (errors are logged).
 */
async function advancedDatabaseExample() {
  try {
    logger.info('Starting advanced database example...');

    // Initialize the database service with a specific connection ID
    const connectionId = await initDB('example-connection');
    logger.info(`Database service initialized with connection ID: ${connectionId}`);

    // Define schema for validation
    defineSchema('users', userSchema);
    logger.info('User schema defined');

    // Create index for faster queries (works with docstore)
    await createIndex('users', 'username', { storeType: StoreType.DOCSTORE });
    logger.info('Created index on username field');

    // Set up subscription for real-time updates
    const unsubscribe = subscribe('document:created', (data) => {
      logger.info('Document created:', data.collection, data.id);
    });

    // Create multiple users using a transaction
    const transaction = createTransaction(connectionId);
    transaction
      .create('users', 'user1', {
        username: 'alice',
        email: 'alice@example.com',
        age: 28,
        roles: ['admin', 'user'],
        isActive: true,
      })
      .create('users', 'user2', {
        username: 'bob',
        email: 'bob@example.com',
        age: 34,
        roles: ['user'],
        isActive: true,
      })
      .create('users', 'user3', {
        username: 'charlie',
        email: 'charlie@example.com',
        age: 45,
        roles: ['moderator', 'user'],
        isActive: false,
      });

    const txResult = await commitTransaction(transaction);
    logger.info(`Transaction committed with ${txResult.results.length} operations`);

    // Get a specific user with type safety
    interface User {
      username: string;
      email: string;
      age: number;
      roles: string[];
      isActive: boolean;
      createdAt: number;
      updatedAt: number;
    }

    // Using KeyValue store (default)
    const user = await get<User>('users', 'user1');
    logger.info('Retrieved user from KeyValue store:', user?.username, user?.email);

    // Using DocStore
    await create<User>(
      'users_docstore',
      'user1',
      {
        username: 'alice_doc',
        email: 'alice_doc@example.com',
        age: 28,
        roles: ['admin', 'user'],
        isActive: true,
      },
      { storeType: StoreType.DOCSTORE }
    );
    const docStoreUser = await get<User>('users_docstore', 'user1', { storeType: StoreType.DOCSTORE });
    logger.info('Retrieved user from DocStore:', docStoreUser?.username, docStoreUser?.email);

    // Using Feed/EventLog store
    await create<User>(
      'users_feed',
      'user1',
      {
        username: 'alice_feed',
        email: 'alice_feed@example.com',
        age: 28,
        roles: ['admin', 'user'],
        isActive: true,
      },
      { storeType: StoreType.FEED }
    );

    // Update the feed entry (creates a new entry with the same ID)
    await update<User>(
      'users_feed',
      'user1',
      {
        roles: ['admin', 'user', 'tester'],
      },
      { storeType: StoreType.FEED }
    );

    // List all entries in the feed
    const feedUsers = await list<User>('users_feed', { storeType: StoreType.FEED });
    logger.info(`Found ${feedUsers.total} feed entries:`);
    feedUsers.documents.forEach(user => {
      logger.info(`- ${user.username} (${user.email})`);
    });

    // Using Counter store
    await create(
      'counters',
      'visitors',
      { value: 100 },
      { storeType: StoreType.COUNTER }
    );

    // Increment the counter
    await update(
      'counters',
      'visitors',
      { increment: 5 },
      { storeType: StoreType.COUNTER }
    );

    // Get the counter value
    const counter = await get('counters', 'visitors', { storeType: StoreType.COUNTER });
    logger.info(`Counter value: ${counter?.value}`);

    // Update a user in KeyValue store
    await update<User>('users', 'user1', {
      roles: ['admin', 'user', 'tester'],
    });

    // Query users from KeyValue store
    const result = await query<User>(
      'users',
      (user) => user.isActive === true,
      {
        limit: 10,
        offset: 0,
        sort: { field: 'username', order: 'asc' },
      }
    );
    logger.info(`Found ${result.total} active users:`);
    result.documents.forEach(user => {
      logger.info(`- ${user.username} (${user.email})`);
    });

    // List all users from KeyValue store with pagination
    const allUsers = await list<User>('users', {
      limit: 10,
      offset: 0,
      sort: { field: 'age', order: 'desc' },
    });
    logger.info(`Listed ${allUsers.total} users sorted by age (desc):`);
    allUsers.documents.forEach(user => {
      logger.info(`- ${user.username}: ${user.age} years old`);
    });

    // Upload a file with metadata
    const fileData = Buffer.from('This is a test file with advanced features.');
    const fileUpload = await uploadFile(fileData, {
      filename: 'advanced-test.txt',
      metadata: {
        contentType: 'text/plain',
        tags: ['example', 'test'],
        owner: 'user1',
      }
    });
    logger.info(`Uploaded file with CID: ${fileUpload.cid}`);

    // Retrieve the file
    const file = await getFile(fileUpload.cid);
    logger.info('Retrieved file content:', file.data.toString());
    logger.info('File metadata:', file.metadata);

    // Get performance metrics
    const metrics = getMetrics();
    logger.info('Database metrics:', {
      operations: metrics.operations,
      performance: {
        averageOperationTime: `${metrics.performance.averageOperationTime?.toFixed(2)}ms`,
      },
      cache: metrics.cacheStats,
    });

    // Unsubscribe from events
    unsubscribe();
    logger.info('Unsubscribed from events');

    // Delete a user
    await remove('users', 'user3');
    logger.info('Deleted user user3');

    return true;
  } catch (error) {
    // FIX: also check 'message' — under strict TS, narrowing via `'code' in error`
    // alone does not make `error.message` readable, so the original access was a
    // type error. Checking both members narrows to an object exposing both keys.
    if (error && typeof error === 'object' && 'code' in error && 'message' in error) {
      logger.error(`Database error (${error.code}):`, error.message);
    } else {
      logger.error('Error in advanced database example:', error);
    }
    return false;
  }
}

advancedDatabaseExample();

View File

@ -1,4 +1,6 @@
import { initIpfs, initOrbitDB, logger, createServiceLogger } from '../src/index';
import { init as initIpfs, stop as stopIpfs } from '../src/ipfs/ipfsService';
import { init as initOrbitDB } from '../src/orbit/orbitDBService';
import { createServiceLogger } from '../src/utils/logger';
const appLogger = createServiceLogger('APP');
@ -11,11 +13,7 @@ async function startNode() {
appLogger.info('IPFS node initialized');
// Initialize OrbitDB
const ipfsService = {
getHelia: () => ipfs,
};
const orbitdb = await initOrbitDB(ipfsService);
const orbitdb = await initOrbitDB();
appLogger.info('OrbitDB initialized');
// Create a test database
@ -40,7 +38,7 @@ async function startNode() {
process.on('SIGINT', async () => {
appLogger.info('Shutting down...');
await orbitdb.stop();
await initIpfs.stop();
await stopIpfs();
process.exit(0);
});
} catch (error) {

View File

@ -1,126 +0,0 @@
// Example of how to integrate the new @debros/node-core package into an application
import express from 'express';
import { initIpfs, initOrbitDB, createServiceLogger, getConnectedPeers } from '@debros/node-core'; // This would be the import path when installed from npm
// Create service-specific loggers
const apiLogger = createServiceLogger('API');
const networkLogger = createServiceLogger('NETWORK');
// Initialize Express app
const app = express();
app.use(express.json());
// Network state
let ipfsNode: any;
let orbitInstance: any;
let messageDB: any;
// Initialize network components
// Brings up IPFS, then OrbitDB on top of it, opens the shared 'messages' feed,
// and starts a once-a-minute peer-count log. Returns false (never throws) on failure.
async function initializeNetwork() {
  try {
    // Initialize IPFS
    networkLogger.info('Initializing IPFS node...');
    ipfsNode = await initIpfs();

    // Initialize OrbitDB (old @debros/node-core API: takes a getHelia accessor)
    networkLogger.info('Initializing OrbitDB...');
    orbitInstance = await initOrbitDB({
      getHelia: () => ipfsNode,
    });

    // Open message database (append-only 'feed' store)
    messageDB = await orbitInstance.open('messages', {
      type: 'feed',
    });

    networkLogger.info('Network components initialized successfully');

    // Log connected peers every minute
    setInterval(() => {
      const peers = getConnectedPeers();
      networkLogger.info(`Connected to ${peers.size} peers`);
    }, 60000);

    return true;
  } catch (error) {
    networkLogger.error('Failed to initialize network:', error);
    return false;
  }
}
// API routes
// GET /api/peers — snapshot of the currently-connected peers and their load info.
app.get('/api/peers', (req, res) => {
  const peerList = Array.from(getConnectedPeers().entries()).map(([peerId, data]) => ({
    id: peerId,
    load: data.load,
    address: data.publicAddress,
    lastSeen: data.lastSeen,
  }));

  res.json({ peers: peerList });
});
// GET /api/messages — return up to the 100 most recent feed entries.
app.get('/api/messages', async (req, res) => {
  try {
    // OrbitDB feed iterator; collect() materializes the entries synchronously.
    const messages = messageDB.iterator({ limit: 100 }).collect();
    res.json({ messages });
  } catch (error) {
    apiLogger.error('Error fetching messages:', error);
    res.status(500).json({ error: 'Failed to fetch messages' });
  }
});
// POST /api/messages — append a message to the shared feed.
app.post('/api/messages', async (req, res) => {
  try {
    const { content } = req.body;

    if (!content) {
      return res.status(400).json({ error: 'Content is required' });
    }

    // feed.add() resolves to the new entry's hash, echoed back as the message ID.
    const entry = await messageDB.add({
      content,
      timestamp: Date.now(),
    });

    res.status(201).json({ id: entry });
  } catch (error) {
    apiLogger.error('Error creating message:', error);
    res.status(500).json({ error: 'Failed to create message' });
  }
});
// Start the application
// Boot sequence: bring the network layer up first, then start the HTTP server.
async function startApp() {
  const networkInitialized = await initializeNetwork();

  if (networkInitialized) {
    // NOTE(review): `config` is never imported in this file — presumably it should
    // come from '@debros/node-core' alongside the other imports; verify before use.
    const port = config.env.port;
    app.listen(port, () => {
      apiLogger.info(`Server listening on port ${port}`);
    });
  } else {
    apiLogger.error('Cannot start application: Network initialization failed');
    process.exit(1);
  }
}
// Shutdown handler — stop OrbitDB before IPFS, then exit cleanly on Ctrl-C.
process.on('SIGINT', async () => {
  networkLogger.info('Application shutting down...');

  if (orbitInstance) {
    await orbitInstance.stop();
  }

  if (ipfsNode) {
    // NOTE(review): `initIpfs` is a function — calling `.stop()` on it looks wrong;
    // newer code in this repo imports a dedicated `stopIpfs()` instead. Confirm the
    // old @debros/node-core API before relying on this line.
    await initIpfs.stop();
  }

  process.exit(0);
});

// Start the application
startApp();

2
orbitdb.d.ts vendored
View File

@ -1,7 +1,7 @@
// custom.d.ts
declare module '@orbitdb/core' {
// Import the types from @constl/orbit-db-types
import { OrbitDBTypes } from '@constl/orbit-db-types';
import { OrbitDBTypes } from '@orbitdb/core-types';
// Assuming @orbitdb/core exports an interface or type you want to override
// Replace 'OrbitDB' with the actual export name from @orbitdb/core you want to type

View File

@ -1,6 +1,6 @@
{
"name": "@debros/network",
"version": "0.0.13-alpha",
"version": "0.0.22-alpha",
"description": "Debros network core functionality for IPFS, libp2p and OrbitDB",
"type": "module",
"main": "dist/index.js",
@ -11,12 +11,14 @@
"types.d.ts"
],
"scripts": {
"build": "tsc",
"build": "tsc && tsc-esm-fix --outDir=./dist/esm",
"dev": "tsc -w",
"clean": "rimraf dist",
"prepublishOnly": "npm run clean && npm run build",
"prepare": "husky",
"lint": "npx eslint src"
"lint": "npx eslint src",
"format": "prettier --write \"**/*.{ts,js,json,md}\"",
"lint:fix": "npx eslint src --fix"
},
"keywords": [
"ipfs",
@ -32,6 +34,7 @@
"@chainsafe/libp2p-gossipsub": "^14.1.0",
"@chainsafe/libp2p-noise": "^16.1.0",
"@chainsafe/libp2p-yamux": "^7.0.1",
"@helia/unixfs": "^5.0.0",
"@libp2p/bootstrap": "^11.0.32",
"@libp2p/crypto": "^5.0.15",
"@libp2p/identify": "^3.0.27",
@ -47,6 +50,8 @@
"express": "^5.1.0",
"helia": "^5.3.0",
"libp2p": "^2.8.2",
"multiformats": "^13.3.2",
"node-cache": "^5.1.2",
"node-forge": "^1.3.1",
"winston": "^3.17.0"
},
@ -54,16 +59,24 @@
"typescript": ">=5.0.0"
},
"devDependencies": {
"@constl/orbit-db-types": "^2.0.6",
"@eslint/js": "^9.24.0",
"@orbitdb/core-types": "^1.0.14",
"@types/express": "^5.0.1",
"@types/node": "^22.13.10",
"@types/node-forge": "^1.3.11",
"@typescript-eslint/eslint-plugin": "^8.29.0",
"@typescript-eslint/parser": "^8.29.0",
"eslint": "^9.24.0",
"eslint-config-prettier": "^10.1.1",
"eslint-plugin-prettier": "^5.2.6",
"globals": "^16.0.0",
"husky": "^8.0.3",
"lint-staged": "^15.5.0",
"prettier": "^3.5.3",
"rimraf": "^5.0.5",
"typescript": "^5.8.2"
"tsc-esm-fix": "^3.1.2",
"typescript": "^5.8.2",
"typescript-eslint": "^8.29.0"
},
"compilerOptions": {
"typeRoots": [

1066
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

62
src/db/cache/cacheService.ts vendored Normal file
View File

@ -0,0 +1,62 @@
import NodeCache from 'node-cache';
import { createServiceLogger } from '../../utils/logger';
const logger = createServiceLogger('DB_CACHE');
// NOTE(review): `logger` is not referenced anywhere in this module yet — confirm
// it is intended for future use before removing.

// Cache for frequently accessed documents
const cache = new NodeCache({
  stdTTL: 300, // 5 minutes default TTL
  checkperiod: 60, // Check for expired items every 60 seconds
  useClones: false, // Don't clone objects (for performance)
});

// Cache statistics
// Hit/miss counters exposed for the metrics service; zeroed via resetStats().
export const cacheStats = {
  hits: 0,
  misses: 0,
};
/**
 * Fetch a cached value by key, recording a hit or miss in cacheStats.
 * @returns The cached value, or undefined when absent or expired.
 */
export const get = <T>(key: string): T | undefined => {
  const hit = cache.get<T>(key);
  if (hit === undefined) {
    cacheStats.misses++;
    return undefined;
  }
  cacheStats.hits++;
  return hit;
};
/**
 * Store a value in the cache.
 * @param ttl Seconds to live; the cache-wide default TTL applies when omitted.
 * @returns true when the value was stored.
 */
export const set = <T>(key: string, value: T, ttl?: number): boolean =>
  ttl === undefined ? cache.set(key, value) : cache.set(key, value, ttl);
/**
 * Remove one key (or several at once) from the cache.
 * @returns The number of entries actually deleted.
 */
export const del = (key: string | string[]): number => cache.del(key);
/**
 * Flush the entire cache (drops every entry; hit/miss statistics are untouched).
 */
export const flushAll = (): void => {
  cache.flushAll();
};
/**
 * Zero out the hit/miss counters (e.g. when the metrics service resets).
 */
export const resetStats = (): void => {
  Object.assign(cacheStats, { hits: 0, misses: 0 });
};

138
src/db/core/connection.ts Normal file
View File

@ -0,0 +1,138 @@
import { createServiceLogger } from '../../utils/logger';
import { init as initIpfs, stop as stopIpfs } from '../../ipfs/ipfsService';
import { init as initOrbitDB } from '../../orbit/orbitDBService';
import { DBConnection, ErrorCode } from '../types';
import { DBError } from './error';
const logger = createServiceLogger('DB_CONNECTION');
// Connection pool of database instances
const connections = new Map<string, DBConnection>();
let defaultConnectionId: string | null = null;
/**
 * Initialize the database service (IPFS + OrbitDB) and register the resulting
 * connection in the pool. This abstracts away OrbitDB and IPFS from the end user.
 * @param connectionId Optional explicit pool ID; a unique one is generated otherwise.
 * @returns The connection ID to pass to subsequent operations.
 * @throws DBError(INITIALIZATION_FAILED) when either subsystem fails to start.
 */
export const init = async (connectionId?: string): Promise<string> => {
  try {
    // FIX: String.prototype.substr is deprecated — slice(2, 11) yields the same
    // 9-character random suffix.
    const connId = connectionId || `conn_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
    logger.info(`Initializing DB service with connection ID: ${connId}`);

    // Initialize IPFS
    const ipfsInstance = await initIpfs();

    // Initialize OrbitDB (runs on the IPFS node started above)
    const orbitdbInstance = await initOrbitDB();

    // Store connection in pool
    connections.set(connId, {
      ipfs: ipfsInstance,
      orbitdb: orbitdbInstance,
      timestamp: Date.now(),
      isActive: true,
    });

    // First successful connection becomes the default
    if (!defaultConnectionId) {
      defaultConnectionId = connId;
    }

    logger.info(`DB service initialized successfully with connection ID: ${connId}`);
    return connId;
  } catch (error) {
    logger.error('Failed to initialize DB service:', error);
    throw new DBError(ErrorCode.INITIALIZATION_FAILED, 'Failed to initialize database service', error);
  }
};
/**
* Get the active connection
*/
export const getConnection = (connectionId?: string): DBConnection => {
const connId = connectionId || defaultConnectionId;
if (!connId || !connections.has(connId)) {
throw new DBError(
ErrorCode.NOT_INITIALIZED,
`No active database connection found${connectionId ? ` for ID: ${connectionId}` : ''}`
);
}
const connection = connections.get(connId)!;
if (!connection.isActive) {
throw new DBError(
ErrorCode.CONNECTION_ERROR,
`Connection ${connId} is no longer active`
);
}
return connection;
};
/**
 * Close one pooled connection: stop its OrbitDB instance and mark it inactive.
 * If it was the default connection, promote another active one (if any exists).
 * @returns true when the connection existed and was closed, false otherwise.
 */
export const closeConnection = async (connectionId: string): Promise<boolean> => {
  const connection = connections.get(connectionId);
  if (!connection) {
    return false;
  }

  try {
    // Shut down the OrbitDB instance owned by this connection.
    if (connection.orbitdb) {
      await connection.orbitdb.stop();
    }

    connection.isActive = false;

    // Promote a replacement default if we just closed the default connection.
    if (defaultConnectionId === connectionId) {
      defaultConnectionId = null;
      for (const [id, candidate] of connections.entries()) {
        if (candidate.isActive) {
          defaultConnectionId = id;
          break;
        }
      }
    }

    logger.info(`Closed database connection: ${connectionId}`);
    return true;
  } catch (error) {
    logger.error(`Error closing connection ${connectionId}:`, error);
    return false;
  }
};
/**
 * Stop all database connections, then shut down the shared IPFS node.
 */
export const stop = async (): Promise<void> => {
  try {
    // FIX: capture the IPFS handle BEFORE closing connections. closeConnection()
    // clears defaultConnectionId when it closes the default connection, so the
    // original post-loop lookup always missed, and stopIpfs() never ran.
    const ipfs = connections.get(defaultConnectionId || '')?.ipfs;

    // Close all connections
    for (const [id, connection] of connections.entries()) {
      if (connection.isActive) {
        await closeConnection(id);
      }
    }

    // Stop the shared IPFS node if one was running
    if (ipfs) {
      await stopIpfs();
    }

    defaultConnectionId = null;
    logger.info('All DB connections stopped successfully');
  } catch (error) {
    logger.error('Error stopping DB connections:', error);
    throw error;
  }
};

18
src/db/core/error.ts Normal file
View File

@ -0,0 +1,18 @@
import { ErrorCode } from '../types';
// Re-export error code for easier access
export { ErrorCode };
// Custom error class with error codes
// `code` lets callers branch on failure kind without string-matching messages.
export class DBError extends Error {
  code: ErrorCode;
  // Optional underlying cause or extra context (often the originally thrown error).
  details?: any;

  constructor(code: ErrorCode, message: string, details?: any) {
    super(message);
    this.name = 'DBError'; // so logs show "DBError" rather than plain "Error"
    this.code = code;
    this.details = details;
  }
}

212
src/db/dbService.ts Normal file
View File

@ -0,0 +1,212 @@
import { createServiceLogger } from '../utils/logger';
import { init, getConnection, closeConnection, stop } from './core/connection';
import { defineSchema, validateDocument } from './schema/validator';
import * as cache from './cache/cacheService';
import * as events from './events/eventService';
import { getMetrics, resetMetrics } from './metrics/metricsService';
import { Transaction } from './transactions/transactionService';
import { StoreType, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions, ErrorCode } from './types';
import { DBError } from './core/error';
import { getStore } from './stores/storeFactory';
import { uploadFile, getFile, deleteFile } from './stores/fileStore';
// Re-export imported functions
export { init, closeConnection, stop, defineSchema, getMetrics, resetMetrics, uploadFile, getFile, deleteFile };
const logger = createServiceLogger('DB_SERVICE');
/**
 * Create a new transaction for batching operations.
 * Queued operations are executed later via commitTransaction().
 *
 * @param connectionId - Optional connection to bind the transaction to
 */
export const createTransaction = (connectionId?: string): Transaction =>
  new Transaction(connectionId);
/**
* Execute all operations in a transaction
*/
export const commitTransaction = async (transaction: Transaction): Promise<{ success: boolean; results: any[] }> => {
try {
// Validate that we have operations
const operations = transaction.getOperations();
if (operations.length === 0) {
return { success: true, results: [] };
}
const connectionId = transaction.getConnectionId();
const results = [];
// Execute all operations
for (const operation of operations) {
let result;
switch (operation.type) {
case 'create':
result = await create(
operation.collection,
operation.id,
operation.data,
{ connectionId }
);
break;
case 'update':
result = await update(
operation.collection,
operation.id,
operation.data,
{ connectionId }
);
break;
case 'delete':
result = await remove(
operation.collection,
operation.id,
{ connectionId }
);
break;
}
results.push(result);
}
return { success: true, results };
} catch (error) {
logger.error('Transaction failed:', error);
throw new DBError(ErrorCode.TRANSACTION_FAILED, 'Failed to commit transaction', error);
}
};
/**
 * Create a new document in the specified collection.
 * Delegates to the store implementation chosen by options.storeType,
 * defaulting to the key-value store.
 */
export const create = async <T extends Record<string, any>>(
  collection: string,
  id: string,
  data: Omit<T, 'createdAt' | 'updatedAt'>,
  options?: { connectionId?: string, storeType?: StoreType }
): Promise<CreateResult> => {
  const store = getStore(options?.storeType || StoreType.KEYVALUE);
  return store.create(collection, id, data, { connectionId: options?.connectionId });
};
/**
 * Get a document by ID from a collection.
 * Resolves the backing store from options.storeType (key-value by default)
 * and forwards the remaining options (connectionId, skipCache) to it.
 */
export const get = async <T extends Record<string, any>>(
  collection: string,
  id: string,
  options?: { connectionId?: string; skipCache?: boolean, storeType?: StoreType }
): Promise<T | null> => {
  const store = getStore(options?.storeType || StoreType.KEYVALUE);
  return store.get(collection, id, options);
};
/**
 * Update a document in a collection.
 * With options.upsert the document is created when it does not exist yet;
 * the store is selected via options.storeType (key-value by default).
 */
export const update = async <T extends Record<string, any>>(
  collection: string,
  id: string,
  data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
  options?: { connectionId?: string; upsert?: boolean, storeType?: StoreType }
): Promise<UpdateResult> => {
  const store = getStore(options?.storeType || StoreType.KEYVALUE);
  return store.update(collection, id, data, options);
};
/**
 * Delete a document from a collection.
 *
 * @returns true when the deletion was carried out by the backing store
 */
export const remove = async (
  collection: string,
  id: string,
  options?: { connectionId?: string, storeType?: StoreType }
): Promise<boolean> => {
  const store = getStore(options?.storeType || StoreType.KEYVALUE);
  return store.remove(collection, id, options);
};
/**
 * List all documents in a collection with pagination.
 * storeType only selects the backing store and is stripped before the
 * remaining list options are handed over.
 */
export const list = async <T extends Record<string, any>>(
  collection: string,
  options?: ListOptions & { storeType?: StoreType }
): Promise<PaginatedResult<T>> => {
  const { storeType, ...storeOptions } = options || {};
  const store = getStore(storeType || StoreType.KEYVALUE);
  return store.list(collection, storeOptions);
};
/**
 * Query documents in a collection with filtering and pagination.
 * storeType only selects the backing store and is stripped before the
 * remaining query options are handed over.
 */
export const query = async <T extends Record<string, any>>(
  collection: string,
  filter: (doc: T) => boolean,
  options?: QueryOptions & { storeType?: StoreType }
): Promise<PaginatedResult<T>> => {
  const { storeType, ...storeOptions } = options || {};
  const store = getStore(storeType || StoreType.KEYVALUE);
  return store.query(collection, filter, storeOptions);
};
/**
 * Create an index on a field to speed up queries, when the selected
 * store supports indexing.
 *
 * @returns true when an index was (or already is) in place
 */
export const createIndex = async (
  collection: string,
  field: string,
  options?: { connectionId?: string, storeType?: StoreType }
): Promise<boolean> => {
  const store = getStore(options?.storeType || StoreType.KEYVALUE);
  return store.createIndex(collection, field, { connectionId: options?.connectionId });
};
/**
 * Subscribe to database events (alias for the event service's subscribe).
 * Returns an unsubscribe function.
 */
export const subscribe = events.subscribe;
// Re-export error types and codes
export { DBError } from './core/error';
export { ErrorCode } from './types';
// Export store types
export { StoreType } from './types';
// Default export: the full DB service surface in one object, mirroring the
// named exports above for consumers that prefer `import db from '...'`.
export default {
  init,
  create,
  get,
  update,
  remove,
  list,
  query,
  createIndex,
  createTransaction,
  commitTransaction,
  subscribe,
  uploadFile,
  getFile,
  deleteFile,
  defineSchema,
  getMetrics,
  resetMetrics,
  closeConnection,
  stop,
  StoreType
};

View File

@ -0,0 +1,36 @@
import { dbEvents } from '../types';
// Event types
type DBEventType = 'document:created' | 'document:updated' | 'document:deleted';
/**
 * Subscribe to a database event channel.
 *
 * @param event - Channel to listen on
 * @param callback - Invoked with each event payload
 * @returns A function that detaches the listener when called
 */
export const subscribe = (event: DBEventType, callback: (data: any) => void): (() => void) => {
  dbEvents.on(event, callback);
  return () => dbEvents.off(event, callback);
};
/**
 * Publish an event to every subscriber of the given channel.
 */
export const emit = (event: DBEventType, data: any): void => {
  dbEvents.emit(event, data);
};
/**
 * Detach every registered listener across all event channels.
 */
export const removeAllListeners = (): void => {
  dbEvents.removeAllListeners();
};

View File

@ -0,0 +1,101 @@
import { Metrics, ErrorCode } from '../types';
import { DBError } from '../core/error';
import * as cacheService from '../cache/cacheService';
// Module-level metrics accumulator. Mutated in place by measurePerformance()
// and the store implementations, snapshotted by getMetrics(), and
// re-initialised by resetMetrics(). Shape follows the Metrics type.
const metrics: Metrics = {
  operations: {
    creates: 0,
    reads: 0,
    updates: 0,
    deletes: 0,
    queries: 0,
    fileUploads: 0,
    fileDownloads: 0,
  },
  performance: {
    // Sum of wall-clock durations (ms) and number of measured operations;
    // the average is derived from these two in getMetrics().
    totalOperationTime: 0,
    operationCount: 0,
  },
  errors: {
    count: 0,
    // Per-ErrorCode tallies, populated only for DBError instances.
    byCode: {},
  },
  cacheStats: {
    hits: 0,
    misses: 0,
  },
  startTime: Date.now(),
};
/**
 * Measure performance of a database operation.
 *
 * Adds the operation's wall-clock duration to the performance totals
 * whether it succeeds or throws; failures additionally bump the error
 * counters (per-code when the error is a DBError) before rethrowing.
 *
 * @param operation - The async operation to run and time
 * @returns The operation's result, unchanged
 */
export async function measurePerformance<T>(operation: () => Promise<T>): Promise<T> {
  const startTime = performance.now();
  try {
    return await operation();
  } catch (error) {
    // Track error metrics
    metrics.errors.count++;
    if (error instanceof DBError) {
      metrics.errors.byCode[error.code] = (metrics.errors.byCode[error.code] || 0) + 1;
    }
    throw error;
  } finally {
    // Timing accounting was previously duplicated in the try and catch
    // branches; doing it once here keeps the two paths from drifting.
    metrics.performance.totalOperationTime += performance.now() - startTime;
    metrics.performance.operationCount++;
  }
}
/**
 * Snapshot the current database metrics.
 * Cache stats are read live from the cache service, and an
 * averageOperationTime is derived from the running totals.
 */
export const getMetrics = (): Metrics => {
  const { totalOperationTime, operationCount } = metrics.performance;
  const averageOperationTime = operationCount > 0 ? totalOperationTime / operationCount : 0;
  return {
    ...metrics,
    // Sync cache stats
    cacheStats: cacheService.cacheStats,
    performance: {
      ...metrics.performance,
      averageOperationTime,
    },
  };
};
/**
 * Reset all metrics to their initial state (useful for testing).
 * Also clears the cache service's hit/miss counters and restamps startTime.
 */
export const resetMetrics = (): void => {
  metrics.operations = {
    creates: 0,
    reads: 0,
    updates: 0,
    deletes: 0,
    queries: 0,
    fileUploads: 0,
    fileDownloads: 0,
  };
  metrics.performance = { totalOperationTime: 0, operationCount: 0 };
  metrics.errors = { count: 0, byCode: {} };
  // Reset cache stats too
  cacheService.resetStats();
  metrics.startTime = Date.now();
};

225
src/db/schema/validator.ts Normal file
View File

@ -0,0 +1,225 @@
import { createServiceLogger } from '../../utils/logger';
import { CollectionSchema, ErrorCode } from '../types';
import { DBError } from '../core/error';
const logger = createServiceLogger('DB_SCHEMA');
// Store collection schemas
const schemas = new Map<string, CollectionSchema>();
/**
 * Register a validation schema for a collection.
 * Later writes to the collection are checked against it by validateDocument().
 */
export const defineSchema = (collection: string, schema: CollectionSchema): void => {
  schemas.set(collection, schema);
  logger.info(`Schema defined for collection: ${collection}`);
};
/**
 * Validate a document against the schema registered for its collection.
 *
 * Checks, in order: top-level required fields, then each declared property:
 * type ('string' | 'number' | 'boolean' | 'array' | 'object' | 'enum'),
 * string pattern and length bounds, numeric range, array length bounds and
 * (shallow) item types, and enum membership. Validation stops at the first
 * violation. Fields present on the document but absent from the schema are
 * not checked.
 *
 * @param collection - Collection whose registered schema (if any) applies
 * @param document - Document to validate
 * @returns true when valid, or when no schema is registered
 * @throws DBError(INVALID_SCHEMA) on the first violation found
 */
export const validateDocument = (collection: string, document: any): boolean => {
  const schema = schemas.get(collection);
  if (!schema) {
    return true; // No schema defined, so validation passes
  }
  // Check required fields
  if (schema.required) {
    for (const field of schema.required) {
      if (document[field] === undefined) {
        throw new DBError(
          ErrorCode.INVALID_SCHEMA,
          `Required field '${field}' is missing`,
          { collection, document }
        );
      }
    }
  }
  // Validate properties
  for (const [field, definition] of Object.entries(schema.properties)) {
    const value = document[field];
    // Skip undefined optional fields (per-property `required` duplicates the
    // top-level required list for fields declared required on the property)
    if (value === undefined) {
      if (definition.required) {
        throw new DBError(
          ErrorCode.INVALID_SCHEMA,
          `Required field '${field}' is missing`,
          { collection, document }
        );
      }
      continue;
    }
    // Type validation
    switch (definition.type) {
      case 'string':
        if (typeof value !== 'string') {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must be a string`,
            { collection, field, value }
          );
        }
        // Pattern validation
        if (definition.pattern && !new RegExp(definition.pattern).test(value)) {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' does not match pattern: ${definition.pattern}`,
            { collection, field, value }
          );
        }
        // Length validation (min/max are character counts for strings)
        if (definition.min !== undefined && value.length < definition.min) {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must have at least ${definition.min} characters`,
            { collection, field, value }
          );
        }
        if (definition.max !== undefined && value.length > definition.max) {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must have at most ${definition.max} characters`,
            { collection, field, value }
          );
        }
        break;
      case 'number':
        if (typeof value !== 'number') {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must be a number`,
            { collection, field, value }
          );
        }
        // Range validation (min/max are value bounds for numbers)
        if (definition.min !== undefined && value < definition.min) {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must be at least ${definition.min}`,
            { collection, field, value }
          );
        }
        if (definition.max !== undefined && value > definition.max) {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must be at most ${definition.max}`,
            { collection, field, value }
          );
        }
        break;
      case 'boolean':
        if (typeof value !== 'boolean') {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must be a boolean`,
            { collection, field, value }
          );
        }
        break;
      case 'array':
        if (!Array.isArray(value)) {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must be an array`,
            { collection, field, value }
          );
        }
        // Length validation (min/max are item counts for arrays)
        if (definition.min !== undefined && value.length < definition.min) {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must have at least ${definition.min} items`,
            { collection, field, value }
          );
        }
        if (definition.max !== undefined && value.length > definition.max) {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must have at most ${definition.max} items`,
            { collection, field, value }
          );
        }
        // Validate array items if item schema is defined
        if (definition.items && value.length > 0) {
          for (let i = 0; i < value.length; i++) {
            const item = value[i];
            // This is a simplified item validation: only primitive item types
            // are checked; nested objects/arrays inside items are not recursed.
            switch (definition.items.type) {
              case 'string':
                if (typeof item !== 'string') {
                  throw new DBError(
                    ErrorCode.INVALID_SCHEMA,
                    `Item at index ${i} in field '${field}' must be a string`,
                    { collection, field, item }
                  );
                }
                break;
              case 'number':
                if (typeof item !== 'number') {
                  throw new DBError(
                    ErrorCode.INVALID_SCHEMA,
                    `Item at index ${i} in field '${field}' must be a number`,
                    { collection, field, item }
                  );
                }
                break;
              case 'boolean':
                if (typeof item !== 'boolean') {
                  throw new DBError(
                    ErrorCode.INVALID_SCHEMA,
                    `Item at index ${i} in field '${field}' must be a boolean`,
                    { collection, field, item }
                  );
                }
                break;
            }
          }
        }
        break;
      case 'object':
        if (typeof value !== 'object' || value === null || Array.isArray(value)) {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must be an object`,
            { collection, field, value }
          );
        }
        // Nested object validation is intentionally not implemented yet.
        break;
      case 'enum':
        // Note: enum values are compared by identity only; no type check.
        if (definition.enum && !definition.enum.includes(value)) {
          throw new DBError(
            ErrorCode.INVALID_SCHEMA,
            `Field '${field}' must be one of: ${definition.enum.join(', ')}`,
            { collection, field, value }
          );
        }
        break;
    }
  }
  return true;
};

142
src/db/stores/baseStore.ts Normal file
View File

@ -0,0 +1,142 @@
import { createServiceLogger } from '../../utils/logger';
import { openDB } from '../../orbit/orbitDBService';
import { getConnection } from '../core/connection';
import { validateDocument } from '../schema/validator';
import {
ErrorCode,
StoreType,
StoreOptions,
CreateResult,
UpdateResult,
PaginatedResult,
QueryOptions,
ListOptions,
} from '../types';
import { DBError } from '../core/error';
const logger = createServiceLogger('DB_STORE');
/**
 * Base Store interface that all store implementations
 * (key-value, docstore, feed, counter) must implement.
 * Documents carry createdAt/updatedAt timestamps managed by the store.
 */
export interface BaseStore {
  /**
   * Create a new document.
   * @returns the id and the store entry hash
   */
  create<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Omit<T, 'createdAt' | 'updatedAt'>,
    options?: StoreOptions,
  ): Promise<CreateResult>;
  /**
   * Get a document by ID, or null when not found.
   * skipCache bypasses the in-memory cache layer.
   */
  get<T extends Record<string, any>>(
    collection: string,
    id: string,
    options?: StoreOptions & { skipCache?: boolean },
  ): Promise<T | null>;
  /**
   * Update a document; with upsert, create it when missing.
   */
  update<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
    options?: StoreOptions & { upsert?: boolean },
  ): Promise<UpdateResult>;
  /**
   * Delete a document. Resolves true when the delete was performed.
   */
  remove(collection: string, id: string, options?: StoreOptions): Promise<boolean>;
  /**
   * List all documents in a collection with pagination.
   */
  list<T extends Record<string, any>>(
    collection: string,
    options?: ListOptions,
  ): Promise<PaginatedResult<T>>;
  /**
   * Query documents in a collection with filtering and pagination.
   */
  query<T extends Record<string, any>>(
    collection: string,
    filter: (doc: T) => boolean,
    options?: QueryOptions,
  ): Promise<PaginatedResult<T>>;
  /**
   * Create an index for a collection to speed up queries.
   * Stores without index support return false.
   */
  createIndex(collection: string, field: string, options?: StoreOptions): Promise<boolean>;
}
/**
 * Open an OrbitDB store of the given type for a collection.
 *
 * Resolves the connection first (which throws for an unknown connection id),
 * then delegates to the OrbitDB service.
 *
 * @param collection - Collection name, used as the database name
 * @param storeType - Which OrbitDB store flavour to open
 * @param options - Optional connection selection
 * @throws DBError(OPERATION_FAILED) wrapping the underlying failure
 */
export async function openStore(
  collection: string,
  storeType: StoreType,
  options?: StoreOptions,
): Promise<any> {
  try {
    // Resolve (and implicitly validate) the connection before opening.
    const connection = getConnection(options?.connectionId);
    logger.info(`Connection for ${collection}:`, connection);
    // Let a failure propagate intact: the previous inner .catch re-threw a
    // fresh Error built from err.message only, discarding the original
    // error's stack and details before the DBError wrap below.
    return await openDB(collection, storeType);
  } catch (error) {
    logger.error(`Error opening ${storeType} store for collection ${collection}:`, error);
    throw new DBError(
      ErrorCode.OPERATION_FAILED,
      `Failed to open ${storeType} store for collection ${collection}`,
      error,
    );
  }
}
/**
 * Build the document that will actually be persisted.
 *
 * Replaces `undefined` values with `null` (presumably because undefined is
 * dropped by the stores' serialization — confirm against the store layer),
 * stamps createdAt/updatedAt, merges over an existing document when
 * updating, and validates the result against the collection schema.
 */
export function prepareDocument<T extends Record<string, any>>(
  collection: string,
  data: Omit<T, 'createdAt' | 'updatedAt'>,
  existingDoc?: T | null,
): T {
  const now = Date.now();
  // Sanitize: undefined -> null, field by field.
  const sanitized: Record<string, any> = {};
  for (const [key, value] of Object.entries(data)) {
    sanitized[key] = value === undefined ? null : value;
  }
  // Updates keep the original createdAt; new documents get both stamps.
  const document = (existingDoc
    ? { ...existingDoc, ...sanitized, updatedAt: now }
    : { ...sanitized, createdAt: now, updatedAt: now }) as unknown as T;
  // Validate the document against its schema before handing it back.
  validateDocument(collection, document);
  return document;
}

View File

@ -0,0 +1,331 @@
import { createServiceLogger } from '../../utils/logger';
import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions } from '../types';
import { DBError } from '../core/error';
import { BaseStore, openStore } from './baseStore';
import * as cache from '../cache/cacheService';
import * as events from '../events/eventService';
import { measurePerformance } from '../metrics/metricsService';
const logger = createServiceLogger('COUNTER_STORE');
/**
 * CounterStore implementation
 * Uses OrbitDB's counter store for simple numeric counters.
 * Note: a counter database holds a single value, so the `id` parameter is
 * only used for cache keys and events, not by the underlying store.
 */
export class CounterStore implements BaseStore {
  /**
   * Create or set counter value.
   * Reads `data.value` (defaulting to 0), writes it to the counter,
   * caches a document-shaped representation and emits document:created.
   */
  async create<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Omit<T, 'createdAt' | 'updatedAt'>,
    options?: StoreOptions
  ): Promise<CreateResult> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.COUNTER, options);
        // Extract value from data, default to 0
        const value = typeof data === 'object' && data !== null && 'value' in data ?
          Number(data.value) : 0;
        // Set the counter value
        // NOTE(review): assumes the counter DB exposes set(); stock OrbitDB
        // counters expose inc()/value only — confirm the orbit wrapper API.
        const hash = await db.set(value);
        // Construct document representation
        const document = {
          id,
          value,
          createdAt: Date.now(),
          updatedAt: Date.now()
        };
        // Add to cache
        const cacheKey = `${collection}:${id}`;
        cache.set(cacheKey, document);
        // Emit change event
        events.emit('document:created', { collection, id, document });
        logger.info(`Set counter in ${collection} to ${value}`);
        return { id, hash };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error setting counter in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to set counter in ${collection}`, error);
      }
    });
  }
  /**
   * Get counter value, as a document-shaped object { id, value, updatedAt }.
   * Serves from cache unless options.skipCache is set.
   */
  async get<T extends Record<string, any>>(
    collection: string,
    id: string,
    options?: StoreOptions & { skipCache?: boolean }
  ): Promise<T | null> {
    return measurePerformance(async () => {
      try {
        // Note: for counters, id is not used in the underlying store (there's only one counter per db)
        // but we use it for consistency with the API
        // Check cache first if not skipped
        const cacheKey = `${collection}:${id}`;
        if (!options?.skipCache) {
          const cachedDocument = cache.get<T>(cacheKey);
          if (cachedDocument) {
            return cachedDocument;
          }
        }
        const db = await openStore(collection, StoreType.COUNTER, options);
        // Get the counter value
        // NOTE(review): value is invoked as a function here; stock OrbitDB
        // exposes it as a property — confirm the orbit wrapper.
        const value = await db.value();
        // Construct document representation
        const document = {
          id,
          value,
          updatedAt: Date.now()
        } as unknown as T;
        // Update cache
        cache.set(cacheKey, document);
        return document;
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error getting counter from ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get counter from ${collection}`, error);
      }
    });
  }
  /**
   * Update counter (increment/decrement/set).
   * data.increment adds, data.decrement subtracts (via inc with a negative
   * value), data.value sets; anything else sets the counter to 0.
   */
  async update<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
    options?: StoreOptions & { upsert?: boolean }
  ): Promise<UpdateResult> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.COUNTER, options);
        // Get current value before update
        const currentValue = await db.value();
        // Extract value from data
        let value: number;
        let operation: 'increment' | 'decrement' | 'set' = 'set';
        // Check what kind of operation we're doing
        if (typeof data === 'object' && data !== null) {
          if ('increment' in data) {
            value = Number(data.increment);
            operation = 'increment';
          } else if ('decrement' in data) {
            value = Number(data.decrement);
            operation = 'decrement';
          } else if ('value' in data) {
            value = Number(data.value);
            operation = 'set';
          } else {
            value = 0;
            operation = 'set';
          }
        } else {
          value = 0;
          operation = 'set';
        }
        // Update the counter
        let hash;
        let newValue;
        switch (operation) {
          case 'increment':
            hash = await db.inc(value);
            newValue = currentValue + value;
            break;
          case 'decrement':
            hash = await db.inc(-value); // Counter store uses inc with negative value
            newValue = currentValue - value;
            break;
          case 'set':
            hash = await db.set(value);
            newValue = value;
            break;
        }
        // Construct document representation
        const document = {
          id,
          value: newValue,
          updatedAt: Date.now()
        };
        // Update cache
        const cacheKey = `${collection}:${id}`;
        cache.set(cacheKey, document);
        // Emit change event (previous carries the pre-update value)
        events.emit('document:updated', {
          collection,
          id,
          document,
          previous: { id, value: currentValue }
        });
        logger.info(`Updated counter in ${collection} from ${currentValue} to ${newValue}`);
        return { id, hash };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error updating counter in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update counter in ${collection}`, error);
      }
    });
  }
  /**
   * Delete/reset counter. Counters cannot be truly deleted, so this
   * resets the value to 0 and emits document:deleted with the old value.
   */
  async remove(
    collection: string,
    id: string,
    options?: StoreOptions
  ): Promise<boolean> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.COUNTER, options);
        // Get the current value for the event
        const currentValue = await db.value();
        // Reset the counter to 0 (counters can't be truly deleted)
        await db.set(0);
        // Remove from cache
        const cacheKey = `${collection}:${id}`;
        cache.del(cacheKey);
        // Emit change event
        events.emit('document:deleted', {
          collection,
          id,
          document: { id, value: currentValue }
        });
        logger.info(`Reset counter in ${collection} from ${currentValue} to 0`);
        return true;
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error resetting counter in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to reset counter in ${collection}`, error);
      }
    });
  }
  /**
   * List all counters. A counter store holds exactly one value, so this
   * always returns a single document with id '0'.
   */
  async list<T extends Record<string, any>>(
    collection: string,
    options?: ListOptions
  ): Promise<PaginatedResult<T>> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.COUNTER, options);
        const value = await db.value();
        // For counter stores, we just return one document with the counter value
        const document = {
          id: '0', // Default ID since counters don't have IDs
          value,
          updatedAt: Date.now()
        } as unknown as T;
        return {
          documents: [document],
          total: 1,
          hasMore: false
        };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error listing counter in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list counter in ${collection}`, error);
      }
    });
  }
  /**
   * Query is of limited use for counter stores; the single counter document
   * is run through the filter for API consistency.
   */
  async query<T extends Record<string, any>>(
    collection: string,
    filter: (doc: T) => boolean,
    options?: QueryOptions
  ): Promise<PaginatedResult<T>> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.COUNTER, options);
        const value = await db.value();
        // Create document
        const document = {
          id: '0', // Default ID since counters don't have IDs
          value,
          updatedAt: Date.now()
        } as unknown as T;
        // Apply filter
        const documents = filter(document) ? [document] : [];
        return {
          documents,
          total: documents.length,
          hasMore: false
        };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error querying counter in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query counter in ${collection}`, error);
      }
    });
  }
  /**
   * Create an index - not applicable for counter stores; logs a warning
   * and returns false.
   */
  async createIndex(
    collection: string,
    field: string,
    options?: StoreOptions
  ): Promise<boolean> {
    logger.warn(`Index creation not supported for counter collections, ignoring request for ${collection}`);
    return false;
  }
}

350
src/db/stores/docStore.ts Normal file
View File

@ -0,0 +1,350 @@
import { createServiceLogger } from '../../utils/logger';
import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions } from '../types';
import { DBError } from '../core/error';
import { BaseStore, openStore, prepareDocument } from './baseStore';
import * as cache from '../cache/cacheService';
import * as events from '../events/eventService';
import { measurePerformance } from '../metrics/metricsService';
const logger = createServiceLogger('DOCSTORE');
/**
 * DocStore implementation
 * Uses OrbitDB's document store which allows for more complex document
 * storage with indices. Documents are keyed by `_id` in the underlying
 * store and surfaced to callers with an `id` field.
 */
export class DocStore implements BaseStore {
  /**
   * Compare two field values for sorting: strings via localeCompare,
   * numbers numerically, Dates by timestamp, anything else by its
   * string form. `order` flips the direction ('asc' | 'desc').
   */
  private static compareValues(valueA: any, valueB: any, order: string): number {
    if (typeof valueA === 'string' && typeof valueB === 'string') {
      return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
    } else if (typeof valueA === 'number' && typeof valueB === 'number') {
      return order === 'asc' ? valueA - valueB : valueB - valueA;
    } else if (valueA instanceof Date && valueB instanceof Date) {
      return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime();
    }
    // Default comparison for other types
    return order === 'asc'
      ? String(valueA).localeCompare(String(valueB))
      : String(valueB).localeCompare(String(valueA));
  }

  /**
   * Sort (when requested) and paginate a result set.
   * Extracted: list() and query() previously duplicated this logic verbatim.
   */
  private static sortAndPaginate<T extends Record<string, any>>(
    documents: T[],
    options?: ListOptions | QueryOptions,
  ): PaginatedResult<T> {
    if (options?.sort) {
      const { field, order } = options.sort;
      documents.sort((a, b) => DocStore.compareValues(a[field], b[field], order));
    }
    const total = documents.length;
    const offset = options?.offset || 0;
    const limit = options?.limit || total;
    return {
      documents: documents.slice(offset, offset + limit),
      total,
      hasMore: offset + limit < total,
    };
  }

  /**
   * Create a new document in the specified collection.
   * Stamps timestamps/validates via prepareDocument, stores under `_id`,
   * caches the result and emits document:created.
   */
  async create<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Omit<T, 'createdAt' | 'updatedAt'>,
    options?: StoreOptions
  ): Promise<CreateResult> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.DOCSTORE, options);
        // Prepare document for storage (_id is required by the docstore)
        const document = {
          _id: id,
          ...prepareDocument<T>(collection, data)
        };
        const hash = await db.put(document);
        // Add to cache
        const cacheKey = `${collection}:${id}`;
        cache.set(cacheKey, document);
        // Emit change event
        events.emit('document:created', { collection, id, document });
        logger.info(`Created document in ${collection} with id ${id}`);
        return { id, hash };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error creating document in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create document in ${collection}`, error);
      }
    });
  }

  /**
   * Get a document by ID from a collection, serving from cache unless
   * options.skipCache is set. Returns null when not found.
   */
  async get<T extends Record<string, any>>(
    collection: string,
    id: string,
    options?: StoreOptions & { skipCache?: boolean }
  ): Promise<T | null> {
    return measurePerformance(async () => {
      try {
        // Check cache first if not skipped
        const cacheKey = `${collection}:${id}`;
        if (!options?.skipCache) {
          const cachedDocument = cache.get<T>(cacheKey);
          if (cachedDocument) {
            return cachedDocument;
          }
        }
        const db = await openStore(collection, StoreType.DOCSTORE, options);
        // NOTE(review): assumes the wrapper's get() returns a single doc;
        // stock OrbitDB docstore get() returns an array — confirm.
        const document = await db.get(id) as T | null;
        // Update cache if document exists
        if (document) {
          cache.set(cacheKey, document);
        }
        return document;
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error getting document ${id} from ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get document ${id} from ${collection}`, error);
      }
    });
  }

  /**
   * Update a document in a collection.
   * Throws DOCUMENT_NOT_FOUND when the document is missing and upsert
   * is not requested; emits document:updated with the previous value.
   */
  async update<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
    options?: StoreOptions & { upsert?: boolean }
  ): Promise<UpdateResult> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.DOCSTORE, options);
        const existing = await db.get(id) as T | null;
        if (!existing && !options?.upsert) {
          throw new DBError(
            ErrorCode.DOCUMENT_NOT_FOUND,
            `Document ${id} not found in ${collection}`,
            { collection, id }
          );
        }
        // Prepare document for update (merges over the existing document)
        const document = {
          _id: id,
          ...prepareDocument<T>(collection, data as unknown as Omit<T, 'createdAt' | 'updatedAt'>, existing || undefined)
        };
        const hash = await db.put(document);
        // Update cache
        const cacheKey = `${collection}:${id}`;
        cache.set(cacheKey, document);
        // Emit change event
        events.emit('document:updated', { collection, id, document, previous: existing });
        logger.info(`Updated document in ${collection} with id ${id}`);
        return { id, hash };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error updating document in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update document in ${collection}`, error);
      }
    });
  }

  /**
   * Delete a document from a collection, evicting it from the cache and
   * emitting document:deleted with the removed document.
   */
  async remove(
    collection: string,
    id: string,
    options?: StoreOptions
  ): Promise<boolean> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.DOCSTORE, options);
        // Get the document before deleting for the event
        const document = await db.get(id);
        await db.del(id);
        // Remove from cache
        const cacheKey = `${collection}:${id}`;
        cache.del(cacheKey);
        // Emit change event
        events.emit('document:deleted', { collection, id, document });
        logger.info(`Deleted document in ${collection} with id ${id}`);
        return true;
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error deleting document in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to delete document in ${collection}`, error);
      }
    });
  }

  /**
   * List all documents in a collection with optional sorting and pagination.
   */
  async list<T extends Record<string, any>>(
    collection: string,
    options?: ListOptions
  ): Promise<PaginatedResult<T>> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.DOCSTORE, options);
        // Match everything, then surface _id as id
        const allDocs = await db.query(() => true);
        const documents = allDocs.map((doc: any) => ({
          id: doc._id,
          ...doc
        })) as T[];
        return DocStore.sortAndPaginate(documents, options);
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error listing documents in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list documents in ${collection}`, error);
      }
    });
  }

  /**
   * Query documents in a collection with filtering, optional sorting and
   * pagination. Filtering is pushed down to the docstore's query().
   */
  async query<T extends Record<string, any>>(
    collection: string,
    filter: (doc: T) => boolean,
    options?: QueryOptions
  ): Promise<PaginatedResult<T>> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.DOCSTORE, options);
        // Apply filter using docstore's query capability
        const filtered = await db.query((doc: any) => filter(doc as T));
        const documents = filtered.map((doc: any) => ({
          id: doc._id,
          ...doc
        })) as T[];
        return DocStore.sortAndPaginate(documents, options);
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error querying documents in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query documents in ${collection}`, error);
      }
    });
  }

  /**
   * Create an index for a collection to speed up queries.
   * Delegates to the DB instance's createIndex when available; otherwise
   * reports success since DocStore has built-in indices.
   */
  async createIndex(
    collection: string,
    field: string,
    options?: StoreOptions
  ): Promise<boolean> {
    try {
      const db = await openStore(collection, StoreType.DOCSTORE, options);
      if (typeof db.createIndex === 'function') {
        await db.createIndex(field);
        logger.info(`Index created on ${field} for collection ${collection}`);
        return true;
      }
      logger.info(`Index creation not supported for this DB instance, but DocStore has built-in indices`);
      return true;
    } catch (error) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error creating index for ${collection}:`, error);
      throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create index for ${collection}`, error);
    }
  }
}

416
src/db/stores/feedStore.ts Normal file
View File

@ -0,0 +1,416 @@
import { createServiceLogger } from '../../utils/logger';
import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions } from '../types';
import { DBError } from '../core/error';
import { BaseStore, openStore, prepareDocument } from './baseStore';
import * as cache from '../cache/cacheService';
import * as events from '../events/eventService';
import { measurePerformance } from '../metrics/metricsService';
const logger = createServiceLogger('FEED_STORE');
/**
 * FeedStore/EventLog implementation
 * Uses OrbitDB's feed/eventlog store which is an append-only log.
 *
 * Because the log is append-only, "update" appends a new entry linked to the
 * previous one via `previousEntryHash`, and "remove" appends a tombstone
 * entry (`deleted: true`) rather than mutating or erasing history.
 * Reads that operate on logical ids (list/query) therefore collapse the log
 * to the newest non-tombstoned entry per id.
 */
export class FeedStore implements BaseStore {
  /**
   * Create a new document in the specified collection.
   * For feeds, this appends a new entry.
   *
   * @param collection - feed name
   * @param id - logical document id stored inside the entry payload
   * @param data - document fields; timestamps are filled in by prepareDocument
   * @param options - store options (e.g. connectionId)
   * @returns the logical id and the hash of the appended entry
   * @throws DBError(OPERATION_FAILED) when the append fails
   */
  async create<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Omit<T, 'createdAt' | 'updatedAt'>,
    options?: StoreOptions
  ): Promise<CreateResult> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.FEED, options);
        // Prepare document for storage with ID
        const document = {
          id,
          ...prepareDocument<T>(collection, data)
        };
        // Add to database
        const hash = await db.add(document);
        // Feed entries are append-only, so we use a different cache key pattern
        // (keyed by entry hash rather than logical id).
        const cacheKey = `${collection}:entry:${hash}`;
        cache.set(cacheKey, document);
        // Emit change event
        events.emit('document:created', { collection, id, document, hash });
        logger.info(`Created entry in feed ${collection} with id ${id} and hash ${hash}`);
        return { id, hash };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error creating entry in feed ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create entry in feed ${collection}`, error);
      }
    });
  }

  /**
   * Get a specific entry in a feed - note this works differently than other
   * stores as feeds are append-only logs identified by entry hash, not id.
   *
   * @param collection - feed name
   * @param hash - the entry hash returned by create/update
   * @param options - store options; skipCache bypasses the read-through cache
   * @returns the entry payload, or null when no entry exists for the hash
   */
  async get<T extends Record<string, any>>(
    collection: string,
    hash: string,
    options?: StoreOptions & { skipCache?: boolean }
  ): Promise<T | null> {
    return measurePerformance(async () => {
      try {
        // Check cache first if not skipped
        const cacheKey = `${collection}:entry:${hash}`;
        if (!options?.skipCache) {
          const cachedDocument = cache.get<T>(cacheKey);
          if (cachedDocument) {
            return cachedDocument;
          }
        }
        const db = await openStore(collection, StoreType.FEED, options);
        // Get the specific entry by hash
        const entry = await db.get(hash);
        if (!entry) {
          return null;
        }
        const document = entry.payload.value as T;
        // Update cache
        cache.set(cacheKey, document);
        return document;
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error getting entry ${hash} from feed ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get entry ${hash} from feed ${collection}`, error);
      }
    });
  }

  /**
   * Update an entry in a feed.
   * Note: Feeds are append-only, so we can't actually update existing entries.
   * Instead, we append a new entry with the updated data and link it to the
   * original via `previousEntryHash`.
   *
   * @param options.upsert - when true, a missing id appends a fresh entry
   *   instead of throwing DOCUMENT_NOT_FOUND
   */
  async update<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
    options?: StoreOptions & { upsert?: boolean }
  ): Promise<UpdateResult> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.FEED, options);
        // Find the latest entry with the given id
        // NOTE(review): findIndex returns the FIRST match in iterator order —
        // assumed to be the newest entry; confirm the iterator's ordering.
        const entries = await db.iterator({ limit: -1 }).collect();
        const existingEntryIndex = entries.findIndex((e: any) => {
          const value = e.payload.value;
          return value && value.id === id;
        });
        if (existingEntryIndex === -1 && !options?.upsert) {
          throw new DBError(
            ErrorCode.DOCUMENT_NOT_FOUND,
            `Entry with id ${id} not found in feed ${collection}`,
            { collection, id }
          );
        }
        const existingEntry = existingEntryIndex !== -1 ? entries[existingEntryIndex].payload.value : null;
        // Prepare document with update
        const document = {
          id,
          ...prepareDocument<T>(collection, data as unknown as Omit<T, "createdAt" | "updatedAt">, existingEntry),
          // Add reference to the previous entry if it exists
          previousEntryHash: existingEntryIndex !== -1 ? entries[existingEntryIndex].hash : undefined
        };
        // Add to feed (append new entry)
        const hash = await db.add(document);
        // Cache the new entry
        const cacheKey = `${collection}:entry:${hash}`;
        cache.set(cacheKey, document);
        // Emit change event
        events.emit('document:updated', { collection, id, document, previous: existingEntry });
        logger.info(`Updated entry in feed ${collection} with id ${id} (new hash: ${hash})`);
        return { id, hash };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error updating entry in feed ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update entry in feed ${collection}`, error);
      }
    });
  }

  /**
   * Delete is not supported in feed/eventlog stores since they're append-only.
   * Instead, we add a "tombstone" entry that marks the entry as deleted.
   *
   * @throws DBError(DOCUMENT_NOT_FOUND) when no entry with the id exists
   */
  async remove(
    collection: string,
    id: string,
    options?: StoreOptions
  ): Promise<boolean> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.FEED, options);
        // Find the entry with the given id
        const entries = await db.iterator({ limit: -1 }).collect();
        const existingEntryIndex = entries.findIndex((e: any) => {
          const value = e.payload.value;
          return value && value.id === id;
        });
        if (existingEntryIndex === -1) {
          throw new DBError(
            ErrorCode.DOCUMENT_NOT_FOUND,
            `Entry with id ${id} not found in feed ${collection}`,
            { collection, id }
          );
        }
        const existingEntry = entries[existingEntryIndex].payload.value;
        const existingHash = entries[existingEntryIndex].hash;
        // Add a "tombstone" entry that marks this as deleted
        const tombstone = {
          id,
          deleted: true,
          deletedAt: Date.now(),
          previousEntryHash: existingHash
        };
        await db.add(tombstone);
        // Emit change event
        events.emit('document:deleted', { collection, id, document: existingEntry });
        logger.info(`Marked entry as deleted in feed ${collection} with id ${id}`);
        return true;
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error marking entry as deleted in feed ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to mark entry as deleted in feed ${collection}`, error);
      }
    });
  }

  /**
   * List all entries in a feed with pagination.
   * Note: This collapses the log — only the latest entry for each unique id
   * is returned, and tombstoned (deleted) ids are excluded.
   */
  async list<T extends Record<string, any>>(
    collection: string,
    options?: ListOptions
  ): Promise<PaginatedResult<T>> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.FEED, options);
        // Get all entries
        const entries = await db.iterator({ limit: -1 }).collect();
        // Group by ID and keep only the latest entry for each ID
        // Also filter out tombstone entries
        const latestEntries = new Map<string, any>();
        for (const entry of entries) {
          const value = entry.payload.value;
          if (!value || value.deleted) continue;
          const id = value.id;
          if (!id) continue;
          // If we already have an entry with this ID, check which is newer
          // ("newer" is decided by the updatedAt field on the payload).
          if (latestEntries.has(id)) {
            const existing = latestEntries.get(id);
            if (value.updatedAt > existing.value.updatedAt) {
              latestEntries.set(id, { hash: entry.hash, value });
            }
          } else {
            latestEntries.set(id, { hash: entry.hash, value });
          }
        }
        // Convert to array of documents
        let documents = Array.from(latestEntries.values()).map(entry => ({
          ...entry.value
        })) as T[];
        // Sort if requested
        if (options?.sort) {
          const { field, order } = options.sort;
          documents.sort((a, b) => {
            const valueA = a[field];
            const valueB = b[field];
            // Handle different data types for sorting
            if (typeof valueA === 'string' && typeof valueB === 'string') {
              return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
            } else if (typeof valueA === 'number' && typeof valueB === 'number') {
              return order === 'asc' ? valueA - valueB : valueB - valueA;
            } else if (valueA instanceof Date && valueB instanceof Date) {
              return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime();
            }
            // Default comparison for other types
            return order === 'asc' ?
              String(valueA).localeCompare(String(valueB)) :
              String(valueB).localeCompare(String(valueA));
          });
        }
        const total = documents.length;
        // Apply pagination (offset/limit default to returning everything)
        const offset = options?.offset || 0;
        const limit = options?.limit || total;
        const paginatedDocuments = documents.slice(offset, offset + limit);
        const hasMore = offset + limit < total;
        return {
          documents: paginatedDocuments,
          total,
          hasMore
        };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error listing entries in feed ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list entries in feed ${collection}`, error);
      }
    });
  }

  /**
   * Query entries in a feed with filtering and pagination.
   * Note: The filter is applied to the latest entry for each unique id
   * (tombstoned ids are excluded first).
   */
  async query<T extends Record<string, any>>(
    collection: string,
    filter: (doc: T) => boolean,
    options?: QueryOptions
  ): Promise<PaginatedResult<T>> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.FEED, options);
        // Get all entries
        const entries = await db.iterator({ limit: -1 }).collect();
        // Group by ID and keep only the latest entry for each ID
        // Also filter out tombstone entries
        const latestEntries = new Map<string, any>();
        for (const entry of entries) {
          const value = entry.payload.value;
          if (!value || value.deleted) continue;
          const id = value.id;
          if (!id) continue;
          // If we already have an entry with this ID, check which is newer
          if (latestEntries.has(id)) {
            const existing = latestEntries.get(id);
            if (value.updatedAt > existing.value.updatedAt) {
              latestEntries.set(id, { hash: entry.hash, value });
            }
          } else {
            latestEntries.set(id, { hash: entry.hash, value });
          }
        }
        // Convert to array of documents and apply filter
        let filtered = Array.from(latestEntries.values())
          .filter(entry => filter(entry.value as T))
          .map(entry => ({
            ...entry.value
          })) as T[];
        // Sort if requested
        if (options?.sort) {
          const { field, order } = options.sort;
          filtered.sort((a, b) => {
            const valueA = a[field];
            const valueB = b[field];
            // Handle different data types for sorting
            if (typeof valueA === 'string' && typeof valueB === 'string') {
              return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
            } else if (typeof valueA === 'number' && typeof valueB === 'number') {
              return order === 'asc' ? valueA - valueB : valueB - valueA;
            } else if (valueA instanceof Date && valueB instanceof Date) {
              return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime();
            }
            // Default comparison for other types
            return order === 'asc' ?
              String(valueA).localeCompare(String(valueB)) :
              String(valueB).localeCompare(String(valueA));
          });
        }
        const total = filtered.length;
        // Apply pagination
        const offset = options?.offset || 0;
        const limit = options?.limit || total;
        const paginatedDocuments = filtered.slice(offset, offset + limit);
        const hasMore = offset + limit < total;
        return {
          documents: paginatedDocuments,
          total,
          hasMore
        };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error querying entries in feed ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query entries in feed ${collection}`, error);
      }
    });
  }

  /**
   * Create an index for a collection - not supported for feeds.
   * Always returns false after logging a warning.
   */
  async createIndex(
    collection: string,
    field: string,
    options?: StoreOptions
  ): Promise<boolean> {
    logger.warn(`Index creation not supported for feed collections, ignoring request for ${collection}`);
    return false;
  }
}

149
src/db/stores/fileStore.ts Normal file
View File

@ -0,0 +1,149 @@
import { createServiceLogger } from '../../utils/logger';
import { ErrorCode, StoreType, FileUploadResult, FileResult } from '../types';
import { DBError } from '../core/error';
import { getConnection } from '../core/connection';
import { openStore } from './baseStore';
import { getHelia } from '../../ipfs/ipfsService';
import { measurePerformance } from '../metrics/metricsService';
/**
 * Drain an AsyncIterable of byte chunks into one contiguous Buffer.
 *
 * @param asyncIterable - source of Uint8Array chunks (e.g. UnixFS `cat` output)
 * @returns a Buffer containing every chunk concatenated in arrival order
 */
async function readAsyncIterableToBuffer(asyncIterable: AsyncIterable<Uint8Array>): Promise<Buffer> {
  const parts: Uint8Array[] = [];
  let totalLength = 0;
  for await (const part of asyncIterable) {
    parts.push(part);
    totalLength += part.byteLength;
  }
  // Passing the precomputed length lets Buffer.concat allocate once.
  return Buffer.concat(parts, totalLength);
}
const logger = createServiceLogger('FILE_STORE');
/**
* Upload a file to IPFS
*/
export const uploadFile = async (
fileData: Buffer,
options?: {
filename?: string;
connectionId?: string;
metadata?: Record<string, any>;
}
): Promise<FileUploadResult> => {
return measurePerformance(async () => {
try {
const connection = getConnection(options?.connectionId);
const ipfs = getHelia();
if (!ipfs) {
throw new DBError(ErrorCode.OPERATION_FAILED, 'IPFS instance not available');
}
// Add to IPFS
const blockstore = ipfs.blockstore;
const unixfs = await import('@helia/unixfs');
const fs = unixfs.unixfs(ipfs);
const cid = await fs.addBytes(fileData);
const cidStr = cid.toString();
// Store metadata
const filesDb = await openStore('_files', StoreType.KEYVALUE);
await filesDb.put(cidStr, {
filename: options?.filename,
size: fileData.length,
uploadedAt: Date.now(),
...options?.metadata
});
logger.info(`Uploaded file with CID: ${cidStr}`);
return { cid: cidStr };
} catch (error) {
if (error instanceof DBError) {
throw error;
}
logger.error('Error uploading file:', error);
throw new DBError(ErrorCode.OPERATION_FAILED, 'Failed to upload file', error);
}
});
};
/**
* Get a file from IPFS by CID
*/
export const getFile = async (
cid: string,
options?: { connectionId?: string }
): Promise<FileResult> => {
return measurePerformance(async () => {
try {
const connection = getConnection(options?.connectionId);
const ipfs = getHelia();
if (!ipfs) {
throw new DBError(ErrorCode.OPERATION_FAILED, 'IPFS instance not available');
}
// Get from IPFS
const unixfs = await import('@helia/unixfs');
const fs = unixfs.unixfs(ipfs);
const { CID } = await import('multiformats/cid');
const resolvedCid = CID.parse(cid);
try {
// Convert AsyncIterable to Buffer
const bytes = await readAsyncIterableToBuffer(fs.cat(resolvedCid));
// Get metadata if available
let metadata = null;
try {
const filesDb = await openStore('_files', StoreType.KEYVALUE);
metadata = await filesDb.get(cid);
} catch (err) {
// Metadata might not exist, continue without it
}
return { data: bytes, metadata };
} catch (error) {
throw new DBError(ErrorCode.FILE_NOT_FOUND, `File with CID ${cid} not found`, error);
}
} catch (error) {
if (error instanceof DBError) {
throw error;
}
logger.error(`Error getting file with CID ${cid}:`, error);
throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get file with CID ${cid}`, error);
}
});
};
/**
* Delete a file from IPFS by CID
*/
export const deleteFile = async (
cid: string,
options?: { connectionId?: string }
): Promise<boolean> => {
return measurePerformance(async () => {
try {
const connection = getConnection(options?.connectionId);
// Delete metadata
try {
const filesDb = await openStore('_files', StoreType.KEYVALUE);
await filesDb.del(cid);
} catch (err) {
// Ignore if metadata doesn't exist
}
// In IPFS we can't really delete files, but we can remove them from our local blockstore
// and they will eventually be garbage collected if no one else has pinned them
logger.info(`Deleted file with CID: ${cid}`);
return true;
} catch (error) {
if (error instanceof DBError) {
throw error;
}
logger.error(`Error deleting file with CID ${cid}:`, error);
throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to delete file with CID ${cid}`, error);
}
});
};

View File

@ -0,0 +1,335 @@
import { createServiceLogger } from '../../utils/logger';
import { ErrorCode, StoreType, StoreOptions, CreateResult, UpdateResult, PaginatedResult, QueryOptions, ListOptions } from '../types';
import { DBError } from '../core/error';
import { BaseStore, openStore, prepareDocument } from './baseStore';
import * as cache from '../cache/cacheService';
import * as events from '../events/eventService';
import { measurePerformance } from '../metrics/metricsService';
const logger = createServiceLogger('KEYVALUE_STORE');
/**
 * KeyValue Store implementation.
 *
 * Wraps an OrbitDB keyvalue store with a read-through cache, change events
 * ('document:created'/'document:updated'/'document:deleted') and performance
 * measurement around every operation.
 */
export class KeyValueStore implements BaseStore {
  /**
   * Create a new document in the specified collection.
   *
   * @param collection - keyvalue store name
   * @param id - key under which the document is stored
   * @param data - document fields; timestamps are filled in by prepareDocument
   * @param options - store options (e.g. connectionId)
   * @returns the id and the hash returned by the underlying put
   * @throws DBError(OPERATION_FAILED) when the write fails
   */
  async create<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Omit<T, 'createdAt' | 'updatedAt'>,
    options?: StoreOptions
  ): Promise<CreateResult> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.KEYVALUE, options);
        // Prepare document for storage
        const document = prepareDocument<T>(collection, data);
        // Add to database
        const hash = await db.put(id, document);
        // Add to cache
        const cacheKey = `${collection}:${id}`;
        cache.set(cacheKey, document);
        // Emit change event
        events.emit('document:created', { collection, id, document });
        logger.info(`Created document in ${collection} with id ${id}`);
        return { id, hash };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error creating document in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create document in ${collection}`, error);
      }
    });
  }

  /**
   * Get a document by ID from a collection.
   *
   * @param options.skipCache - when true, bypasses the read-through cache
   * @returns the document, or null when the key is absent
   */
  async get<T extends Record<string, any>>(
    collection: string,
    id: string,
    options?: StoreOptions & { skipCache?: boolean }
  ): Promise<T | null> {
    return measurePerformance(async () => {
      try {
        // Check cache first if not skipped
        const cacheKey = `${collection}:${id}`;
        if (!options?.skipCache) {
          const cachedDocument = cache.get<T>(cacheKey);
          if (cachedDocument) {
            return cachedDocument;
          }
        }
        const db = await openStore(collection, StoreType.KEYVALUE, options);
        const document = await db.get(id) as T | null;
        // Update cache if document exists
        if (document) {
          cache.set(cacheKey, document);
        }
        return document;
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error getting document ${id} from ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to get document ${id} from ${collection}`, error);
      }
    });
  }

  /**
   * Update a document in a collection.
   *
   * @param options.upsert - when true, a missing id creates the document
   *   instead of throwing DOCUMENT_NOT_FOUND
   */
  async update<T extends Record<string, any>>(
    collection: string,
    id: string,
    data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
    options?: StoreOptions & { upsert?: boolean }
  ): Promise<UpdateResult> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.KEYVALUE, options);
        const existing = await db.get(id) as T | null;
        if (!existing && !options?.upsert) {
          throw new DBError(
            ErrorCode.DOCUMENT_NOT_FOUND,
            `Document ${id} not found in ${collection}`,
            { collection, id }
          );
        }
        // Prepare document for update (merges onto existing when present)
        const document = prepareDocument<T>(collection, data as unknown as Omit<T, "createdAt" | "updatedAt">, existing || undefined);
        // Update in database
        const hash = await db.put(id, document);
        // Update cache
        const cacheKey = `${collection}:${id}`;
        cache.set(cacheKey, document);
        // Emit change event
        events.emit('document:updated', { collection, id, document, previous: existing });
        logger.info(`Updated document in ${collection} with id ${id}`);
        return { id, hash };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error updating document in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to update document in ${collection}`, error);
      }
    });
  }

  /**
   * Delete a document from a collection.
   *
   * @returns true when the delete completed (no error if the key was absent —
   *   behavior for a missing key depends on the underlying store's `del`)
   */
  async remove(
    collection: string,
    id: string,
    options?: StoreOptions
  ): Promise<boolean> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.KEYVALUE, options);
        // Get the document before deleting for the event
        const document = await db.get(id);
        // Delete from database
        await db.del(id);
        // Remove from cache
        const cacheKey = `${collection}:${id}`;
        cache.del(cacheKey);
        // Emit change event
        events.emit('document:deleted', { collection, id, document });
        logger.info(`Deleted document in ${collection} with id ${id}`);
        return true;
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error deleting document in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to delete document in ${collection}`, error);
      }
    });
  }

  /**
   * List all documents in a collection with pagination.
   * Each returned document carries its key as `id`.
   */
  async list<T extends Record<string, any>>(
    collection: string,
    options?: ListOptions
  ): Promise<PaginatedResult<T>> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.KEYVALUE, options);
        const all = await db.all();
        let documents = Object.entries(all).map(([key, value]) => ({
          id: key,
          ...(value as any)
        })) as T[];
        // Sort if requested
        if (options?.sort) {
          const { field, order } = options.sort;
          documents.sort((a, b) => {
            const valueA = a[field];
            const valueB = b[field];
            // Handle different data types for sorting
            if (typeof valueA === 'string' && typeof valueB === 'string') {
              return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
            } else if (typeof valueA === 'number' && typeof valueB === 'number') {
              return order === 'asc' ? valueA - valueB : valueB - valueA;
            } else if (valueA instanceof Date && valueB instanceof Date) {
              return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime();
            }
            // Default comparison for other types
            return order === 'asc' ?
              String(valueA).localeCompare(String(valueB)) :
              String(valueB).localeCompare(String(valueA));
          });
        }
        const total = documents.length;
        // Apply pagination (offset/limit default to returning everything)
        const offset = options?.offset || 0;
        const limit = options?.limit || total;
        const paginatedDocuments = documents.slice(offset, offset + limit);
        const hasMore = offset + limit < total;
        return {
          documents: paginatedDocuments,
          total,
          hasMore
        };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error listing documents in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to list documents in ${collection}`, error);
      }
    });
  }

  /**
   * Query documents in a collection with filtering and pagination.
   * The filter sees the stored value (without the synthesized `id` field).
   */
  async query<T extends Record<string, any>>(
    collection: string,
    filter: (doc: T) => boolean,
    options?: QueryOptions
  ): Promise<PaginatedResult<T>> {
    return measurePerformance(async () => {
      try {
        const db = await openStore(collection, StoreType.KEYVALUE, options);
        const all = await db.all();
        // Apply filter
        let filtered = Object.entries(all)
          .filter(([_, value]) => filter(value as T))
          .map(([key, value]) => ({
            id: key,
            ...(value as any)
          })) as T[];
        // Sort if requested
        if (options?.sort) {
          const { field, order } = options.sort;
          filtered.sort((a, b) => {
            const valueA = a[field];
            const valueB = b[field];
            // Handle different data types for sorting
            if (typeof valueA === 'string' && typeof valueB === 'string') {
              return order === 'asc' ? valueA.localeCompare(valueB) : valueB.localeCompare(valueA);
            } else if (typeof valueA === 'number' && typeof valueB === 'number') {
              return order === 'asc' ? valueA - valueB : valueB - valueA;
            } else if (valueA instanceof Date && valueB instanceof Date) {
              return order === 'asc' ? valueA.getTime() - valueB.getTime() : valueB.getTime() - valueA.getTime();
            }
            // Default comparison for other types
            return order === 'asc' ?
              String(valueA).localeCompare(String(valueB)) :
              String(valueB).localeCompare(String(valueA));
          });
        }
        const total = filtered.length;
        // Apply pagination
        const offset = options?.offset || 0;
        const limit = options?.limit || total;
        const paginatedDocuments = filtered.slice(offset, offset + limit);
        const hasMore = offset + limit < total;
        return {
          documents: paginatedDocuments,
          total,
          hasMore
        };
      } catch (error) {
        if (error instanceof DBError) {
          throw error;
        }
        logger.error(`Error querying documents in ${collection}:`, error);
        throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to query documents in ${collection}`, error);
      }
    });
  }

  /**
   * Create an index for a collection to speed up queries.
   * Currently a logged no-op — keyvalue stores have no index support here.
   */
  async createIndex(
    collection: string,
    field: string,
    options?: StoreOptions
  ): Promise<boolean> {
    try {
      // In a real implementation, this would register the index with OrbitDB
      // or create a specialized data structure. For now, we'll just log the request.
      logger.info(`Index created on ${field} for collection ${collection}`);
      return true;
    } catch (error) {
      if (error instanceof DBError) {
        throw error;
      }
      logger.error(`Error creating index for ${collection}:`, error);
      throw new DBError(ErrorCode.OPERATION_FAILED, `Failed to create index for ${collection}`, error);
    }
  }
}

View File

@ -0,0 +1,54 @@
import { createServiceLogger } from '../../utils/logger';
import { StoreType, ErrorCode } from '../types';
import { DBError } from '../core/error';
import { BaseStore } from './baseStore';
import { KeyValueStore } from './keyValueStore';
import { DocStore } from './docStore';
import { FeedStore } from './feedStore';
import { CounterStore } from './counterStore';
const logger = createServiceLogger('STORE_FACTORY');
// Lazily created singleton instance per store type.
const storeInstances = new Map<StoreType, BaseStore>();

/**
 * Get a store instance by type.
 *
 * Instances are created on first request and cached, so repeated calls with
 * the same type return the same object.
 *
 * @param type - the store type to resolve (FEED and EVENTLOG share one impl)
 * @returns the shared store instance for that type
 * @throws DBError(STORE_TYPE_ERROR) for unknown store types
 */
export function getStore(type: StoreType): BaseStore {
  // Serve from the singleton cache when possible.
  const cached = storeInstances.get(type);
  if (cached) {
    return cached;
  }
  const instantiate = (): BaseStore => {
    switch (type) {
      case StoreType.KEYVALUE:
        return new KeyValueStore();
      case StoreType.DOCSTORE:
        return new DocStore();
      case StoreType.FEED:
      case StoreType.EVENTLOG: // Alias for feed
        return new FeedStore();
      case StoreType.COUNTER:
        return new CounterStore();
      default:
        logger.error(`Unsupported store type: ${type}`);
        throw new DBError(ErrorCode.STORE_TYPE_ERROR, `Unsupported store type: ${type}`);
    }
  };
  const created = instantiate();
  storeInstances.set(type, created);
  return created;
}

View File

@ -0,0 +1,77 @@
import { createServiceLogger } from '../../utils/logger';
import { ErrorCode } from '../types';
import { DBError } from '../core/error';
const logger = createServiceLogger('DB_TRANSACTION');
// Transaction operation type: one queued mutation in a Transaction batch.
interface TransactionOperation {
  // Kind of mutation to apply at commit time.
  type: 'create' | 'update' | 'delete';
  // Target collection name.
  collection: string;
  // Target document id.
  id: string;
  // Payload for create/update; absent for delete.
  data?: any;
}
/**
 * Transaction object for batching operations.
 *
 * Accumulates create/update/delete operations (in insertion order) so they
 * can later be committed as one batch. All mutator methods return `this`
 * for chaining.
 */
export class Transaction {
  private operations: TransactionOperation[] = [];
  private connectionId?: string;

  constructor(connectionId?: string) {
    this.connectionId = connectionId;
  }

  /**
   * Add a create operation to the transaction.
   */
  create<T>(collection: string, id: string, data: T): Transaction {
    return this.enqueue({ type: 'create', collection, id, data });
  }

  /**
   * Add an update operation to the transaction.
   */
  update<T>(collection: string, id: string, data: Partial<T>): Transaction {
    return this.enqueue({ type: 'update', collection, id, data });
  }

  /**
   * Add a delete operation to the transaction.
   */
  delete(collection: string, id: string): Transaction {
    return this.enqueue({ type: 'delete', collection, id });
  }

  /**
   * Get all operations in this transaction (defensive copy).
   */
  getOperations(): TransactionOperation[] {
    return this.operations.slice();
  }

  /**
   * Get connection ID for this transaction.
   */
  getConnectionId(): string | undefined {
    return this.connectionId;
  }

  // Shared append path used by all three mutators.
  private enqueue(operation: TransactionOperation): Transaction {
    this.operations.push(operation);
    return this;
  }
}

132
src/db/types/index.ts Normal file
View File

@ -0,0 +1,132 @@
// Common types for database operations
import { EventEmitter } from 'events';
import { Transaction } from '../transactions/transactionService';
export type { Transaction };

// Database Types: the OrbitDB store flavors supported by the abstraction.
export enum StoreType {
  KEYVALUE = 'keyvalue',
  DOCSTORE = 'docstore',
  FEED = 'feed',
  EVENTLOG = 'eventlog',
  COUNTER = 'counter'
}

// Common result types
// Result of a create: the logical id plus the store's entry hash.
export interface CreateResult {
  id: string;
  hash: string;
}
// Result of an update: the logical id plus the new entry hash.
export interface UpdateResult {
  id: string;
  hash: string;
}
// Result of a file upload: the IPFS CID as a string.
export interface FileUploadResult {
  cid: string;
}
// Metadata stored alongside an uploaded file; open-ended for caller fields.
export interface FileMetadata {
  filename?: string;
  size: number;
  uploadedAt: number;
  [key: string]: any;
}
// A downloaded file: raw bytes plus its metadata record (null when absent).
export interface FileResult {
  data: Buffer;
  metadata: FileMetadata | null;
}
// One page of results plus the total match count and a continuation flag.
export interface PaginatedResult<T> {
  documents: T[];
  total: number;
  hasMore: boolean;
}

// Define error codes used by DBError throughout the db layer.
export enum ErrorCode {
  NOT_INITIALIZED = 'ERR_NOT_INITIALIZED',
  INITIALIZATION_FAILED = 'ERR_INIT_FAILED',
  DOCUMENT_NOT_FOUND = 'ERR_DOC_NOT_FOUND',
  INVALID_SCHEMA = 'ERR_INVALID_SCHEMA',
  OPERATION_FAILED = 'ERR_OPERATION_FAILED',
  TRANSACTION_FAILED = 'ERR_TRANSACTION_FAILED',
  FILE_NOT_FOUND = 'ERR_FILE_NOT_FOUND',
  INVALID_PARAMETERS = 'ERR_INVALID_PARAMS',
  CONNECTION_ERROR = 'ERR_CONNECTION',
  STORE_TYPE_ERROR = 'ERR_STORE_TYPE',
}

// Connection pool interface: one pooled IPFS/OrbitDB pair.
export interface DBConnection {
  ipfs: any;
  orbitdb: any;
  timestamp: number;
  isActive: boolean;
}

// Schema validation: a JSON-schema-like field description.
export interface SchemaDefinition {
  type: string;
  required?: boolean;
  pattern?: string;
  min?: number;
  max?: number;
  enum?: any[];
  items?: SchemaDefinition; // For arrays
  properties?: Record<string, SchemaDefinition>; // For objects
}
// Schema for a whole collection: per-field definitions plus required names.
export interface CollectionSchema {
  properties: Record<string, SchemaDefinition>;
  required?: string[];
}

// Metrics tracking: counters and timings accumulated by the metrics service.
export interface Metrics {
  operations: {
    creates: number;
    reads: number;
    updates: number;
    deletes: number;
    queries: number;
    fileUploads: number;
    fileDownloads: number;
  };
  performance: {
    totalOperationTime: number;
    operationCount: number;
    averageOperationTime?: number;
  };
  errors: {
    count: number;
    byCode: Record<string, number>;
  };
  cacheStats: {
    hits: number;
    misses: number;
  };
  startTime: number;
}

// Store options
// Paging/sorting options accepted by list().
export interface ListOptions {
  limit?: number;
  offset?: number;
  connectionId?: string;
  sort?: { field: string; order: 'asc' | 'desc' };
}
// Options for query(); indexBy hints the docstore index field.
export interface QueryOptions extends ListOptions {
  indexBy?: string;
}
// Base options shared by all store operations.
export interface StoreOptions {
  connectionId?: string;
}

// Event bus for database events
export const dbEvents = new EventEmitter();

View File

@ -2,97 +2,149 @@
import { config, defaultConfig, type DebrosConfig } from './config';
import { validateConfig, type ValidationResult } from './ipfs/config/configValidator';

// IPFS exports
import ipfsService, {
  init as initIpfs,
  stop as stopIpfs,
  getHelia,
  getProxyAgent,
  getInstance,
  getLibp2p,
  getConnectedPeers,
  getOptimalPeer,
  updateNodeLoad,
  logPeersStatus,
  type IPFSModule,
} from './ipfs/ipfsService';

// Database service exports (new abstracted layer)
import {
  init as initDB,
  create,
  get,
  update,
  remove,
  list,
  query,
  createIndex,
  createTransaction,
  commitTransaction,
  subscribe,
  uploadFile,
  getFile,
  deleteFile,
  defineSchema,
  getMetrics,
  resetMetrics,
  closeConnection,
  stop as stopDB,
} from './db/dbService';
import { ErrorCode, StoreType } from './db/types';
import { ipfsConfig, getIpfsPort, getBlockstorePath } from './ipfs/config/ipfsConfig';

// Import types
import type {
  Transaction,
  CreateResult,
  UpdateResult,
  PaginatedResult,
  ListOptions,
  QueryOptions,
  FileUploadResult,
  FileResult,
  CollectionSchema,
  SchemaDefinition,
  Metrics,
} from './db/types';

// OrbitDB exports
import orbitDBService, {
  init as initOrbitDB,
  openDB,
  getOrbitDB,
  db as orbitDB,
  getOrbitDBDir,
  getDBAddress,
  saveDBAddress,
} from './orbit/orbitDBService';
import { DBError } from './db/core/error';

// Load balancer exports
// Fix: removed the duplicate default/named imports of loadBalancerController
// and the extra `export const` re-export (duplicate identifier/export), and
// the duplicate legacy import of getConnectedPeers/logPeersStatus (already
// bound by the ipfsService import above). A single default import keeps the
// name available for the export list below.
import loadBalancerController from './ipfs/loadBalancerController';

// Logger exports
import logger, {
  createServiceLogger,
  createDebrosLogger,
  type LoggerOptions,
} from './utils/logger';

// Crypto exports
import { getPrivateKey } from './ipfs/utils/crypto';

// Export public API
export {
// Config
// Configuration
config,
defaultConfig,
validateConfig,
type DebrosConfig,
type ValidationResult,
// IPFS
ipfsService,
initIpfs,
stopIpfs,
getHelia,
getProxyAgent,
getInstance,
getLibp2p,
// Database Service (Main public API)
initDB,
create,
get,
update,
remove,
list,
query,
createIndex,
createTransaction,
commitTransaction,
subscribe,
uploadFile,
getFile,
deleteFile,
defineSchema,
getMetrics,
resetMetrics,
closeConnection,
stopDB,
ErrorCode,
StoreType,
// Load Balancer
loadBalancerController,
getConnectedPeers,
getOptimalPeer,
updateNodeLoad,
logPeersStatus,
type IPFSModule,
// IPFS Config
ipfsConfig,
getIpfsPort,
getBlockstorePath,
// OrbitDB
orbitDBService,
initOrbitDB,
openDB,
getOrbitDB,
orbitDB,
getOrbitDBDir,
getDBAddress,
saveDBAddress,
// Types
type Transaction,
type DBError,
type CollectionSchema,
type SchemaDefinition,
type CreateResult,
type UpdateResult,
type PaginatedResult,
type ListOptions,
type QueryOptions,
type FileUploadResult,
type FileResult,
type Metrics,
// Logger
logger,
createServiceLogger,
createDebrosLogger,
type LoggerOptions,
// Crypto
getPrivateKey,
};
// Default export for convenience
// Bundles the whole public surface into one object so consumers can do
// `import debros from '@debros/network'` instead of named imports.
export default {
config,
validateConfig,
ipfsService,
orbitDBService,
// Database Service as main interface
db: {
init: initDB,
create,
get,
update,
remove,
list,
query,
createIndex,
createTransaction,
commitTransaction,
subscribe,
uploadFile,
getFile,
deleteFile,
defineSchema,
getMetrics,
resetMetrics,
closeConnection,
// `stop` here maps to stopDB to avoid clashing with other stop functions.
stop: stopDB,
ErrorCode,
StoreType,
},
loadBalancerController,
logPeersStatus,
getConnectedPeers,
logger,
createServiceLogger,
};

View File

@ -74,7 +74,7 @@ export const openDB = async (name: string, type: string) => {
const dbOptions = {
type,
overwrite: false,
AccessController: IPFSAccessController({ write: ['*'], storage: getHelia() }),
AccessController: IPFSAccessController({ write: ['*'] }),
};
if (existingAddress) {

View File

@ -50,7 +50,7 @@
// ]
// },
},
"include": ["src/**/*", "orbitdb.d.ts"],
"include": ["src/**/*", "orbitdb.d.ts", "types.d.ts"],
"exclude": ["coverage", "dist", "eslint.config.js", "node_modules"],
"ts-node": {
"esm": true

336
types.d.ts vendored
View File

@ -2,11 +2,15 @@
// Project: https://github.com/debros/anchat-relay
// Definitions by: Debros Team
declare module "@debros/network" {
import { Request, Response, NextFunction } from "express";
declare module '@debros/network' {
import { Request, Response, NextFunction } from 'express';
// Config types
export interface DebrosConfig {
env: {
fingerprint: string;
port: number;
};
ipfs: {
swarm: {
port: number;
@ -15,9 +19,15 @@ declare module "@debros/network" {
connectAddresses: string[];
};
blockstorePath: string;
orbitdbPath: string;
bootstrap: string[];
privateKey?: string;
serviceDiscovery?: {
topic: string;
heartbeatInterval: number;
};
};
orbitdb: {
directory: string;
};
logger: {
level: string;
@ -33,66 +43,216 @@ declare module "@debros/network" {
// Core configuration
export const config: DebrosConfig;
export const defaultConfig: DebrosConfig;
export function validateConfig(
config: Partial<DebrosConfig>
): ValidationResult;
export function validateConfig(config: Partial<DebrosConfig>): ValidationResult;
// IPFS types
export interface IPFSModule {
helia: any;
libp2p: any;
// Store types
/**
 * Store backends selectable per operation. Values are the OrbitDB store
 * type strings.
 * NOTE(review): FEED/EVENTLOG/COUNTER availability depends on the OrbitDB
 * version in use — confirm the runtime actually supports all five.
 */
export enum StoreType {
KEYVALUE = 'keyvalue',
DOCSTORE = 'docstore',
FEED = 'feed',
EVENTLOG = 'eventlog',
COUNTER = 'counter',
}
// IPFS Service
// Ambient declarations for the IPFS lifecycle and peer helpers.
export const ipfsService: {
init(): Promise<IPFSModule>;
stop(): Promise<void>;
};
export function initIpfs(): Promise<IPFSModule>;
export function stopIpfs(): Promise<void>;
export function getHelia(): any;
export function getProxyAgent(): any;
export function getInstance(): IPFSModule;
export function getLibp2p(): any;
// NOTE(review): getConnectedPeers/logPeersStatus are also declared later in
// this module as consts with different types (Map-returning) — duplicate
// declarations; confirm which pair survives in the final file.
export function getConnectedPeers(): any[];
export function getOptimalPeer(): any;
export function updateNodeLoad(load: number): void;
export function logPeersStatus(): void;
// IPFS Config
export const ipfsConfig: any;
export function getIpfsPort(): number;
export function getBlockstorePath(): string;
// LoadBalancerController interface and value declaration
export interface LoadBalancerController {
getNodeInfo: (_req: Request, _res: Response, _next: NextFunction) => void;
getOptimalPeer: (
_req: Request,
_res: Response,
_next: NextFunction
) => void;
getAllPeers: (_req: Request, _res: Response, _next: NextFunction) => void;
// Error handling
/**
 * Stable machine-readable error codes carried by DBError. The string
 * values are the wire/log representation; compare against these rather
 * than message text.
 */
export enum ErrorCode {
NOT_INITIALIZED = 'ERR_NOT_INITIALIZED',
INITIALIZATION_FAILED = 'ERR_INIT_FAILED',
DOCUMENT_NOT_FOUND = 'ERR_DOC_NOT_FOUND',
INVALID_SCHEMA = 'ERR_INVALID_SCHEMA',
OPERATION_FAILED = 'ERR_OPERATION_FAILED',
TRANSACTION_FAILED = 'ERR_TRANSACTION_FAILED',
FILE_NOT_FOUND = 'ERR_FILE_NOT_FOUND',
INVALID_PARAMETERS = 'ERR_INVALID_PARAMS',
CONNECTION_ERROR = 'ERR_CONNECTION',
STORE_TYPE_ERROR = 'ERR_STORE_TYPE',
}
// Declare loadBalancerController as a value
export const loadBalancerController: LoadBalancerController;
/**
 * Error type thrown by the database layer. Carries a machine-readable
 * `code` (see ErrorCode) alongside the human-readable message, plus
 * optional structured `details`.
 */
export class DBError extends Error {
code: ErrorCode;
details?: any;
constructor(code: ErrorCode, message: string, details?: any);
}
// OrbitDB
// Ambient declarations for the OrbitDB lifecycle and address-book helpers.
export const orbitDBService: {
init(): Promise<any>;
};
export function initOrbitDB(): Promise<any>;
/** Opens (or creates) a named database of the given OrbitDB store type. */
export function openDB(
dbName: string,
dbType: string,
options?: any
): Promise<any>;
export function getOrbitDB(): any;
export const orbitDB: any;
/** Directory where OrbitDB persists its data. */
export function getOrbitDBDir(): string;
/** Looks up a previously saved address for a named DB; null when unknown. */
export function getDBAddress(dbName: string): string | null;
/** Persists the address of a named DB for later reopening. */
export function saveDBAddress(dbName: string, address: string): void;
// Schema validation
/**
 * Validation rules for a single field; mirrors SchemaDefinition in
 * src/db/types.ts. Nests via `items` (arrays) and `properties` (objects).
 */
export interface SchemaDefinition {
type: string;
required?: boolean;
pattern?: string;
min?: number;
max?: number;
enum?: any[];
items?: SchemaDefinition; // For arrays
properties?: Record<string, SchemaDefinition>; // For objects
}
/** Whole-collection schema: per-field rules plus required field names. */
export interface CollectionSchema {
properties: Record<string, SchemaDefinition>;
required?: string[];
}
// Database types
/** Timestamps stamped onto every stored document by the DB layer. */
export interface DocumentMetadata {
createdAt: number;
updatedAt: number;
}
/** A stored document: arbitrary user fields plus the managed timestamps. */
export interface Document extends DocumentMetadata {
[key: string]: any;
}
/** Result of a create: the document id and the store entry hash. */
export interface CreateResult {
id: string;
hash: string;
}
/** Result of an update: the document id and the new store entry hash. */
export interface UpdateResult {
id: string;
hash: string;
}
/** Result of a file upload: the IPFS content identifier. */
export interface FileUploadResult {
cid: string;
}
/** Metadata recorded alongside an uploaded file; extensible via index signature. */
export interface FileMetadata {
filename?: string;
size: number;
uploadedAt: number;
[key: string]: any;
}
/** A downloaded file: raw bytes plus its metadata (null when none was stored). */
export interface FileResult {
data: Buffer;
metadata: FileMetadata | null;
}
/**
 * Pagination/ordering options for list reads.
 * NOTE(review): this declaration has `storeType`, which the implementation's
 * ListOptions in src/db/types.ts lacks — confirm which is authoritative.
 */
export interface ListOptions {
limit?: number;
offset?: number;
sort?: { field: string; order: 'asc' | 'desc' };
connectionId?: string;
storeType?: StoreType;
}
/** Query options: ListOptions plus an index hint. */
export interface QueryOptions extends ListOptions {
indexBy?: string;
}
/** A page of results with the total match count and a continuation flag. */
export interface PaginatedResult<T> {
documents: T[];
total: number;
hasMore: boolean;
}
// Transaction API
/**
 * Fluent builder for batched writes: create/update/delete each return the
 * Transaction itself for chaining; commit() executes the queued operations
 * and reports per-operation results.
 */
export class Transaction {
create<T>(collection: string, id: string, data: T): Transaction;
update<T>(collection: string, id: string, data: Partial<T>): Transaction;
delete(collection: string, id: string): Transaction;
commit(): Promise<{ success: boolean; results: any[] }>;
}
// Metrics tracking
/**
 * Snapshot of database-layer runtime metrics returned by getMetrics().
 * Mirrors the Metrics interface in src/db/types.ts.
 */
export interface Metrics {
  /** Count of each operation kind performed since startTime. */
  operations: {
    creates: number;
    reads: number;
    updates: number;
    deletes: number;
    queries: number;
    fileUploads: number;
    fileDownloads: number;
  };
  /** Aggregate timing data across all operations. */
  performance: {
    totalOperationTime: number;
    operationCount: number;
    // Optional to match the implementation type (src/db/types.ts declares
    // `averageOperationTime?`); the previous required declaration here
    // promised a field the implementation may omit.
    averageOperationTime?: number;
  };
  /** Error totals, broken down by error code string. */
  errors: {
    count: number;
    byCode: Record<string, number>;
  };
  /** Read-cache hit/miss counters. */
  cacheStats: {
    hits: number;
    misses: number;
  };
  /** Timestamp when metric collection began. */
  startTime: number;
}
// Database Operations
/** Initializes the DB layer; resolves to the connection id in use. */
export function initDB(connectionId?: string): Promise<string>;
/** Creates a document; createdAt/updatedAt are managed by the DB layer, hence omitted from the input type. */
export function create<T extends Record<string, any>>(
collection: string,
id: string,
data: Omit<T, 'createdAt' | 'updatedAt'>,
options?: { connectionId?: string; storeType?: StoreType },
): Promise<CreateResult>;
/** Fetches a document by id; resolves null when not found. */
export function get<T extends Record<string, any>>(
collection: string,
id: string,
options?: { connectionId?: string; skipCache?: boolean; storeType?: StoreType },
): Promise<T | null>;
/** Partially updates a document; `upsert` creates it when missing. */
export function update<T extends Record<string, any>>(
collection: string,
id: string,
data: Partial<Omit<T, 'createdAt' | 'updatedAt'>>,
options?: { connectionId?: string; upsert?: boolean; storeType?: StoreType },
): Promise<UpdateResult>;
/** Deletes a document; resolves whether a deletion happened — confirm exact semantics in the implementation. */
export function remove(
collection: string,
id: string,
options?: { connectionId?: string; storeType?: StoreType },
): Promise<boolean>;
/** Lists documents in a collection with pagination/ordering. */
export function list<T extends Record<string, any>>(
collection: string,
options?: ListOptions,
): Promise<PaginatedResult<T>>;
/** Queries a collection with a client-side predicate over each document. */
export function query<T extends Record<string, any>>(
collection: string,
filter: (doc: T) => boolean,
options?: QueryOptions,
): Promise<PaginatedResult<T>>;
// Schema operations
/** Registers a validation schema for a collection (applied on writes). */
export function defineSchema(collection: string, schema: CollectionSchema): void;
// Transaction operations
/** Starts a new batched-write transaction on the given connection. */
export function createTransaction(connectionId?: string): Transaction;
/** Executes a transaction's queued operations; equivalent surface to Transaction.commit(). */
export function commitTransaction(
transaction: Transaction,
): Promise<{ success: boolean; results: any[] }>;
// Index operations
/** Creates an index on a collection field; resolves whether it succeeded. */
export function createIndex(
collection: string,
field: string,
options?: { connectionId?: string; storeType?: StoreType },
): Promise<boolean>;
// Subscription API
/** Subscribes to document lifecycle events; returns an unsubscribe function. */
export function subscribe(
event: 'document:created' | 'document:updated' | 'document:deleted',
callback: (data: any) => void,
): () => void;
// File operations
/** Uploads raw bytes to IPFS; resolves with the content identifier. */
export function uploadFile(
fileData: Buffer,
options?: { filename?: string; connectionId?: string; metadata?: Record<string, any> },
): Promise<FileUploadResult>;
/** Downloads a file by CID together with its stored metadata. */
export function getFile(cid: string, options?: { connectionId?: string }): Promise<FileResult>;
/** Removes a file by CID; resolves whether a deletion happened. */
export function deleteFile(cid: string, options?: { connectionId?: string }): Promise<boolean>;
// Connection management
/** Closes a named connection; resolves whether one was closed. */
export function closeConnection(connectionId: string): Promise<boolean>;
// Metrics
/** Returns the current metrics snapshot. */
export function getMetrics(): Metrics;
/** Resets all metric counters. */
export function resetMetrics(): void;
// Stop
/** Shuts down the database layer. */
export function stopDB(): Promise<void>;
// Logger
export interface LoggerOptions {
@ -101,21 +261,67 @@ declare module "@debros/network" {
service?: string;
}
export const logger: any;
export function createServiceLogger(
name: string,
options?: LoggerOptions
): any;
export function createServiceLogger(name: string, options?: LoggerOptions): any;
export function createDebrosLogger(options?: LoggerOptions): any;
// Crypto
export function getPrivateKey(): Promise<string>;
// Load Balancer
/** Express-style handlers exposing node/peer information over HTTP. */
export interface LoadBalancerControllerModule {
getNodeInfo: (req: Request, res: Response, next: NextFunction) => void;
getOptimalPeer: (req: Request, res: Response, next: NextFunction) => void;
getAllPeers: (req: Request, res: Response, next: NextFunction) => void;
}
export const loadBalancerController: LoadBalancerControllerModule;
// NOTE(review): this module also declares `export function getConnectedPeers(): any[]`
// and `export function logPeersStatus(): void` earlier — duplicate declarations;
// confirm which pair survives in the final file.
/** Peer id → last-seen/load/address info for currently connected peers. */
export const getConnectedPeers: () => Map<
string,
{
lastSeen: number;
load: number;
publicAddress: string;
fingerprint: string;
}
>;
/** Logs the current peer table — destination/format defined by the implementation. */
export const logPeersStatus: () => void;
// Default export
const defaultExport: {
config: DebrosConfig;
validateConfig: typeof validateConfig;
ipfsService: typeof ipfsService;
orbitDBService: typeof orbitDBService;
db: {
init: typeof initDB;
create: typeof create;
get: typeof get;
update: typeof update;
remove: typeof remove;
list: typeof list;
query: typeof query;
createIndex: typeof createIndex;
createTransaction: typeof createTransaction;
commitTransaction: typeof commitTransaction;
subscribe: typeof subscribe;
uploadFile: typeof uploadFile;
getFile: typeof getFile;
deleteFile: typeof deleteFile;
defineSchema: typeof defineSchema;
getMetrics: typeof getMetrics;
resetMetrics: typeof resetMetrics;
closeConnection: typeof closeConnection;
stop: typeof stopDB;
ErrorCode: typeof ErrorCode;
StoreType: typeof StoreType;
};
loadBalancerController: LoadBalancerControllerModule;
getConnectedPeers: () => Map<
string,
{
lastSeen: number;
load: number;
publicAddress: string;
fingerprint: string;
}
>;
logPeersStatus: () => void;
logger: any;
createServiceLogger: typeof createServiceLogger;
};