diff --git a/.gitignore b/.gitignore
index ede8588..b62507c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 network.txt
 node_modules/
 dist/
-system.txt
\ No newline at end of file
+system.txt
+.DS_Store
\ No newline at end of file
diff --git a/package.json b/package.json
index 05c502b..83e4943 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@debros/network",
-  "version": "0.0.23-alpha",
+  "version": "0.0.24-alpha",
   "description": "Debros network core functionality for IPFS, libp2p and OrbitDB",
   "type": "module",
   "main": "dist/index.js",
diff --git a/src/db/core/connection.ts b/src/db/core/connection.ts
index c803fd6..d202b9a 100644
--- a/src/db/core/connection.ts
+++ b/src/db/core/connection.ts
@@ -29,6 +29,16 @@ export const init = async (connectionId?: string): Promise<string> => {
   }
 
   const connId = connectionId || `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
+
+  // Check if connection already exists
+  if (connections.has(connId)) {
+    const existingConnection = connections.get(connId)!;
+    if (existingConnection.isActive) {
+      logger.info(`Using existing active connection: ${connId}`);
+      return connId;
+    }
+  }
+
   logger.info(`Initializing DB service with connection ID: ${connId}`);
 
   let attempts = 0;
@@ -87,6 +97,14 @@ export const init = async (connectionId?: string): Promise<string> => {
       logger.info(
         `Retrying initialization in ${delay}ms (attempt ${attempts + 1}/${MAX_RETRY_ATTEMPTS})...`,
       );
+
+      // Clean up any partial initialization before retrying
+      try {
+        await stopIpfs();
+      } catch (cleanupError) {
+        logger.warn('Error during cleanup before retry:', cleanupError);
+      }
+
       await new Promise((resolve) => setTimeout(resolve, delay));
     }
   }
diff --git a/src/db/stores/baseStore.ts b/src/db/stores/baseStore.ts
index ba6cb49..74bea1b 100644
--- a/src/db/stores/baseStore.ts
+++ b/src/db/stores/baseStore.ts
@@ -85,6 +85,36 @@
   }
 }
 
+/**
+ * Recursively sanitize an object by removing undefined values
+ * This is necessary because IPLD doesn't support undefined values
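+ * Example: { a: 1, b: undefined } becomes { a: 1, b: null } and [undefined, 2]
+ * becomes [null, 2] - nested undefined values are normalized to null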
+ */
+function deepSanitizeUndefined(obj: any): any {
+  if (obj === null || obj === undefined) {
+    return null;
+  }
+
+  if (Array.isArray(obj)) {
+    return obj.map(deepSanitizeUndefined).filter((item) => item !== undefined);
+  }
+
+  if (typeof obj === 'object' && obj.constructor === Object) {
+    const sanitized: any = {};
+    for (const [key, value] of Object.entries(obj)) {
+      const sanitizedValue = deepSanitizeUndefined(value);
+      // Only include the property if it's not undefined
+      if (sanitizedValue !== undefined) {
+        sanitized[key] = sanitizedValue;
+      }
+    }
+    return sanitized;
+  }
+
+  return obj;
+}
+
 /**
  * Helper function to prepare a document for storage
  */
@@ -95,10 +125,8 @@ export function prepareDocument<T extends Record<string, any>>(
 ): T {
   const timestamp = Date.now();
 
-  // Sanitize the input data by replacing undefined with null
-  const sanitizedData = Object.fromEntries(
-    Object.entries(data).map(([key, value]) => [key, value === undefined ? null : value]),
-  ) as Omit<T, 'createdAt' | 'updatedAt'>;
+  // Deep sanitize the input data by removing undefined values
+  const sanitizedData = deepSanitizeUndefined(data) as Omit<T, 'createdAt' | 'updatedAt'>;
 
   // Prepare document for validation
   let docToValidate: T;
@@ -119,9 +147,12 @@
     } as unknown as T;
   }
 
+  // Deep sanitize the final document to ensure no undefined values remain
+  const finalDocument = deepSanitizeUndefined(docToValidate) as T;
+
   // Validate the document BEFORE processing
-  validateDocument(collection, docToValidate);
+  validateDocument(collection, finalDocument);
 
   // Return the validated document
-  return docToValidate;
+  return finalDocument;
 }
diff --git a/src/db/stores/docStore.ts b/src/db/stores/docStore.ts
index f6b6074..146d1fe 100644
--- a/src/db/stores/docStore.ts
+++ b/src/db/stores/docStore.ts
@@ -1,6 +1,7 @@
 import { StoreType, StoreOptions, PaginatedResult, QueryOptions, ListOptions } from '../types';
 import { AbstractStore } from './abstractStore';
 import { prepareDocument } from './baseStore';
+import { DBError, ErrorCode } from '../core/error';
 
 /**
  * DocStore implementation
@@ -169,8 +170,6 @@
    * Helper to handle errors consistently
    */
   private handleError(message: string, error: any): never {
-    const { DBError, ErrorCode } = require('../core/error');
-
     if (error instanceof DBError) {
       throw error;
     }
diff --git a/src/db/stores/feedStore.ts b/src/db/stores/feedStore.ts
index bdf67f4..03929ba 100644
--- a/src/db/stores/feedStore.ts
+++ b/src/db/stores/feedStore.ts
@@ -110,7 +110,12 @@
     try {
       const db = await openStore(collection, StoreType.FEED, options);
 
-      const entries = await db.iterator({ limit: -1 }).collect();
+      // Get all entries using proper iterator API
+      const entries = [];
+      for await (const entry of db.iterator({ limit: -1 })) {
+        entries.push(entry);
+      }
+
       const existingEntryIndex = entries.findIndex((e: any) => {
         const value = e.payload.value;
         return value && value.id === id;
@@ -169,8 +174,12 @@
     try {
       const db = await openStore(collection, StoreType.FEED, options);
 
-      // Find the entry with the given id
-      const entries = await db.iterator({ limit: -1 }).collect();
+      // Find the entry with the given id using proper iterator API
+      const entries = [];
+      for await (const entry of db.iterator({ limit: -1 })) {
+        entries.push(entry);
+      }
+
       const existingEntryIndex = entries.findIndex((e: any) => {
         const value = e.payload.value;
         return value && value.id === id;
@@ -227,14 +236,52 @@
     try {
       const db = await openStore(collection, StoreType.FEED, options);
 
-      // Get all entries
-      const entries = await db.iterator({ limit: -1 }).collect();
+      // Use proper pagination instead of loading everything
+      const requestedLimit = options?.limit || 50;
+      const requestedOffset = options?.offset || 0;
+
+      // For feeds, we need to get more entries than requested since we'll filter duplicates
+      // Use a reasonable multiplier but cap it to prevent memory issues
+      const fetchLimit = requestedLimit === -1 ? -1 : Math.min(requestedLimit * 3, 1000);
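+      // e.g. a limit of 50 fetches at most 150 entries; a limit of 400 is capped at 1000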
+
+      // Get entries using proper iterator API with pagination
+      const entries = [];
+      let count = 0;
+      let skipped = 0;
+
+      for await (const entry of db.iterator({ limit: fetchLimit })) {
+        // Skip entries for offset
+        if (requestedOffset > 0 && skipped < requestedOffset) {
+          skipped++;
+          continue;
+        }
+
+        entries.push(entry);
+        count++;
+
+        // Break if we have enough entries and not requesting all
+        if (requestedLimit !== -1 && count >= fetchLimit) {
+          break;
+        }
+      }
 
       // Group by ID and keep only the latest entry for each ID
       // Also filter out tombstone entries
       const latestEntries = new Map();
       for (const entry of entries) {
-        const value = entry.payload.value;
+        // Handle different possible entry structures
+        let value;
+        if (entry && entry.payload && entry.payload.value) {
+          value = entry.payload.value;
+        } else if (entry && entry.value) {
+          value = entry.value;
+        } else if (entry && typeof entry === 'object') {
+          value = entry;
+        } else {
+          continue;
+        }
+
         if (!value || value.deleted) continue;
 
         const id = value.id;
@@ -243,7 +290,9 @@
         // If we already have an entry with this ID, check which is newer
         if (latestEntries.has(id)) {
           const existing = latestEntries.get(id);
-          if (value.updatedAt > existing.value.updatedAt) {
+          const existingTime = existing.value.updatedAt || existing.value.timestamp || 0;
+          const currentTime = value.updatedAt || value.timestamp || 0;
+          if (currentTime > existingTime) {
             latestEntries.set(id, { hash: entry.hash, value });
           }
         } else {
@@ -281,13 +330,12 @@
         });
       }
 
-      // Apply pagination
+      // Apply final pagination to the processed results
      const total = documents.length;
-      const offset = options?.offset || 0;
-      const limit = options?.limit || total;
-
-      const paginatedDocuments = documents.slice(offset, offset + limit);
-      const hasMore = offset + limit < total;
+      const finalLimit = requestedLimit === -1 ? total : requestedLimit;
+      const paginatedDocuments = documents.slice(0, finalLimit);
+      const hasMore = documents.length > finalLimit;
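+      // Note: the requested offset was already applied while iterating, so the slice starts at 0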
 
       return {
         documents: paginatedDocuments,
@@ -320,14 +368,28 @@
     try {
       const db = await openStore(collection, StoreType.FEED, options);
 
-      // Get all entries
-      const entries = await db.iterator({ limit: -1 }).collect();
+      // Get all entries using proper iterator API
+      const entries = [];
+      for await (const entry of db.iterator({ limit: -1 })) {
+        entries.push(entry);
+      }
 
       // Group by ID and keep only the latest entry for each ID
       // Also filter out tombstone entries
       const latestEntries = new Map();
       for (const entry of entries) {
-        const value = entry.payload.value;
+        // Handle different possible entry structures
+        let value;
+        if (entry && entry.payload && entry.payload.value) {
+          value = entry.payload.value;
+        } else if (entry && entry.value) {
+          value = entry.value;
+        } else if (entry && typeof entry === 'object') {
+          value = entry;
+        } else {
+          continue;
+        }
+
         if (!value || value.deleted) continue;
 
         const id = value.id;
diff --git a/src/db/stores/fileStore.ts b/src/db/stores/fileStore.ts
index 35f8dde..3819277 100644
--- a/src/db/stores/fileStore.ts
+++ b/src/db/stores/fileStore.ts
@@ -2,7 +2,7 @@ import { createServiceLogger } from '../../utils/logger';
 import { ErrorCode, StoreType, FileUploadResult, FileResult } from '../types';
 import { DBError } from '../core/error';
 import { openStore } from './baseStore';
-import { getHelia } from '../../ipfs/ipfsService';
+import ipfsService, { getHelia } from '../../ipfs/ipfsService';
 import { CreateResult, StoreOptions } from '../types';
 
 async function readAsyncIterableToBuffer(
@@ -31,9 +31,22 @@
   try {
     const ipfs = getHelia();
     if (!ipfs) {
+      logger.error('IPFS instance not available - Helia is null or undefined');
+      // Try to check if IPFS service is running
+      try {
+        const heliaInstance = ipfsService.getHelia();
+        logger.error(
+          'IPFS Service getHelia() returned:',
+          heliaInstance ? 'instance available' : 'null/undefined',
+        );
+      } catch (importError) {
+        logger.error('Error importing IPFS service:', importError);
+      }
       throw new DBError(ErrorCode.OPERATION_FAILED, 'IPFS instance not available');
     }
 
+    logger.info(`Attempting to upload file with size: ${fileData.length} bytes`);
+
     // Add to IPFS
     const unixfs = await import('@helia/unixfs');
     const fs = unixfs.unixfs(ipfs);
diff --git a/src/db/types/index.ts b/src/db/types/index.ts
index 65bbc28..9339ff4 100644
--- a/src/db/types/index.ts
+++ b/src/db/types/index.ts
@@ -26,9 +26,9 @@ export const isLocked = (resourceId: string): boolean => {
 
 // Database Types
 export enum StoreType {
   KEYVALUE = 'keyvalue',
-  DOCSTORE = 'docstore',
+  DOCSTORE = 'documents',
   FEED = 'feed',
-  EVENTLOG = 'eventlog',
+  EVENTLOG = 'events',
   COUNTER = 'counter',
 }
diff --git a/src/index.ts b/src/index.ts
index bff243f..f61e395 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -22,6 +22,7 @@ import {
   closeConnection,
   stop as stopDB,
 } from './db/dbService';
+
 import { ErrorCode, StoreType } from './db/types';
 
 // Import types
diff --git a/src/ipfs/config/ipfsConfig.ts b/src/ipfs/config/ipfsConfig.ts
index 270e0de..7e91a7e 100644
--- a/src/ipfs/config/ipfsConfig.ts
+++ b/src/ipfs/config/ipfsConfig.ts
@@ -6,7 +6,11 @@ export const getIpfsPort = (): number => {
     return parseInt(process.env.IPFS_PORT);
   }
   const httpPort = parseInt(process.env.PORT || '7777');
-  return httpPort + 1; // Default to HTTP port + 1
+  // Add some randomness to avoid port conflicts during retries
+  const basePort = httpPort + 1;
+  const randomOffset = Math.floor(Math.random() * 10);
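+  // e.g. with PORT=7777 the base is 7778 and the returned port falls in 7778-7787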
+  return basePort + randomOffset; // Add random offset to avoid conflicts
 };
 
 // Get a node-specific blockstore path
diff --git a/src/ipfs/services/ipfsCoreService.ts b/src/ipfs/services/ipfsCoreService.ts
index 546b996..e046222 100644
--- a/src/ipfs/services/ipfsCoreService.ts
+++ b/src/ipfs/services/ipfsCoreService.ts
@@ -26,6 +26,18 @@ let reconnectInterval: NodeJS.Timeout;
 
 export const initIpfsNode = async (externalProxyAgent: any = null) => {
   try {
+    // If already initialized, return existing instance
+    if (helia && libp2pNode) {
+      logger.info('IPFS node already initialized, returning existing instance');
+      return helia;
+    }
+
+    // Clean up any existing instances first
+    if (helia || libp2pNode) {
+      logger.info('Cleaning up existing IPFS instances before reinitializing');
+      await stopIpfsNode();
+    }
+
     proxyAgent = externalProxyAgent;
 
     const blockstorePath = ipfsConfig.blockstorePath;
@@ -168,20 +180,38 @@ function setupPeerEventListeners(node: Libp2p) {
 }
 
 export const stopIpfsNode = async () => {
+  logger.info('Stopping IPFS node...');
+
   if (reconnectInterval) {
     clearInterval(reconnectInterval);
+    reconnectInterval = undefined as any;
   }
 
   if (libp2pNode) {
-    const pubsub = libp2pNode.services.pubsub as PubSub;
-    await stopDiscoveryService(pubsub);
+    try {
+      const pubsub = libp2pNode.services.pubsub as PubSub;
+      await stopDiscoveryService(pubsub);
+
+      // Stop libp2p
+      await libp2pNode.stop();
+    } catch (error) {
+      logger.error('Error stopping libp2p node:', error);
+    }
+    libp2pNode = undefined as any;
   } else {
     await stopDiscoveryService(null);
   }
 
   if (helia) {
-    await helia.stop();
+    try {
+      await helia.stop();
+    } catch (error) {
+      logger.error('Error stopping Helia:', error);
+    }
+    helia = null;
   }
+
+  logger.info('IPFS node stopped successfully');
 };
 
 export const getHeliaInstance = () => {