Merge pull request 'refactor: enhance connection management and error handling, sanitize data, and improve file upload logging' (#2) from alpha-0.0.24 into main
All checks were successful
Publish Alpha Package to npm / publish (push) Successful in 52s
Reviewed-on: DeBros/network#2
This commit is contained in commit dcc21d5867.

.gitignore (vendored): 1 line changed

@@ -2,3 +2,4 @@ network.txt
node_modules/
dist/
system.txt
.DS_Store

@@ -1,6 +1,6 @@
{
  "name": "@debros/network",
  "version": "0.0.23-alpha",
  "version": "0.0.24-alpha",
  "description": "Debros network core functionality for IPFS, libp2p and OrbitDB",
  "type": "module",
  "main": "dist/index.js",
@@ -29,6 +29,16 @@ export const init = async (connectionId?: string): Promise<string> => {
  }

  const connId = connectionId || `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;

  // Check if connection already exists
  if (connections.has(connId)) {
    const existingConnection = connections.get(connId)!;
    if (existingConnection.isActive) {
      logger.info(`Using existing active connection: ${connId}`);
      return connId;
    }
  }

  logger.info(`Initializing DB service with connection ID: ${connId}`);

  let attempts = 0;
@@ -87,6 +97,14 @@ export const init = async (connectionId?: string): Promise<string> => {
      logger.info(
        `Retrying initialization in ${delay}ms (attempt ${attempts + 1}/${MAX_RETRY_ATTEMPTS})...`,
      );

      // Clean up any partial initialization before retrying
      try {
        await stopIpfs();
      } catch (cleanupError) {
        logger.warn('Error during cleanup before retry:', cleanupError);
      }

      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
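To make the connection-reuse behaviour above concrete, here is a minimal usage sketch (illustrative only; the connection ID is a made-up example and the surrounding setup is assumed):

```ts
// Sketch: init() now short-circuits when an active connection with the same ID exists.
const connId = await init('conn_worker_1'); // performs full initialization
const again = await init('conn_worker_1');  // logs "Using existing active connection" and returns early
console.assert(connId === again);
```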
@@ -85,6 +85,34 @@ export async function openStore(
  }
}

/**
 * Recursively sanitize an object by removing undefined values
 * This is necessary because IPLD doesn't support undefined values
 */
function deepSanitizeUndefined(obj: any): any {
  if (obj === null || obj === undefined) {
    return null;
  }

  if (Array.isArray(obj)) {
    return obj.map(deepSanitizeUndefined).filter((item) => item !== undefined);
  }

  if (typeof obj === 'object' && obj.constructor === Object) {
    const sanitized: any = {};
    for (const [key, value] of Object.entries(obj)) {
      const sanitizedValue = deepSanitizeUndefined(value);
      // Only include the property if it's not undefined
      if (sanitizedValue !== undefined) {
        sanitized[key] = sanitizedValue;
      }
    }
    return sanitized;
  }

  return obj;
}

/**
 * Helper function to prepare a document for storage
 */
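A quick illustration of what `deepSanitizeUndefined` above does (not part of the diff): nested `undefined` values come back as `null`, which IPLD can encode, and everything else is preserved.

```ts
// Illustration only, based on the implementation shown above.
const input = { title: 'hello', tags: ['a', undefined], meta: { draft: undefined, views: 0 } };
const output = deepSanitizeUndefined(input);
// output: { title: 'hello', tags: ['a', null], meta: { draft: null, views: 0 } }
```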
@@ -95,10 +123,8 @@ export function prepareDocument<T extends Record<string, any>>(
): T {
  const timestamp = Date.now();

  // Sanitize the input data by replacing undefined with null
  const sanitizedData = Object.fromEntries(
    Object.entries(data).map(([key, value]) => [key, value === undefined ? null : value]),
  ) as Omit<T, 'createdAt' | 'updatedAt'>;
  // Deep sanitize the input data by removing undefined values
  const sanitizedData = deepSanitizeUndefined(data) as Omit<T, 'createdAt' | 'updatedAt'>;

  // Prepare document for validation
  let docToValidate: T;
@@ -119,9 +145,12 @@ export function prepareDocument<T extends Record<string, any>>(
    } as unknown as T;
  }

  // Deep sanitize the final document to ensure no undefined values remain
  const finalDocument = deepSanitizeUndefined(docToValidate) as T;

  // Validate the document BEFORE processing
  validateDocument(collection, docToValidate);
  validateDocument(collection, finalDocument);

  // Return the validated document
  return docToValidate;
  return finalDocument;
}
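The net effect of the prepareDocument changes is that validation now runs on exactly what will be stored. A hedged sketch of the resulting flow (parameter and helper names are taken from the diff; the document-assembly step is summarized in a comment):

```ts
// Sketch of the new ordering inside prepareDocument.
const sanitizedData = deepSanitizeUndefined(data) as Omit<T, 'createdAt' | 'updatedAt'>;
// ...docToValidate is built from sanitizedData plus createdAt/updatedAt timestamps, as before...
const finalDocument = deepSanitizeUndefined(docToValidate) as T; // re-sanitize the assembled doc
validateDocument(collection, finalDocument); // validation now sees exactly what will be stored
return finalDocument;
```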
@@ -1,6 +1,7 @@
import { StoreType, StoreOptions, PaginatedResult, QueryOptions, ListOptions } from '../types';
import { AbstractStore } from './abstractStore';
import { prepareDocument } from './baseStore';
import { DBError, ErrorCode } from '../core/error';

/**
 * DocStore implementation
@@ -169,8 +170,6 @@ export class DocStore extends AbstractStore {
   * Helper to handle errors consistently
   */
  private handleError(message: string, error: any): never {
    const { DBError, ErrorCode } = require('../core/error');

    if (error instanceof DBError) {
      throw error;
    }
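Since the package is ESM (`"type": "module"` in package.json above), the removed `require()` call would fail at runtime, and the handler can rely on the top-level import instead. A minimal sketch of the corrected helper (class wrapper omitted; the final rethrow's constructor arity is assumed from the two-argument usage elsewhere in this PR):

```ts
// Sketch of the error helper using the top-level ESM imports of DBError and ErrorCode.
function handleError(message: string, error: any): never {
  if (error instanceof DBError) {
    throw error;
  }
  // Constructor arity assumed from `new DBError(ErrorCode.OPERATION_FAILED, '...')` in this PR.
  throw new DBError(ErrorCode.OPERATION_FAILED, message);
}
```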
@@ -110,7 +110,12 @@ export class FeedStore implements BaseStore {
    try {
      const db = await openStore(collection, StoreType.FEED, options);

      const entries = await db.iterator({ limit: -1 }).collect();
      // Get all entries using proper iterator API
      const entries = [];
      for await (const entry of db.iterator({ limit: -1 })) {
        entries.push(entry);
      }

      const existingEntryIndex = entries.findIndex((e: any) => {
        const value = e.payload.value;
        return value && value.id === id;
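The same `.collect()`-to-`for await` change recurs throughout this file, so a small shared helper could capture the pattern. A sketch (the helper name is ours, not part of the diff):

```ts
// Hypothetical helper equivalent to the inline loops added in this PR.
async function collectEntries(db: any, limit = -1): Promise<any[]> {
  const entries: any[] = [];
  for await (const entry of db.iterator({ limit })) {
    entries.push(entry);
  }
  return entries;
}
```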
@@ -169,8 +174,12 @@ export class FeedStore implements BaseStore {
    try {
      const db = await openStore(collection, StoreType.FEED, options);

      // Find the entry with the given id
      const entries = await db.iterator({ limit: -1 }).collect();
      // Find the entry with the given id using proper iterator API
      const entries = [];
      for await (const entry of db.iterator({ limit: -1 })) {
        entries.push(entry);
      }

      const existingEntryIndex = entries.findIndex((e: any) => {
        const value = e.payload.value;
        return value && value.id === id;
@@ -227,14 +236,51 @@ export class FeedStore implements BaseStore {
    try {
      const db = await openStore(collection, StoreType.FEED, options);

      // Get all entries
      const entries = await db.iterator({ limit: -1 }).collect();
      // Use proper pagination instead of loading everything
      const requestedLimit = options?.limit || 50;
      const requestedOffset = options?.offset || 0;

      // For feeds, we need to get more entries than requested since we'll filter duplicates
      // Use a reasonable multiplier but cap it to prevent memory issues
      const fetchLimit = requestedLimit === -1 ? -1 : Math.min(requestedLimit * 3, 1000);

      // Get entries using proper iterator API with pagination
      const entries = [];
      let count = 0;
      let skipped = 0;

      for await (const entry of db.iterator({ limit: fetchLimit })) {
        // Skip entries for offset
        if (requestedOffset > 0 && skipped < requestedOffset) {
          skipped++;
          continue;
        }

        entries.push(entry);
        count++;

        // Break if we have enough entries and not requesting all
        if (requestedLimit !== -1 && count >= fetchLimit) {
          break;
        }
      }

      // Group by ID and keep only the latest entry for each ID
      // Also filter out tombstone entries
      const latestEntries = new Map<string, any>();
      for (const entry of entries) {
        const value = entry.payload.value;
        // Handle different possible entry structures
        let value;
        if (entry && entry.payload && entry.payload.value) {
          value = entry.payload.value;
        } else if (entry && entry.value) {
          value = entry.value;
        } else if (entry && typeof entry === 'object') {
          value = entry;
        } else {
          continue;
        }

        if (!value || value.deleted) continue;

        const id = value.id;
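A worked example of the over-fetch heuristic above (values are illustrative, not from the diff):

```ts
// With limit: 20 and offset: 40, the loop skips the first 40 entries, reads at most
// fetchLimit = Math.min(20 * 3, 1000) = 60 further entries, and the later steps dedupe
// by id, drop tombstones, and slice the surviving documents down to the requested 20.
const requestedLimit = 20;
const fetchLimit = requestedLimit === -1 ? -1 : Math.min(requestedLimit * 3, 1000); // 60
```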
@@ -243,7 +289,9 @@ export class FeedStore implements BaseStore {
        // If we already have an entry with this ID, check which is newer
        if (latestEntries.has(id)) {
          const existing = latestEntries.get(id);
          if (value.updatedAt > existing.value.updatedAt) {
          const existingTime = existing.value.updatedAt || existing.value.timestamp || 0;
          const currentTime = value.updatedAt || value.timestamp || 0;
          if (currentTime > existingTime) {
            latestEntries.set(id, { hash: entry.hash, value });
          }
        } else {
@@ -281,13 +329,11 @@ export class FeedStore implements BaseStore {
        });
      }

      // Apply pagination
      // Apply final pagination to the processed results
      const total = documents.length;
      const offset = options?.offset || 0;
      const limit = options?.limit || total;

      const paginatedDocuments = documents.slice(offset, offset + limit);
      const hasMore = offset + limit < total;
      const finalLimit = requestedLimit === -1 ? total : requestedLimit;
      const paginatedDocuments = documents.slice(0, finalLimit);
      const hasMore = documents.length > finalLimit;

      return {
        documents: paginatedDocuments,
@@ -320,14 +366,28 @@ export class FeedStore implements BaseStore {
    try {
      const db = await openStore(collection, StoreType.FEED, options);

      // Get all entries
      const entries = await db.iterator({ limit: -1 }).collect();
      // Get all entries using proper iterator API
      const entries = [];
      for await (const entry of db.iterator({ limit: -1 })) {
        entries.push(entry);
      }

      // Group by ID and keep only the latest entry for each ID
      // Also filter out tombstone entries
      const latestEntries = new Map<string, any>();
      for (const entry of entries) {
        const value = entry.payload.value;
        // Handle different possible entry structures
        let value;
        if (entry && entry.payload && entry.payload.value) {
          value = entry.payload.value;
        } else if (entry && entry.value) {
          value = entry.value;
        } else if (entry && typeof entry === 'object') {
          value = entry;
        } else {
          continue;
        }

        if (!value || value.deleted) continue;

        const id = value.id;
@@ -2,7 +2,7 @@ import { createServiceLogger } from '../../utils/logger';
import { ErrorCode, StoreType, FileUploadResult, FileResult } from '../types';
import { DBError } from '../core/error';
import { openStore } from './baseStore';
import { getHelia } from '../../ipfs/ipfsService';
import ipfsService, { getHelia } from '../../ipfs/ipfsService';
import { CreateResult, StoreOptions } from '../types';

async function readAsyncIterableToBuffer(
@@ -31,9 +31,22 @@ export const uploadFile = async (
  try {
    const ipfs = getHelia();
    if (!ipfs) {
      logger.error('IPFS instance not available - Helia is null or undefined');
      // Try to check if IPFS service is running
      try {
        const heliaInstance = ipfsService.getHelia();
        logger.error(
          'IPFS Service getHelia() returned:',
          heliaInstance ? 'instance available' : 'null/undefined',
        );
      } catch (importError) {
        logger.error('Error importing IPFS service:', importError);
      }
      throw new DBError(ErrorCode.OPERATION_FAILED, 'IPFS instance not available');
    }

    logger.info(`Attempting to upload file with size: ${fileData.length} bytes`);

    // Add to IPFS
    const unixfs = await import('@helia/unixfs');
    const fs = unixfs.unixfs(ipfs);
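For context, after the availability check the upload proceeds via `@helia/unixfs`. A minimal sketch of the typical add path under those assumptions (the exact call made later in this function is not shown in the diff):

```ts
// Sketch only: adding bytes to IPFS with the unixfs wrapper created above.
const cid = await fs.addBytes(fileData); // fileData: the Buffer/Uint8Array passed to uploadFile
logger.info(`File added to IPFS with CID: ${cid.toString()}`);
```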
@@ -26,9 +26,9 @@ export const isLocked = (resourceId: string): boolean => {
// Database Types
export enum StoreType {
  KEYVALUE = 'keyvalue',
  DOCSTORE = 'docstore',
  DOCSTORE = 'documents',
  FEED = 'feed',
  EVENTLOG = 'eventlog',
  EVENTLOG = 'events',
  COUNTER = 'counter',
}
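The renamed values matter because the enum's string is what selects the database type when a store is opened, as in the `openStore(collection, StoreType.FEED, options)` calls earlier in this diff. A brief illustration (collection names are placeholders):

```ts
// 'documents' and 'events' are the type strings the store layer now passes through.
const docs = await openStore('posts', StoreType.DOCSTORE, options);    // opens a 'documents' store
const log = await openStore('activity', StoreType.EVENTLOG, options);  // opens an 'events' store
```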
@@ -22,6 +22,7 @@ import {
  closeConnection,
  stop as stopDB,
} from './db/dbService';

import { ErrorCode, StoreType } from './db/types';

// Import types
@@ -6,7 +6,10 @@ export const getIpfsPort = (): number => {
    return parseInt(process.env.IPFS_PORT);
  }
  const httpPort = parseInt(process.env.PORT || '7777');
  return httpPort + 1; // Default to HTTP port + 1
  // Add some randomness to avoid port conflicts during retries
  const basePort = httpPort + 1;
  const randomOffset = Math.floor(Math.random() * 10);
  return basePort + randomOffset; // Add random offset to avoid conflicts
};

// Get a node-specific blockstore path
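A quick worked example of the fallback above (illustrative values):

```ts
// With PORT=7777 and IPFS_PORT unset, getIpfsPort() returns a value in [7778, 7787]:
// basePort = 7777 + 1 = 7778, randomOffset = Math.floor(Math.random() * 10) ∈ [0, 9],
// so retries are unlikely to collide on the same port.
const port = getIpfsPort();
```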
@@ -26,6 +26,18 @@ let reconnectInterval: NodeJS.Timeout;

export const initIpfsNode = async (externalProxyAgent: any = null) => {
  try {
    // If already initialized, return existing instance
    if (helia && libp2pNode) {
      logger.info('IPFS node already initialized, returning existing instance');
      return helia;
    }

    // Clean up any existing instances first
    if (helia || libp2pNode) {
      logger.info('Cleaning up existing IPFS instances before reinitializing');
      await stopIpfsNode();
    }

    proxyAgent = externalProxyAgent;

    const blockstorePath = ipfsConfig.blockstorePath;
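A short usage sketch of the idempotent initialization above (illustrative; assumes nothing tears the node down in between):

```ts
// Sketch: repeated calls no longer spin up a second node.
const node1 = await initIpfsNode();
const node2 = await initIpfsNode(); // logs "IPFS node already initialized..." and returns the same Helia instance
console.assert(node1 === node2);
```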
@@ -168,20 +180,38 @@ function setupPeerEventListeners(node: Libp2p) {
}

export const stopIpfsNode = async () => {
  logger.info('Stopping IPFS node...');

  if (reconnectInterval) {
    clearInterval(reconnectInterval);
    reconnectInterval = undefined as any;
  }

  if (libp2pNode) {
    try {
      const pubsub = libp2pNode.services.pubsub as PubSub;
      await stopDiscoveryService(pubsub);

      // Stop libp2p
      await libp2pNode.stop();
    } catch (error) {
      logger.error('Error stopping libp2p node:', error);
    }
    libp2pNode = undefined as any;
  } else {
    await stopDiscoveryService(null);
  }

  if (helia) {
    try {
      await helia.stop();
    } catch (error) {
      logger.error('Error stopping Helia:', error);
    }
    helia = null;
  }

  logger.info('IPFS node stopped successfully');
};

export const getHeliaInstance = () => {