refactor: enhance connection management and error handling, sanitize data, and improve file upload logging #2

Merged
anonpenguin merged 1 commits from alpha-0.0.24 into main 2025-06-18 12:34:16 +00:00
11 changed files with 187 additions and 33 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@ network.txt
node_modules/
dist/
system.txt
.DS_Store

View File

@ -1,6 +1,6 @@
{
"name": "@debros/network",
"version": "0.0.23-alpha",
"version": "0.0.24-alpha",
"description": "Debros network core functionality for IPFS, libp2p and OrbitDB",
"type": "module",
"main": "dist/index.js",

View File

@ -29,6 +29,16 @@ export const init = async (connectionId?: string): Promise<string> => {
}
const connId = connectionId || `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
// Check if connection already exists
if (connections.has(connId)) {
const existingConnection = connections.get(connId)!;
if (existingConnection.isActive) {
logger.info(`Using existing active connection: ${connId}`);
return connId;
}
}
logger.info(`Initializing DB service with connection ID: ${connId}`);
let attempts = 0;
@ -87,6 +97,14 @@ export const init = async (connectionId?: string): Promise<string> => {
logger.info(
`Retrying initialization in ${delay}ms (attempt ${attempts + 1}/${MAX_RETRY_ATTEMPTS})...`,
);
// Clean up any partial initialization before retrying
try {
await stopIpfs();
} catch (cleanupError) {
logger.warn('Error during cleanup before retry:', cleanupError);
}
await new Promise((resolve) => setTimeout(resolve, delay));
}
}

View File

@ -85,6 +85,34 @@ export async function openStore(
}
}
/**
* Recursively sanitize an object by removing undefined values
* This is necessary because IPLD doesn't support undefined values
*/
function deepSanitizeUndefined(obj: any): any {
if (obj === null || obj === undefined) {
return null;
}
if (Array.isArray(obj)) {
return obj.map(deepSanitizeUndefined).filter((item) => item !== undefined);
}
if (typeof obj === 'object' && obj.constructor === Object) {
const sanitized: any = {};
for (const [key, value] of Object.entries(obj)) {
const sanitizedValue = deepSanitizeUndefined(value);
// Only include the property if it's not undefined
if (sanitizedValue !== undefined) {
sanitized[key] = sanitizedValue;
}
}
return sanitized;
}
return obj;
}
/**
* Helper function to prepare a document for storage
*/
@ -95,10 +123,8 @@ export function prepareDocument<T extends Record<string, any>>(
): T {
const timestamp = Date.now();
// Sanitize the input data by replacing undefined with null
const sanitizedData = Object.fromEntries(
Object.entries(data).map(([key, value]) => [key, value === undefined ? null : value]),
) as Omit<T, 'createdAt' | 'updatedAt'>;
// Deep sanitize the input data by removing undefined values
const sanitizedData = deepSanitizeUndefined(data) as Omit<T, 'createdAt' | 'updatedAt'>;
// Prepare document for validation
let docToValidate: T;
@ -119,9 +145,12 @@ export function prepareDocument<T extends Record<string, any>>(
} as unknown as T;
}
// Deep sanitize the final document to ensure no undefined values remain
const finalDocument = deepSanitizeUndefined(docToValidate) as T;
// Validate the document BEFORE processing
validateDocument(collection, docToValidate);
validateDocument(collection, finalDocument);
// Return the validated document
return docToValidate;
return finalDocument;
}

View File

@ -1,6 +1,7 @@
import { StoreType, StoreOptions, PaginatedResult, QueryOptions, ListOptions } from '../types';
import { AbstractStore } from './abstractStore';
import { prepareDocument } from './baseStore';
import { DBError, ErrorCode } from '../core/error';
/**
* DocStore implementation
@ -169,8 +170,6 @@ export class DocStore extends AbstractStore {
* Helper to handle errors consistently
*/
private handleError(message: string, error: any): never {
const { DBError, ErrorCode } = require('../core/error');
if (error instanceof DBError) {
throw error;
}

View File

@ -110,7 +110,12 @@ export class FeedStore implements BaseStore {
try {
const db = await openStore(collection, StoreType.FEED, options);
const entries = await db.iterator({ limit: -1 }).collect();
// Get all entries using proper iterator API
const entries = [];
for await (const entry of db.iterator({ limit: -1 })) {
entries.push(entry);
}
const existingEntryIndex = entries.findIndex((e: any) => {
const value = e.payload.value;
return value && value.id === id;
@ -169,8 +174,12 @@ export class FeedStore implements BaseStore {
try {
const db = await openStore(collection, StoreType.FEED, options);
// Find the entry with the given id
const entries = await db.iterator({ limit: -1 }).collect();
// Find the entry with the given id using proper iterator API
const entries = [];
for await (const entry of db.iterator({ limit: -1 })) {
entries.push(entry);
}
const existingEntryIndex = entries.findIndex((e: any) => {
const value = e.payload.value;
return value && value.id === id;
@ -227,14 +236,51 @@ export class FeedStore implements BaseStore {
try {
const db = await openStore(collection, StoreType.FEED, options);
// Get all entries
const entries = await db.iterator({ limit: -1 }).collect();
// Use proper pagination instead of loading everything
const requestedLimit = options?.limit || 50;
const requestedOffset = options?.offset || 0;
// For feeds, we need to get more entries than requested since we'll filter duplicates
// Use a reasonable multiplier but cap it to prevent memory issues
const fetchLimit = requestedLimit === -1 ? -1 : Math.min(requestedLimit * 3, 1000);
// Get entries using proper iterator API with pagination
const entries = [];
let count = 0;
let skipped = 0;
for await (const entry of db.iterator({ limit: fetchLimit })) {
// Skip entries for offset
if (requestedOffset > 0 && skipped < requestedOffset) {
skipped++;
continue;
}
entries.push(entry);
count++;
// Break if we have enough entries and not requesting all
if (requestedLimit !== -1 && count >= fetchLimit) {
break;
}
}
// Group by ID and keep only the latest entry for each ID
// Also filter out tombstone entries
const latestEntries = new Map<string, any>();
for (const entry of entries) {
const value = entry.payload.value;
// Handle different possible entry structures
let value;
if (entry && entry.payload && entry.payload.value) {
value = entry.payload.value;
} else if (entry && entry.value) {
value = entry.value;
} else if (entry && typeof entry === 'object') {
value = entry;
} else {
continue;
}
if (!value || value.deleted) continue;
const id = value.id;
@ -243,7 +289,9 @@ export class FeedStore implements BaseStore {
// If we already have an entry with this ID, check which is newer
if (latestEntries.has(id)) {
const existing = latestEntries.get(id);
if (value.updatedAt > existing.value.updatedAt) {
const existingTime = existing.value.updatedAt || existing.value.timestamp || 0;
const currentTime = value.updatedAt || value.timestamp || 0;
if (currentTime > existingTime) {
latestEntries.set(id, { hash: entry.hash, value });
}
} else {
@ -281,13 +329,11 @@ export class FeedStore implements BaseStore {
});
}
// Apply pagination
// Apply final pagination to the processed results
const total = documents.length;
const offset = options?.offset || 0;
const limit = options?.limit || total;
const paginatedDocuments = documents.slice(offset, offset + limit);
const hasMore = offset + limit < total;
const finalLimit = requestedLimit === -1 ? total : requestedLimit;
const paginatedDocuments = documents.slice(0, finalLimit);
const hasMore = documents.length > finalLimit;
return {
documents: paginatedDocuments,
@ -320,14 +366,28 @@ export class FeedStore implements BaseStore {
try {
const db = await openStore(collection, StoreType.FEED, options);
// Get all entries
const entries = await db.iterator({ limit: -1 }).collect();
// Get all entries using proper iterator API
const entries = [];
for await (const entry of db.iterator({ limit: -1 })) {
entries.push(entry);
}
// Group by ID and keep only the latest entry for each ID
// Also filter out tombstone entries
const latestEntries = new Map<string, any>();
for (const entry of entries) {
const value = entry.payload.value;
// Handle different possible entry structures
let value;
if (entry && entry.payload && entry.payload.value) {
value = entry.payload.value;
} else if (entry && entry.value) {
value = entry.value;
} else if (entry && typeof entry === 'object') {
value = entry;
} else {
continue;
}
if (!value || value.deleted) continue;
const id = value.id;

View File

@ -2,7 +2,7 @@ import { createServiceLogger } from '../../utils/logger';
import { ErrorCode, StoreType, FileUploadResult, FileResult } from '../types';
import { DBError } from '../core/error';
import { openStore } from './baseStore';
import { getHelia } from '../../ipfs/ipfsService';
import ipfsService, { getHelia } from '../../ipfs/ipfsService';
import { CreateResult, StoreOptions } from '../types';
async function readAsyncIterableToBuffer(
@ -31,9 +31,22 @@ export const uploadFile = async (
try {
const ipfs = getHelia();
if (!ipfs) {
logger.error('IPFS instance not available - Helia is null or undefined');
// Try to check if IPFS service is running
try {
const heliaInstance = ipfsService.getHelia();
logger.error(
'IPFS Service getHelia() returned:',
heliaInstance ? 'instance available' : 'null/undefined',
);
} catch (importError) {
logger.error('Error importing IPFS service:', importError);
}
throw new DBError(ErrorCode.OPERATION_FAILED, 'IPFS instance not available');
}
logger.info(`Attempting to upload file with size: ${fileData.length} bytes`);
// Add to IPFS
const unixfs = await import('@helia/unixfs');
const fs = unixfs.unixfs(ipfs);

View File

@ -26,9 +26,9 @@ export const isLocked = (resourceId: string): boolean => {
// Database Types
export enum StoreType {
KEYVALUE = 'keyvalue',
DOCSTORE = 'docstore',
DOCSTORE = 'documents',
FEED = 'feed',
EVENTLOG = 'eventlog',
EVENTLOG = 'events',
COUNTER = 'counter',
}

View File

@ -22,6 +22,7 @@ import {
closeConnection,
stop as stopDB,
} from './db/dbService';
import { ErrorCode, StoreType } from './db/types';
// Import types

View File

@ -6,7 +6,10 @@ export const getIpfsPort = (): number => {
return parseInt(process.env.IPFS_PORT);
}
const httpPort = parseInt(process.env.PORT || '7777');
return httpPort + 1; // Default to HTTP port + 1
// Add some randomness to avoid port conflicts during retries
const basePort = httpPort + 1;
const randomOffset = Math.floor(Math.random() * 10);
return basePort + randomOffset; // Add random offset to avoid conflicts
};
// Get a node-specific blockstore path

View File

@ -26,6 +26,18 @@ let reconnectInterval: NodeJS.Timeout;
export const initIpfsNode = async (externalProxyAgent: any = null) => {
try {
// If already initialized, return existing instance
if (helia && libp2pNode) {
logger.info('IPFS node already initialized, returning existing instance');
return helia;
}
// Clean up any existing instances first
if (helia || libp2pNode) {
logger.info('Cleaning up existing IPFS instances before reinitializing');
await stopIpfsNode();
}
proxyAgent = externalProxyAgent;
const blockstorePath = ipfsConfig.blockstorePath;
@ -168,20 +180,38 @@ function setupPeerEventListeners(node: Libp2p) {
}
export const stopIpfsNode = async () => {
logger.info('Stopping IPFS node...');
if (reconnectInterval) {
clearInterval(reconnectInterval);
reconnectInterval = undefined as any;
}
if (libp2pNode) {
try {
const pubsub = libp2pNode.services.pubsub as PubSub;
await stopDiscoveryService(pubsub);
// Stop libp2p
await libp2pNode.stop();
} catch (error) {
logger.error('Error stopping libp2p node:', error);
}
libp2pNode = undefined as any;
} else {
await stopDiscoveryService(null);
}
if (helia) {
try {
await helia.stop();
} catch (error) {
logger.error('Error stopping Helia:', error);
}
helia = null;
}
logger.info('IPFS node stopped successfully');
};
export const getHeliaInstance = () => {