From a542295226e9bfd1438d9d938c62b07d66a9aed6 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Thu, 26 Feb 2026 16:48:12 +0200 Subject: [PATCH 01/27] timeout --- src/components/P2P/handleProtocolCommands.ts | 25 +++++++++++++------- src/components/P2P/index.ts | 2 +- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 91b5abefb..2801b8fd8 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -42,13 +42,20 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect const sendErrorAndClose = async (httpStatus: number, error: string) => { try { - // Check if stream is already closed - if (stream.status === 'closed' || stream.status === 'closing') { + if ( + stream.status === 'closed' || + stream.status === 'closing' || + stream.status === 'aborted' + ) { P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - - // Resume stream in case it's paused - we need to write + if ( + stream.writeStatus !== 'writable' && + stream.writeStatus !== 'closing' + ) { + return + } stream.resume() const status = { httpStatus, error } stream.send(uint8ArrayFromString(JSON.stringify(status))) @@ -56,7 +63,9 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect } catch (e) { P2P_LOGGER.error(`Error sending error response: ${e.message}`) try { - stream.abort(e as Error) + if (stream.status === 'open' || stream.status === 'closing') { + stream.abort(e as Error) + } } catch {} } } @@ -141,11 +150,9 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - // Handle backpressure - if send returns false, wait for drain + // Handle backpressure - if send returns false, wait for drain (no timeout for large streams) if (!stream.send(bytes)) { - await stream.onDrain({ - signal: AbortSignal.timeout(30000) // 30 second timeout for drain - }) + await stream.onDrain() } } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 1336a74ca..735f608dc 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -756,7 +756,7 @@ export class OceanP2P extends EventEmitter { let stream: Stream try { const options = { - signal: AbortSignal.timeout(10000), + signal: AbortSignal.timeout(120000), priority: 100, runOnLimitedConnection: true } From 4b2e2f4cca4b477bb37b874db81fd76079adb4eb Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Thu, 26 Feb 2026 17:26:10 +0200 Subject: [PATCH 02/27] restart only when paused --- src/components/P2P/handleProtocolCommands.ts | 24 +++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 2801b8fd8..52b3a1b72 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -45,23 +45,31 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect if ( stream.status === 'closed' || stream.status === 'closing' || - stream.status === 'aborted' + stream.status === 'aborted' || + stream.status === 'reset' ) { - P2P_LOGGER.warn('Stream already closed, cannot send error response') + P2P_LOGGER.warn('Stream already closed/reset, cannot send error response') return } - if ( - stream.writeStatus !== 'writable' && - stream.writeStatus !== 'closing' - ) { + if (stream.writeStatus !== 'writable' && stream.writeStatus !== 'closing') { return } - stream.resume() + // Only resume if stream is paused and still readable; resume() throws if stream is closing/closed + if (stream.readStatus === 'paused') { + try { + stream.resume() + } catch (e) { + P2P_LOGGER.warn( + 'Cannot resume stream (already closing/closed): ' + (e as Error).message + ) + return + } + } const status = { httpStatus, error } stream.send(uint8ArrayFromString(JSON.stringify(status))) await stream.close() } catch (e) { - P2P_LOGGER.error(`Error sending error response: ${e.message}`) + P2P_LOGGER.error(`Error sending error response: ${(e as Error).message}`) try { if (stream.status === 'open' || stream.status === 'closing') { stream.abort(e as Error) From 0579ca681a4c9a9205e8984d2c37cf2b9d717e12 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Thu, 26 Feb 2026 17:59:31 +0200 Subject: [PATCH 03/27] Revert "restart only when paused" This reverts commit 4b2e2f4cca4b477bb37b874db81fd76079adb4eb. --- src/components/P2P/handleProtocolCommands.ts | 39 ++++++-------------- src/components/P2P/index.ts | 2 +- 2 files changed, 13 insertions(+), 28 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 52b3a1b72..91b5abefb 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -42,38 +42,21 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect const sendErrorAndClose = async (httpStatus: number, error: string) => { try { - if ( - stream.status === 'closed' || - stream.status === 'closing' || - stream.status === 'aborted' || - stream.status === 'reset' - ) { - P2P_LOGGER.warn('Stream already closed/reset, cannot send error response') + // Check if stream is already closed + if (stream.status === 'closed' || stream.status === 'closing') { + P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - if (stream.writeStatus !== 'writable' && stream.writeStatus !== 'closing') { - return - } - // Only resume if stream is paused and still readable; resume() throws if stream is closing/closed - if (stream.readStatus === 'paused') { - try { - stream.resume() - } catch (e) { - P2P_LOGGER.warn( - 'Cannot resume stream (already closing/closed): ' + (e as Error).message - ) - return - } - } + + // Resume stream in case it's paused - we need to write + stream.resume() const status = { httpStatus, error } stream.send(uint8ArrayFromString(JSON.stringify(status))) await stream.close() } catch (e) { - P2P_LOGGER.error(`Error sending error response: ${(e as Error).message}`) + P2P_LOGGER.error(`Error sending error response: ${e.message}`) try { - if (stream.status === 'open' || stream.status === 'closing') { - stream.abort(e as Error) - } + stream.abort(e as Error) } catch {} } } @@ -158,9 +141,11 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - // Handle backpressure - if send returns false, wait for drain (no timeout for large streams) + // Handle backpressure - if send returns false, wait for drain if (!stream.send(bytes)) { - await stream.onDrain() + await stream.onDrain({ + signal: AbortSignal.timeout(30000) // 30 second timeout for drain + }) } } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 735f608dc..1336a74ca 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -756,7 +756,7 @@ export class OceanP2P extends EventEmitter { let stream: Stream try { const options = { - signal: AbortSignal.timeout(120000), + signal: AbortSignal.timeout(10000), priority: 100, runOnLimitedConnection: true } From 89ad5e561a128c6d7bd2ae8727e1ac54a3b47778 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Thu, 26 Feb 2026 18:00:39 +0200 Subject: [PATCH 04/27] increase timeouts and frames --- src/components/P2P/handleProtocolCommands.ts | 2 +- src/components/P2P/index.ts | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 91b5abefb..ac1dc2fe8 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -144,7 +144,7 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect // Handle backpressure - if send returns false, wait for drain if (!stream.send(bytes)) { await stream.onDrain({ - signal: AbortSignal.timeout(30000) // 30 second timeout for drain + signal: AbortSignal.timeout(5 * 60 * 1000) // 5 minutes timeout for drain }) } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 1336a74ca..4ffd2ef8a 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -425,7 +425,14 @@ export class OceanP2P extends EventEmitter { datastore: store, privateKey: this.keyManager.getLibp2pPrivateKey(), transports, - streamMuxers: [yamux()], + streamMuxers: [ + yamux({ + maxMessageSize: 5 * 1024 * 1024, + streamOptions: { + maxStreamWindowSize: 5 * 1024 * 1024 + } + }) + ], connectionEncrypters: [ noise(), tls() From 47857a0413481a3f0c1c0f47c7a91e0eb451f848 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Thu, 26 Feb 2026 18:29:50 +0200 Subject: [PATCH 05/27] do not extra pause stream --- src/components/P2P/handleProtocolCommands.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index ac1dc2fe8..e9abd2ca6 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -34,9 +34,6 @@ export class ReadableString extends Readable { export async function handleProtocolCommands(stream: Stream, connection: Connection) { const { remotePeer, remoteAddr } = connection - // Pause the stream. We do async operations here before writing. - stream.pause() - P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) @@ -48,8 +45,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume stream in case it's paused - we need to write - stream.resume() const status = { httpStatus, error } stream.send(uint8ArrayFromString(JSON.stringify(status))) await stream.close() @@ -90,9 +85,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume the stream. We can now write. - stream.resume() - // v3 streams are AsyncIterable let task: Command try { From 6a7b9fbdac70f4749c8ea6b95452a9edd54c7861 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 09:09:47 +0200 Subject: [PATCH 06/27] safe err --- src/components/P2P/handleProtocolCommands.ts | 46 ++++++++++++++++---- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index e9abd2ca6..5b6ba63c5 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -14,6 +14,24 @@ import { } from '../../utils/validators.js' import type { Connection, Stream } from '@libp2p/interface' +/** Safe string for logging/sending; never yields "undefined". */ +function safeErrorMessage(err: unknown): string { + if (err == null) return 'Unknown error' + if (typeof (err as Error).message === 'string' && (err as Error).message !== '') { + return (err as Error).message + } + return String(err) +} + +/** True if the error indicates the stream is already closed/reset (no point sending). */ +function isStreamGoneError(err: unknown): boolean { + const msg = safeErrorMessage(err).toLowerCase() + return ( + msg.includes('stream') && + (msg.includes('reset') || msg.includes('closed') || msg.includes('aborted')) + ) +} + export class ReadableString extends Readable { private sent = false @@ -39,9 +57,9 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect const sendErrorAndClose = async (httpStatus: number, error: string) => { try { - // Check if stream is already closed - if (stream.status === 'closed' || stream.status === 'closing') { - P2P_LOGGER.warn('Stream already closed, cannot send error response') + // Skip if stream is already closed, closing, aborted, or reset + if (['closed', 'closing', 'aborted', 'reset'].includes(stream.status)) { + P2P_LOGGER.warn('Stream already closed or reset, cannot send error response') return } @@ -49,7 +67,13 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect stream.send(uint8ArrayFromString(JSON.stringify(status))) await stream.close() } catch (e) { - P2P_LOGGER.error(`Error sending error response: ${e.message}`) + const msg = safeErrorMessage(e) + // Expected when peer closed/reset the stream; avoid noisy error log + if (msg.toLowerCase().includes('closed') || msg.toLowerCase().includes('reset')) { + P2P_LOGGER.warn(`Could not send error response (stream gone): ${msg}`) + } else { + P2P_LOGGER.error(`Error sending error response: ${msg}`) + } try { stream.abort(e as Error) } catch {} @@ -98,11 +122,14 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect } } } catch (err) { + const msg = safeErrorMessage(err) P2P_LOGGER.log( LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${err.message}` + `Unable to process P2P command: ${msg}` ) - await sendErrorAndClose(400, 'Invalid command') + if (!isStreamGoneError(err)) { + await sendErrorAndClose(400, 'Invalid command') + } return } @@ -144,12 +171,15 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { + const msg = safeErrorMessage(err) P2P_LOGGER.logMessageWithEmoji( - 'handleProtocolCommands Error: ' + err.message, + 'handleProtocolCommands Error: ' + msg, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - await sendErrorAndClose(500, err.message) + if (!isStreamGoneError(err)) { + await sendErrorAndClose(500, msg) + } } } From d6ab8be8246fef636180e6102eb35c2c3adf9d6a Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 09:21:00 +0200 Subject: [PATCH 07/27] read data first --- src/components/P2P/handleProtocolCommands.ts | 57 +++++++++++--------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 5b6ba63c5..82fad167e 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -55,6 +55,39 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) + // Read command from stream immediately so we don't leave data unread (avoids + // read-buffer overflow reset) and so the client sees progress before any slow work. + let task: Command | null | undefined + try { + for await (const chunk of stream) { + try { + const str = uint8ArrayToString(chunk.subarray()) + task = JSON.parse(str) as Command + } catch (e) { + task = null + break + } + break + } + } catch (err) { + const msg = safeErrorMessage(err) + P2P_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Unable to process P2P command: ${msg}` + ) + if (!isStreamGoneError(err)) { + // sendErrorAndClose not yet defined; stream may be gone anyway + try { + if (!['closed', 'closing', 'aborted', 'reset'].includes(stream.status)) { + const status = { httpStatus: 400, error: msg } + stream.send(uint8ArrayFromString(JSON.stringify(status))) + await stream.close() + } + } catch {} + } + return + } + const sendErrorAndClose = async (httpStatus: number, error: string) => { try { // Skip if stream is already closed, closing, aborted, or reset @@ -109,30 +142,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // v3 streams are AsyncIterable - let task: Command - try { - for await (const chunk of stream) { - try { - const str = uint8ArrayToString(chunk.subarray()) - task = JSON.parse(str) as Command - } catch (e) { - await sendErrorAndClose(400, 'Invalid command') - return - } - } - } catch (err) { - const msg = safeErrorMessage(err) - P2P_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${msg}` - ) - if (!isStreamGoneError(err)) { - await sendErrorAndClose(400, 'Invalid command') - } - return - } - if (!task) { P2P_LOGGER.error('Invalid or missing task/command data!') await sendErrorAndClose(400, 'Invalid command') From 2bc3d77302dc0359751fad810d31a4c332c42dd2 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 09:40:40 +0200 Subject: [PATCH 08/27] log err --- src/components/P2P/handleProtocolCommands.ts | 28 +++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 82fad167e..15f9a0cf6 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -14,16 +14,27 @@ import { } from '../../utils/validators.js' import type { Connection, Stream } from '@libp2p/interface' -/** Safe string for logging/sending; never yields "undefined". */ +function unwrapError(err: unknown): unknown { + if ( + err != null && + typeof err === 'object' && + 'error' in err && + (err as { error: unknown }).error instanceof Error + ) { + return (err as { error: Error }).error + } + return err +} + function safeErrorMessage(err: unknown): string { - if (err == null) return 'Unknown error' - if (typeof (err as Error).message === 'string' && (err as Error).message !== '') { - return (err as Error).message + const e = unwrapError(err) + if (e == null) return 'Unknown error' + if (e instanceof Error && typeof e.message === 'string' && e.message !== '') { + return e.message } - return String(err) + return String(e) } -/** True if the error indicates the stream is already closed/reset (no point sending). */ function isStreamGoneError(err: unknown): boolean { const msg = safeErrorMessage(err).toLowerCase() return ( @@ -71,10 +82,7 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect } } catch (err) { const msg = safeErrorMessage(err) - P2P_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${msg}` - ) + P2P_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Unable to process P2P command: ${msg}`) if (!isStreamGoneError(err)) { // sendErrorAndClose not yet defined; stream may be gone anyway try { From 6316567f40797bfffff31b601a8a0e081ff08c3f Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 09:54:15 +0200 Subject: [PATCH 09/27] respect backpressure --- src/components/P2P/handleProtocolCommands.ts | 18 +++++++++++------- src/components/P2P/index.ts | 1 + 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 15f9a0cf6..5bff90fc6 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -172,16 +172,20 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect // Send status first stream.send(uint8ArrayFromString(JSON.stringify(response.status))) - // Stream data chunks without buffering, with backpressure support + const SEND_CHUNK_SIZE = 64 * 1024 if (response.stream) { for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - - // Handle backpressure - if send returns false, wait for drain - if (!stream.send(bytes)) { - await stream.onDrain({ - signal: AbortSignal.timeout(5 * 60 * 1000) // 5 minutes timeout for drain - }) + for (let offset = 0; offset < bytes.length; offset += SEND_CHUNK_SIZE) { + const slice = bytes.subarray( + offset, + Math.min(offset + SEND_CHUNK_SIZE, bytes.length) + ) + if (!stream.send(slice)) { + await stream.onDrain({ + signal: AbortSignal.timeout(5 * 60 * 1000) // 5 minutes timeout for drain + }) + } } } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 4ffd2ef8a..5f43f9f18 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -429,6 +429,7 @@ export class OceanP2P extends EventEmitter { yamux({ maxMessageSize: 5 * 1024 * 1024, streamOptions: { + initialStreamWindowSize: 5 * 1024 * 1024, maxStreamWindowSize: 5 * 1024 * 1024 } }) From 95bcd8c68d1dbe4a3c4deb1c335317e1d8e29714 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 11:46:08 +0200 Subject: [PATCH 10/27] flush on drain --- src/components/P2P/handleProtocolCommands.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 5bff90fc6..ae3702fc4 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -188,6 +188,8 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect } } } + // Ensure last chunk is flushed before closing (avoid remote close before client reads tail) + await stream.onDrain({ signal: AbortSignal.timeout(30000) }) } await stream.close() From 77742b7881f9876e2bbf98aca94e532f71bd2e5d Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 15:10:11 +0200 Subject: [PATCH 11/27] Revert "respect backpressure" This reverts commit 6316567f40797bfffff31b601a8a0e081ff08c3f. --- src/components/P2P/handleProtocolCommands.ts | 127 ++++++------------- src/components/P2P/index.ts | 10 +- 2 files changed, 43 insertions(+), 94 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index ae3702fc4..6fe9ba5de 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -14,35 +14,6 @@ import { } from '../../utils/validators.js' import type { Connection, Stream } from '@libp2p/interface' -function unwrapError(err: unknown): unknown { - if ( - err != null && - typeof err === 'object' && - 'error' in err && - (err as { error: unknown }).error instanceof Error - ) { - return (err as { error: Error }).error - } - return err -} - -function safeErrorMessage(err: unknown): string { - const e = unwrapError(err) - if (e == null) return 'Unknown error' - if (e instanceof Error && typeof e.message === 'string' && e.message !== '') { - return e.message - } - return String(e) -} - -function isStreamGoneError(err: unknown): boolean { - const msg = safeErrorMessage(err).toLowerCase() - return ( - msg.includes('stream') && - (msg.includes('reset') || msg.includes('closed') || msg.includes('aborted')) - ) -} - export class ReadableString extends Readable { private sent = false @@ -63,58 +34,27 @@ export class ReadableString extends Readable { export async function handleProtocolCommands(stream: Stream, connection: Connection) { const { remotePeer, remoteAddr } = connection + // Pause the stream. We do async operations here before writing. + stream.pause() + P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) - // Read command from stream immediately so we don't leave data unread (avoids - // read-buffer overflow reset) and so the client sees progress before any slow work. - let task: Command | null | undefined - try { - for await (const chunk of stream) { - try { - const str = uint8ArrayToString(chunk.subarray()) - task = JSON.parse(str) as Command - } catch (e) { - task = null - break - } - break - } - } catch (err) { - const msg = safeErrorMessage(err) - P2P_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Unable to process P2P command: ${msg}`) - if (!isStreamGoneError(err)) { - // sendErrorAndClose not yet defined; stream may be gone anyway - try { - if (!['closed', 'closing', 'aborted', 'reset'].includes(stream.status)) { - const status = { httpStatus: 400, error: msg } - stream.send(uint8ArrayFromString(JSON.stringify(status))) - await stream.close() - } - } catch {} - } - return - } - const sendErrorAndClose = async (httpStatus: number, error: string) => { try { - // Skip if stream is already closed, closing, aborted, or reset - if (['closed', 'closing', 'aborted', 'reset'].includes(stream.status)) { - P2P_LOGGER.warn('Stream already closed or reset, cannot send error response') + // Check if stream is already closed + if (stream.status === 'closed' || stream.status === 'closing') { + P2P_LOGGER.warn('Stream already closed, cannot send error response') return } + // Resume stream in case it's paused - we need to write + stream.resume() const status = { httpStatus, error } stream.send(uint8ArrayFromString(JSON.stringify(status))) await stream.close() } catch (e) { - const msg = safeErrorMessage(e) - // Expected when peer closed/reset the stream; avoid noisy error log - if (msg.toLowerCase().includes('closed') || msg.toLowerCase().includes('reset')) { - P2P_LOGGER.warn(`Could not send error response (stream gone): ${msg}`) - } else { - P2P_LOGGER.error(`Error sending error response: ${msg}`) - } + P2P_LOGGER.error(`Error sending error response: ${e.message}`) try { stream.abort(e as Error) } catch {} @@ -150,6 +90,30 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } + // Resume the stream. We can now write. + stream.resume() + + // v3 streams are AsyncIterable + let task: Command + try { + for await (const chunk of stream) { + try { + const str = uint8ArrayToString(chunk.subarray()) + task = JSON.parse(str) as Command + } catch (e) { + await sendErrorAndClose(400, 'Invalid command') + return + } + } + } catch (err) { + P2P_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Unable to process P2P command: ${err.message}` + ) + await sendErrorAndClose(400, 'Invalid command') + return + } + if (!task) { P2P_LOGGER.error('Invalid or missing task/command data!') await sendErrorAndClose(400, 'Invalid command') @@ -172,20 +136,16 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect // Send status first stream.send(uint8ArrayFromString(JSON.stringify(response.status))) - const SEND_CHUNK_SIZE = 64 * 1024 + // Stream data chunks without buffering, with backpressure support if (response.stream) { for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - for (let offset = 0; offset < bytes.length; offset += SEND_CHUNK_SIZE) { - const slice = bytes.subarray( - offset, - Math.min(offset + SEND_CHUNK_SIZE, bytes.length) - ) - if (!stream.send(slice)) { - await stream.onDrain({ - signal: AbortSignal.timeout(5 * 60 * 1000) // 5 minutes timeout for drain - }) - } + + // Handle backpressure - if send returns false, wait for drain + if (!stream.send(bytes)) { + await stream.onDrain({ + signal: AbortSignal.timeout(30000) // 30 second timeout for drain + }) } } // Ensure last chunk is flushed before closing (avoid remote close before client reads tail) @@ -194,15 +154,12 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { - const msg = safeErrorMessage(err) P2P_LOGGER.logMessageWithEmoji( - 'handleProtocolCommands Error: ' + msg, + 'handleProtocolCommands Error: ' + err.message, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - if (!isStreamGoneError(err)) { - await sendErrorAndClose(500, msg) - } + await sendErrorAndClose(500, err.message) } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 5f43f9f18..1336a74ca 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -425,15 +425,7 @@ export class OceanP2P extends EventEmitter { datastore: store, privateKey: this.keyManager.getLibp2pPrivateKey(), transports, - streamMuxers: [ - yamux({ - maxMessageSize: 5 * 1024 * 1024, - streamOptions: { - initialStreamWindowSize: 5 * 1024 * 1024, - maxStreamWindowSize: 5 * 1024 * 1024 - } - }) - ], + streamMuxers: [yamux()], connectionEncrypters: [ noise(), tls() From 3559264605ebcfc343325e094de9e0477e005ae0 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 15:22:02 +0200 Subject: [PATCH 12/27] try bytestream --- package-lock.json | 1 + package.json | 1 + src/components/P2P/handleProtocolCommands.ts | 38 ++++++++++---------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/package-lock.json b/package-lock.json index 717aa591e..a1c26e461 100644 --- a/package-lock.json +++ b/package-lock.json @@ -31,6 +31,7 @@ "@libp2p/tcp": "^11.0.9", "@libp2p/tls": "^3.0.10", "@libp2p/upnp-nat": "^4.0.9", + "@libp2p/utils": "^7.0.9", "@libp2p/websockets": "^10.1.2", "@multiformats/multiaddr": "^12.2.3", "@oceanprotocol/contracts": "^2.6.0", diff --git a/package.json b/package.json index 8dd057858..963a46fc1 100644 --- a/package.json +++ b/package.json @@ -69,6 +69,7 @@ "@libp2p/tcp": "^11.0.9", "@libp2p/tls": "^3.0.10", "@libp2p/upnp-nat": "^4.0.9", + "@libp2p/utils": "^7.0.9", "@libp2p/websockets": "^10.1.2", "@multiformats/multiaddr": "^12.2.3", "@oceanprotocol/contracts": "^2.6.0", diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 6fe9ba5de..d4cc35142 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -1,6 +1,7 @@ import { Readable } from 'stream' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { byteStream } from '@libp2p/utils' import { P2P_LOGGER } from '../../utils/logging/common.js' import { Command } from '../../@types/commands.js' @@ -14,6 +15,9 @@ import { } from '../../utils/validators.js' import type { Connection, Stream } from '@libp2p/interface' +const P2P_READ_TIMEOUT_MS = 30_000 +const P2P_DRAIN_TIMEOUT_MS = 30_000 + export class ReadableString extends Readable { private sent = false @@ -90,25 +94,21 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume the stream. We can now write. + // Resume the stream. We can now read/write. stream.resume() - // v3 streams are AsyncIterable + const bytes = byteStream(stream) let task: Command try { - for await (const chunk of stream) { - try { - const str = uint8ArrayToString(chunk.subarray()) - task = JSON.parse(str) as Command - } catch (e) { - await sendErrorAndClose(400, 'Invalid command') - return - } - } + const data = await bytes.read({ + signal: AbortSignal.timeout(P2P_READ_TIMEOUT_MS) + }) + const str = uint8ArrayToString(data.subarray()) + task = JSON.parse(str) as Command } catch (err) { P2P_LOGGER.log( LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${err.message}` + `Unable to process P2P command: ${err?.message ?? err}` ) await sendErrorAndClose(400, 'Invalid command') return @@ -133,23 +133,25 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect task.caller = remotePeer.toString() const response: P2PCommandResponse = await handler.handle(task) - // Send status first - stream.send(uint8ArrayFromString(JSON.stringify(response.status))) + // Send status first (byteStream imperative write) + await bytes.write(uint8ArrayFromString(JSON.stringify(response.status)), { + signal: AbortSignal.timeout(P2P_READ_TIMEOUT_MS) + }) // Stream data chunks without buffering, with backpressure support if (response.stream) { for await (const chunk of response.stream as Readable) { - const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) + const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) // Handle backpressure - if send returns false, wait for drain - if (!stream.send(bytes)) { + if (!stream.send(buf)) { await stream.onDrain({ - signal: AbortSignal.timeout(30000) // 30 second timeout for drain + signal: AbortSignal.timeout(P2P_DRAIN_TIMEOUT_MS) }) } } // Ensure last chunk is flushed before closing (avoid remote close before client reads tail) - await stream.onDrain({ signal: AbortSignal.timeout(30000) }) + await stream.onDrain({ signal: AbortSignal.timeout(P2P_DRAIN_TIMEOUT_MS) }) } await stream.close() From c4c1a3ba0f5288580ff316a166b9c15454bd7849 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 16:00:24 +0200 Subject: [PATCH 13/27] Revert "try bytestream" This reverts commit 3559264605ebcfc343325e094de9e0477e005ae0. --- package-lock.json | 1 - package.json | 1 - src/components/P2P/handleProtocolCommands.ts | 38 ++++++++++---------- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/package-lock.json b/package-lock.json index a1c26e461..717aa591e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -31,7 +31,6 @@ "@libp2p/tcp": "^11.0.9", "@libp2p/tls": "^3.0.10", "@libp2p/upnp-nat": "^4.0.9", - "@libp2p/utils": "^7.0.9", "@libp2p/websockets": "^10.1.2", "@multiformats/multiaddr": "^12.2.3", "@oceanprotocol/contracts": "^2.6.0", diff --git a/package.json b/package.json index 963a46fc1..8dd057858 100644 --- a/package.json +++ b/package.json @@ -69,7 +69,6 @@ "@libp2p/tcp": "^11.0.9", "@libp2p/tls": "^3.0.10", "@libp2p/upnp-nat": "^4.0.9", - "@libp2p/utils": "^7.0.9", "@libp2p/websockets": "^10.1.2", "@multiformats/multiaddr": "^12.2.3", "@oceanprotocol/contracts": "^2.6.0", diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index d4cc35142..6fe9ba5de 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -1,7 +1,6 @@ import { Readable } from 'stream' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { byteStream } from '@libp2p/utils' import { P2P_LOGGER } from '../../utils/logging/common.js' import { Command } from '../../@types/commands.js' @@ -15,9 +14,6 @@ import { } from '../../utils/validators.js' import type { Connection, Stream } from '@libp2p/interface' -const P2P_READ_TIMEOUT_MS = 30_000 -const P2P_DRAIN_TIMEOUT_MS = 30_000 - export class ReadableString extends Readable { private sent = false @@ -94,21 +90,25 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume the stream. We can now read/write. + // Resume the stream. We can now write. stream.resume() - const bytes = byteStream(stream) + // v3 streams are AsyncIterable let task: Command try { - const data = await bytes.read({ - signal: AbortSignal.timeout(P2P_READ_TIMEOUT_MS) - }) - const str = uint8ArrayToString(data.subarray()) - task = JSON.parse(str) as Command + for await (const chunk of stream) { + try { + const str = uint8ArrayToString(chunk.subarray()) + task = JSON.parse(str) as Command + } catch (e) { + await sendErrorAndClose(400, 'Invalid command') + return + } + } } catch (err) { P2P_LOGGER.log( LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${err?.message ?? err}` + `Unable to process P2P command: ${err.message}` ) await sendErrorAndClose(400, 'Invalid command') return @@ -133,25 +133,23 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect task.caller = remotePeer.toString() const response: P2PCommandResponse = await handler.handle(task) - // Send status first (byteStream imperative write) - await bytes.write(uint8ArrayFromString(JSON.stringify(response.status)), { - signal: AbortSignal.timeout(P2P_READ_TIMEOUT_MS) - }) + // Send status first + stream.send(uint8ArrayFromString(JSON.stringify(response.status))) // Stream data chunks without buffering, with backpressure support if (response.stream) { for await (const chunk of response.stream as Readable) { - const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) + const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) // Handle backpressure - if send returns false, wait for drain - if (!stream.send(buf)) { + if (!stream.send(bytes)) { await stream.onDrain({ - signal: AbortSignal.timeout(P2P_DRAIN_TIMEOUT_MS) + signal: AbortSignal.timeout(30000) // 30 second timeout for drain }) } } // Ensure last chunk is flushed before closing (avoid remote close before client reads tail) - await stream.onDrain({ signal: AbortSignal.timeout(P2P_DRAIN_TIMEOUT_MS) }) + await stream.onDrain({ signal: AbortSignal.timeout(30000) }) } await stream.close() From 295efd84d8a37f5acf7ebc2a60bbf76f9e52cbcf Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 16:51:33 +0200 Subject: [PATCH 14/27] use lps --- src/components/P2P/handleProtocolCommands.ts | 50 ++++++++------------ src/components/P2P/index.ts | 39 ++++++++++----- 2 files changed, 45 insertions(+), 44 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 6fe9ba5de..c29e25ae3 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -12,6 +12,7 @@ import { checkGlobalConnectionsRateLimit, checkRequestsRateLimit } from '../../utils/validators.js' +import { lpStream } from '@libp2p/utils' import type { Connection, Stream } from '@libp2p/interface' export class ReadableString extends Readable { @@ -40,18 +41,21 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) + // Resume and use length-prefixed messages (libp2p v3 byteStream migration) + stream.resume() + const lp = lpStream(stream) + const readWriteSignal = () => AbortSignal.timeout(30_000) + const sendErrorAndClose = async (httpStatus: number, error: string) => { try { - // Check if stream is already closed if (stream.status === 'closed' || stream.status === 'closing') { P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - - // Resume stream in case it's paused - we need to write - stream.resume() const status = { httpStatus, error } - stream.send(uint8ArrayFromString(JSON.stringify(status))) + await lp.write(uint8ArrayFromString(JSON.stringify(status)), { + signal: readWriteSignal() + }) await stream.close() } catch (e) { P2P_LOGGER.error(`Error sending error response: ${e.message}`) @@ -90,25 +94,15 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume the stream. We can now write. - stream.resume() - - // v3 streams are AsyncIterable let task: Command try { - for await (const chunk of stream) { - try { - const str = uint8ArrayToString(chunk.subarray()) - task = JSON.parse(str) as Command - } catch (e) { - await sendErrorAndClose(400, 'Invalid command') - return - } - } + const cmdBytes = await lp.read({ signal: readWriteSignal() }) + const str = uint8ArrayToString(cmdBytes.subarray()) + task = JSON.parse(str) as Command } catch (err) { P2P_LOGGER.log( LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${err.message}` + `Unable to process P2P command: ${err?.message ?? err}` ) await sendErrorAndClose(400, 'Invalid command') return @@ -133,23 +127,17 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect task.caller = remotePeer.toString() const response: P2PCommandResponse = await handler.handle(task) - // Send status first - stream.send(uint8ArrayFromString(JSON.stringify(response.status))) + // Send status first (length-prefixed) + await lp.write(uint8ArrayFromString(JSON.stringify(response.status)), { + signal: readWriteSignal() + }) - // Stream data chunks without buffering, with backpressure support + // Stream data chunks as length-prefixed messages if (response.stream) { for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - - // Handle backpressure - if send returns false, wait for drain - if (!stream.send(bytes)) { - await stream.onDrain({ - signal: AbortSignal.timeout(30000) // 30 second timeout for drain - }) - } + await lp.write(bytes, { signal: readWriteSignal() }) } - // Ensure last chunk is flushed before closing (avoid remote close before client reads tail) - await stream.onDrain({ signal: AbortSignal.timeout(30000) }) } await stream.close() diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 1336a74ca..c1ce3050f 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -5,6 +5,7 @@ import { handleProtocolCommands } from './handlers.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import { lpStream } from '@libp2p/utils' import type { Stream } from '@libp2p/interface' import { bootstrap } from '@libp2p/bootstrap' @@ -778,22 +779,34 @@ export class OceanP2P extends EventEmitter { } try { - // Send message and close write side - stream.send(uint8ArrayFromString(message)) - await stream.close() + const lp = lpStream(stream) + const writeSignal = AbortSignal.timeout(10_000) + const readSignal = AbortSignal.timeout(10_000) - // Read and parse status from first chunk - const iterator = stream[Symbol.asyncIterator]() - const { done, value } = await iterator.next() + await lp.write(uint8ArrayFromString(message), { signal: writeSignal }) - if (done || !value) { - return { status: { httpStatus: 500, error: 'No response from peer' } } - } - - const status = JSON.parse(uint8ArrayToString(value.subarray())) + const statusBytes = await lp.read({ signal: readSignal }) + const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) - // Return status and remaining stream - return { status, stream: { [Symbol.asyncIterator]: () => iterator } } + // Return status and remaining stream (length-prefixed messages) + const streamTimeout = 30_000 + return { + status, + stream: { + [Symbol.asyncIterator]: async function* () { + try { + while (true) { + const chunk = await lp.read({ + signal: AbortSignal.timeout(streamTimeout) + }) + yield chunk.subarray ? chunk.subarray() : chunk + } + } catch { + // stream ended or closed + } + } + } + } } catch (err) { P2P_LOGGER.error(`P2P communication error: ${err.message}`) try { From fc1d09e6a56dc272b6d4b41f73170bf62c1b9682 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 17:50:50 +0200 Subject: [PATCH 15/27] err msg log --- src/components/P2P/handleProtocolCommands.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index c29e25ae3..e0e5b3a05 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -58,7 +58,8 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect }) await stream.close() } catch (e) { - P2P_LOGGER.error(`Error sending error response: ${e.message}`) + const msg = e instanceof Error ? e.message : e != null ? String(e) : 'Unknown error' + P2P_LOGGER.error(`Error sending error response: ${msg}`) try { stream.abort(e as Error) } catch {} @@ -142,12 +143,14 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { + const errMessage = + err instanceof Error ? err.message : err != null ? String(err) : 'Unknown error' P2P_LOGGER.logMessageWithEmoji( - 'handleProtocolCommands Error: ' + err.message, + 'handleProtocolCommands Error: ' + errMessage, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - await sendErrorAndClose(500, err.message) + await sendErrorAndClose(500, errMessage) } } From 10aa46cfa8baefd0e4ff8df7cb3c9d896b4acabf Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 17:57:11 +0200 Subject: [PATCH 16/27] serialize error --- src/components/P2P/handleProtocolCommands.ts | 51 ++++++++++++++++++-- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index e0e5b3a05..3a96380f5 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -32,6 +32,35 @@ export class ReadableString extends Readable { } } +/** Serialize any thrown value for debugging (Error, Event, plain object). */ +function serializeErrorForDebug(err: unknown): Record { + try { + if (err instanceof Error) { + return { name: err.name, message: err.message, stack: err.stack } + } + if (err != null && typeof err === 'object') { + const o = err as Record + const out: Record = {} + for (const key of Object.keys(o)) { + try { + const v = o[key] + if (v === null || typeof v !== 'object' || typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') { + out[key] = v + } else { + out[key] = String(v) + } + } catch { + out[key] = '[unserializable]' + } + } + return out + } + return { value: err } + } catch { + return { raw: String(err) } + } +} + export async function handleProtocolCommands(stream: Stream, connection: Connection) { const { remotePeer, remoteAddr } = connection @@ -46,13 +75,17 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect const lp = lpStream(stream) const readWriteSignal = () => AbortSignal.timeout(30_000) - const sendErrorAndClose = async (httpStatus: number, error: string) => { + const sendErrorAndClose = async ( + httpStatus: number, + error: string, + errorDebug?: Record + ) => { try { if (stream.status === 'closed' || stream.status === 'closing') { P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - const status = { httpStatus, error } + const status = errorDebug ? { httpStatus, error, errorDebug } : { httpStatus, error } await lp.write(uint8ArrayFromString(JSON.stringify(status)), { signal: readWriteSignal() }) @@ -143,14 +176,22 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { - const errMessage = - err instanceof Error ? err.message : err != null ? String(err) : 'Unknown error' + const errMessage = (() => { + if (err instanceof Error) return err.message + if (err != null && typeof err === 'object' && 'type' in err) { + const e = err as { type?: string; message?: string } + return e.message ?? `Event: ${e.type ?? 'unknown'}` + } + return err != null ? String(err) : 'Unknown error' + })() + const errorDebug = serializeErrorForDebug(err) P2P_LOGGER.logMessageWithEmoji( 'handleProtocolCommands Error: ' + errMessage, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - await sendErrorAndClose(500, errMessage) + P2P_LOGGER.error('handleProtocolCommands error object (debug): ' + JSON.stringify(errorDebug)) + await sendErrorAndClose(500, errMessage, errorDebug) } } From 1dc500c2e0ffdac827e25cf0c4006de19bacb59b Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Sat, 28 Feb 2026 10:31:17 +0200 Subject: [PATCH 17/27] cleanup --- src/components/P2P/handleProtocolCommands.ts | 38 +++----------------- src/components/P2P/index.ts | 13 +++---- 2 files changed, 9 insertions(+), 42 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 3a96380f5..0bc2cc25a 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -32,35 +32,6 @@ export class ReadableString extends Readable { } } -/** Serialize any thrown value for debugging (Error, Event, plain object). */ -function serializeErrorForDebug(err: unknown): Record { - try { - if (err instanceof Error) { - return { name: err.name, message: err.message, stack: err.stack } - } - if (err != null && typeof err === 'object') { - const o = err as Record - const out: Record = {} - for (const key of Object.keys(o)) { - try { - const v = o[key] - if (v === null || typeof v !== 'object' || typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') { - out[key] = v - } else { - out[key] = String(v) - } - } catch { - out[key] = '[unserializable]' - } - } - return out - } - return { value: err } - } catch { - return { raw: String(err) } - } -} - export async function handleProtocolCommands(stream: Stream, connection: Connection) { const { remotePeer, remoteAddr } = connection @@ -70,7 +41,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) - // Resume and use length-prefixed messages (libp2p v3 byteStream migration) stream.resume() const lp = lpStream(stream) const readWriteSignal = () => AbortSignal.timeout(30_000) @@ -85,7 +55,9 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - const status = errorDebug ? { httpStatus, error, errorDebug } : { httpStatus, error } + const status = errorDebug + ? { httpStatus, error, errorDebug } + : { httpStatus, error } await lp.write(uint8ArrayFromString(JSON.stringify(status)), { signal: readWriteSignal() }) @@ -184,14 +156,12 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect } return err != null ? String(err) : 'Unknown error' })() - const errorDebug = serializeErrorForDebug(err) P2P_LOGGER.logMessageWithEmoji( 'handleProtocolCommands Error: ' + errMessage, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - P2P_LOGGER.error('handleProtocolCommands error object (debug): ' + JSON.stringify(errorDebug)) - await sendErrorAndClose(500, errMessage, errorDebug) + await sendErrorAndClose(500, errMessage) } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index c1ce3050f..378dde3fe 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -438,6 +438,9 @@ export class OceanP2P extends EventEmitter { dialTimeout: config.p2pConfig.connectionsDialTimeout, maxConnections: config.p2pConfig.maxConnections, maxPeerAddrsToDial: config.p2pConfig.maxPeerAddrsToDial + }, + connectionMonitor: { + abortConnectionOnPingFailure: false } } if (config.p2pConfig.bootstrapNodes && config.p2pConfig.bootstrapNodes.length > 0) { @@ -788,22 +791,16 @@ export class OceanP2P extends EventEmitter { const statusBytes = await lp.read({ signal: readSignal }) const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) - // Return status and remaining stream (length-prefixed messages) - const streamTimeout = 30_000 return { status, stream: { [Symbol.asyncIterator]: async function* () { try { while (true) { - const chunk = await lp.read({ - signal: AbortSignal.timeout(streamTimeout) - }) + const chunk = await lp.read() yield chunk.subarray ? chunk.subarray() : chunk } - } catch { - // stream ended or closed - } + } catch {} } } } From 883a284ca8fa922151f828e97778ec9c5e54e54a Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Sat, 28 Feb 2026 10:32:29 +0200 Subject: [PATCH 18/27] cleanup --- src/components/P2P/handleProtocolCommands.ts | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 0bc2cc25a..bbf942f98 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -148,20 +148,12 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { - const errMessage = (() => { - if (err instanceof Error) return err.message - if (err != null && typeof err === 'object' && 'type' in err) { - const e = err as { type?: string; message?: string } - return e.message ?? `Event: ${e.type ?? 'unknown'}` - } - return err != null ? String(err) : 'Unknown error' - })() P2P_LOGGER.logMessageWithEmoji( - 'handleProtocolCommands Error: ' + errMessage, + 'handleProtocolCommands Error: ' + err.message, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - await sendErrorAndClose(500, errMessage) + await sendErrorAndClose(500, err.message) } } From d69881b4041c2c0647b0e44f58e25ded84ab7ce5 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Sat, 28 Feb 2026 14:44:15 +0200 Subject: [PATCH 19/27] system tests target --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e9f2e47b7..6f1fa84bd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -317,7 +317,7 @@ jobs: with: repository: 'oceanprotocol/ocean.js' path: 'ocean.js' - ref: feature/refactor_signatures + ref: main - name: Build ocean-js working-directory: ${{ github.workspace }}/ocean.js run: | From c2326c0ef333ac4fac409d60f48a81d8b3375990 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Wed, 11 Mar 2026 10:02:47 +0200 Subject: [PATCH 20/27] fix abort --- src/components/P2P/index.ts | 9 +++++---- src/components/httpRoutes/commands.ts | 14 ++++++++++++-- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 378dde3fe..0e2a99f6e 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -725,7 +725,8 @@ export class OceanP2P extends EventEmitter { async sendTo( peerName: string, message: string, - multiAddrs?: string[] + multiAddrs?: string[], + abortSignal?: AbortSignal ): Promise<{ status: any; stream?: AsyncIterable }> { P2P_LOGGER.logMessage('SendTo() node ' + peerName + ' task: ' + message, true) @@ -783,8 +784,8 @@ export class OceanP2P extends EventEmitter { try { const lp = lpStream(stream) - const writeSignal = AbortSignal.timeout(10_000) - const readSignal = AbortSignal.timeout(10_000) + const writeSignal = abortSignal ?? AbortSignal.timeout(10_000) + const readSignal = abortSignal ?? AbortSignal.timeout(10_000) await lp.write(uint8ArrayFromString(message), { signal: writeSignal }) @@ -797,7 +798,7 @@ export class OceanP2P extends EventEmitter { [Symbol.asyncIterator]: async function* () { try { while (true) { - const chunk = await lp.read() + const chunk = await lp.read({ signal: abortSignal }) yield chunk.subarray ? chunk.subarray() : chunk } } catch {} diff --git a/src/components/httpRoutes/commands.ts b/src/components/httpRoutes/commands.ts index 746d4caf7..58ddc81c8 100644 --- a/src/components/httpRoutes/commands.ts +++ b/src/components/httpRoutes/commands.ts @@ -58,6 +58,7 @@ directCommandRoute.post( express.json(), async (req: Request, res: Response): Promise => { let closedResponse = false + const abortController = new AbortController() try { const validate = validateCommandParameters(req.body, []) @@ -69,6 +70,7 @@ directCommandRoute.post( res.on('close', () => { if (!closedResponse) { HTTP_LOGGER.error('TCP connection was closed before we could send a response!') + abortController.abort() } closedResponse = true }) @@ -108,7 +110,12 @@ directCommandRoute.post( // Remote command - use P2P sendTo const response = await req.oceanNode .getP2PNode() - .sendTo(req.body.node as string, JSON.stringify(req.body), req.body.multiAddrs) + .sendTo( + req.body.node as string, + JSON.stringify(req.body), + req.body.multiAddrs, + abortController.signal + ) res.status(response.status.httpStatus) if (response.status.headers) { @@ -135,7 +142,10 @@ directCommandRoute.post( } } catch (err) { HTTP_LOGGER.error(err.message) - res.status(500).send(err.message) + closedResponse = true + if (!res.headersSent) { + res.status(500).send(err.message) + } } } ) From 2f38dd4953fe0b60ec732e644fd340f1c0c3b03b Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Wed, 11 Mar 2026 11:38:53 +0200 Subject: [PATCH 21/27] do not abore on close --- src/components/P2P/index.ts | 9 ++++----- src/components/httpRoutes/commands.ts | 9 +-------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 0e2a99f6e..378dde3fe 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -725,8 +725,7 @@ export class OceanP2P extends EventEmitter { async sendTo( peerName: string, message: string, - multiAddrs?: string[], - abortSignal?: AbortSignal + multiAddrs?: string[] ): Promise<{ status: any; stream?: AsyncIterable }> { P2P_LOGGER.logMessage('SendTo() node ' + peerName + ' task: ' + message, true) @@ -784,8 +783,8 @@ export class OceanP2P extends EventEmitter { try { const lp = lpStream(stream) - const writeSignal = abortSignal ?? AbortSignal.timeout(10_000) - const readSignal = abortSignal ?? AbortSignal.timeout(10_000) + const writeSignal = AbortSignal.timeout(10_000) + const readSignal = AbortSignal.timeout(10_000) await lp.write(uint8ArrayFromString(message), { signal: writeSignal }) @@ -798,7 +797,7 @@ export class OceanP2P extends EventEmitter { [Symbol.asyncIterator]: async function* () { try { while (true) { - const chunk = await lp.read({ signal: abortSignal }) + const chunk = await lp.read() yield chunk.subarray ? chunk.subarray() : chunk } } catch {} diff --git a/src/components/httpRoutes/commands.ts b/src/components/httpRoutes/commands.ts index 58ddc81c8..779340638 100644 --- a/src/components/httpRoutes/commands.ts +++ b/src/components/httpRoutes/commands.ts @@ -58,7 +58,6 @@ directCommandRoute.post( express.json(), async (req: Request, res: Response): Promise => { let closedResponse = false - const abortController = new AbortController() try { const validate = validateCommandParameters(req.body, []) @@ -70,7 +69,6 @@ directCommandRoute.post( res.on('close', () => { if (!closedResponse) { HTTP_LOGGER.error('TCP connection was closed before we could send a response!') - abortController.abort() } closedResponse = true }) @@ -110,12 +108,7 @@ directCommandRoute.post( // Remote command - use P2P sendTo const response = await req.oceanNode .getP2PNode() - .sendTo( - req.body.node as string, - JSON.stringify(req.body), - req.body.multiAddrs, - abortController.signal - ) + .sendTo(req.body.node as string, JSON.stringify(req.body), req.body.multiAddrs) res.status(response.status.httpStatus) if (response.status.headers) { From c27c255759b35a2b3ad9e42203c9bd197fc89ab9 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Wed, 11 Mar 2026 12:48:06 +0200 Subject: [PATCH 22/27] rate limiting fixes --- src/components/P2P/handleProtocolCommands.ts | 32 +++++++++++--------- src/components/P2P/index.ts | 12 +++++++- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index bbf942f98..0d1991501 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -71,7 +71,23 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect } } - // Rate limiting and deny list checks + // Read the command first so the client always gets a response after writing. + // Rate limiting checks happen after reading to maintain the write→read protocol order. + let task: Command + try { + const cmdBytes = await lp.read({ signal: readWriteSignal() }) + const str = uint8ArrayToString(cmdBytes.subarray()) + task = JSON.parse(str) as Command + } catch (err) { + P2P_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Unable to process P2P command: ${err?.message ?? err}` + ) + await sendErrorAndClose(400, 'Invalid command') + return + } + + // Rate limiting and deny list checks (after reading command) const configuration = await getConfiguration() const { denyList } = configuration @@ -100,20 +116,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - let task: Command - try { - const cmdBytes = await lp.read({ signal: readWriteSignal() }) - const str = uint8ArrayToString(cmdBytes.subarray()) - task = JSON.parse(str) as Command - } catch (err) { - P2P_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${err?.message ?? err}` - ) - await sendErrorAndClose(400, 'Invalid command') - return - } - if (!task) { P2P_LOGGER.error('Invalid or missing task/command data!') await sendErrorAndClose(400, 'Invalid command') diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 378dde3fe..7ae7781dc 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -786,7 +786,17 @@ export class OceanP2P extends EventEmitter { const writeSignal = AbortSignal.timeout(10_000) const readSignal = AbortSignal.timeout(10_000) - await lp.write(uint8ArrayFromString(message), { signal: writeSignal }) + try { + await lp.write(uint8ArrayFromString(message), { signal: writeSignal }) + } catch (writeErr) { + // Remote may have closed the stream after sending an error status (e.g. rate limit) + try { + const statusBytes = await lp.read({ signal: AbortSignal.timeout(1_000) }) + return { status: JSON.parse(uint8ArrayToString(statusBytes.subarray())) } + } catch { + throw writeErr + } + } const statusBytes = await lp.read({ signal: readSignal }) const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) From 35a648d31c617b5f8129ac22a6225ba9c5f8f37a Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Wed, 11 Mar 2026 13:24:30 +0200 Subject: [PATCH 23/27] retry on stale connection --- src/components/P2P/index.ts | 106 ++++++++++++++++++++++++------------ 1 file changed, 72 insertions(+), 34 deletions(-) diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 7ae7781dc..a1ed37a5f 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -6,7 +6,7 @@ import { handleProtocolCommands } from './handlers.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { lpStream } from '@libp2p/utils' -import type { Stream } from '@libp2p/interface' +import type { Connection, Stream } from '@libp2p/interface' import { bootstrap } from '@libp2p/bootstrap' import { noise } from '@chainsafe/libp2p-noise' @@ -727,9 +727,19 @@ export class OceanP2P extends EventEmitter { message: string, multiAddrs?: string[] ): Promise<{ status: any; stream?: AsyncIterable }> { - P2P_LOGGER.logMessage('SendTo() node ' + peerName + ' task: ' + message, true) + const signal = AbortSignal.timeout(10_000) + const options = { + signal: AbortSignal.timeout(10000), + priority: 100, + runOnLimitedConnection: true + } + let connection: Connection + let stream: Stream let peerId + + P2P_LOGGER.logMessage('SendTo() node ' + peerName + ' task: ' + message, true) + try { peerId = peerIdFromString(peerName) } catch (e) { @@ -757,18 +767,12 @@ export class OceanP2P extends EventEmitter { return { status: { httpStatus: 404, error } } } - let stream: Stream try { - const options = { - signal: AbortSignal.timeout(10000), - priority: 100, - runOnLimitedConnection: true - } - const connection = await this._libp2p.dial(multiaddrs, options) + connection = await this._libp2p.dial(multiaddrs, options) if (connection.remotePeer.toString() !== peerId.toString()) { - const error = `Invalid peer on the other side: ${connection.remotePeer.toString()}` - P2P_LOGGER.error(error) - return { status: { httpStatus: 404, error } } + throw new Error( + `Invalid peer on the other side: ${connection.remotePeer.toString()}` + ) } stream = await connection.newStream(this._protocol, options) } catch (e) { @@ -777,49 +781,83 @@ export class OceanP2P extends EventEmitter { return { status: { httpStatus: 404, error } } } - if (!stream) { - return { status: { httpStatus: 404, error: 'Unable to get remote P2P stream' } } + const lp = lpStream(stream) + let streamErr: Error | null = null + try { + await lp.write(uint8ArrayFromString(message), { signal }) + const statusBytes = await lp.read({ signal }) + const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) + return { + status, + stream: { + [Symbol.asyncIterator]: async function* () { + try { + while (true) { + const chunk = await lp.read() + yield chunk.subarray ? chunk.subarray() : chunk + } + } catch {} + } + } + } + } catch (err) { + try { + stream.abort(err as Error) + } catch {} + streamErr = err + } + + // abortConnectionOnPingFailure is disabled to keep long-running download streams alive, + // so stale connections are not evicted automatically. On a stale stream error, close the + // connection and retry once so the next dial establishes a fresh one. + if (!streamErr.message.includes('closed') && !streamErr.message.includes('reset')) { + P2P_LOGGER.error(`P2P communication error: ${streamErr.message}`) + return { status: { httpStatus: 500, error: `P2P error: ${streamErr.message}` } } } + P2P_LOGGER.warn(`Stale connection to ${peerId}, retrying: ${streamErr.message}`) try { - const lp = lpStream(stream) - const writeSignal = AbortSignal.timeout(10_000) - const readSignal = AbortSignal.timeout(10_000) + await connection.close() + } catch {} - try { - await lp.write(uint8ArrayFromString(message), { signal: writeSignal }) - } catch (writeErr) { - // Remote may have closed the stream after sending an error status (e.g. rate limit) - try { - const statusBytes = await lp.read({ signal: AbortSignal.timeout(1_000) }) - return { status: JSON.parse(uint8ArrayToString(statusBytes.subarray())) } - } catch { - throw writeErr - } + try { + connection = await this._libp2p.dial(multiaddrs, options) + if (connection.remotePeer.toString() !== peerId.toString()) { + throw new Error( + `Invalid peer on the other side: ${connection.remotePeer.toString()}` + ) } + stream = await connection.newStream(this._protocol, options) + } catch (e) { + const error = `Cannot reconnect to peer ${peerId}: ${e.message}` + P2P_LOGGER.error(error) + return { status: { httpStatus: 404, error } } + } - const statusBytes = await lp.read({ signal: readSignal }) + const retryLp = lpStream(stream) + try { + await retryLp.write(uint8ArrayFromString(message), { signal }) + const statusBytes = await retryLp.read({ signal }) const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) - return { status, stream: { [Symbol.asyncIterator]: async function* () { try { while (true) { - const chunk = await lp.read() + const chunk = await retryLp.read() yield chunk.subarray ? chunk.subarray() : chunk } } catch {} } } } - } catch (err) { - P2P_LOGGER.error(`P2P communication error: ${err.message}`) + } catch (retryErr) { try { - stream.abort(err as Error) + stream.abort(retryErr as Error) } catch {} - return { status: { httpStatus: 500, error: `P2P error: ${err.message}` } } + P2P_LOGGER.error(`P2P communication error on retry: ${retryErr.message}`) + return { status: { httpStatus: 500, error: `P2P error: ${retryErr.message}` } } } } From bbec66f15fa1cf4a9aa39c68c9fbc47c3c8bed83 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Wed, 11 Mar 2026 13:57:21 +0200 Subject: [PATCH 24/27] cleanup --- src/components/P2P/index.ts | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index a1ed37a5f..5c78b5b06 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -727,9 +727,8 @@ export class OceanP2P extends EventEmitter { message: string, multiAddrs?: string[] ): Promise<{ status: any; stream?: AsyncIterable }> { - const signal = AbortSignal.timeout(10_000) const options = { - signal: AbortSignal.timeout(10000), + signal: AbortSignal.timeout(10_000), priority: 100, runOnLimitedConnection: true } @@ -784,8 +783,8 @@ export class OceanP2P extends EventEmitter { const lp = lpStream(stream) let streamErr: Error | null = null try { - await lp.write(uint8ArrayFromString(message), { signal }) - const statusBytes = await lp.read({ signal }) + await lp.write(uint8ArrayFromString(message), { signal: options.signal }) + const statusBytes = await lp.read({ signal: options.signal }) const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) return { status, @@ -816,28 +815,15 @@ export class OceanP2P extends EventEmitter { } P2P_LOGGER.warn(`Stale connection to ${peerId}, retrying: ${streamErr.message}`) - try { - await connection.close() - } catch {} + try { await connection.close() } catch {} - try { - connection = await this._libp2p.dial(multiaddrs, options) - if (connection.remotePeer.toString() !== peerId.toString()) { - throw new Error( - `Invalid peer on the other side: ${connection.remotePeer.toString()}` - ) - } - stream = await connection.newStream(this._protocol, options) - } catch (e) { - const error = `Cannot reconnect to peer ${peerId}: ${e.message}` - P2P_LOGGER.error(error) - return { status: { httpStatus: 404, error } } - } + connection = await this._libp2p.dial(multiaddrs, options) + stream = await connection.newStream(this._protocol, options) const retryLp = lpStream(stream) try { - await retryLp.write(uint8ArrayFromString(message), { signal }) - const statusBytes = await retryLp.read({ signal }) + await retryLp.write(uint8ArrayFromString(message), { signal: options.signal }) + const statusBytes = await retryLp.read({ signal: options.signal }) const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) return { status, From 80c579e0570870d2d625ce4de7c129c4626d28f7 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Wed, 11 Mar 2026 16:27:27 +0200 Subject: [PATCH 25/27] extract logic --- src/components/P2P/index.ts | 76 +++++++++++++++---------------------- 1 file changed, 31 insertions(+), 45 deletions(-) diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 5c78b5b06..793a036aa 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -5,7 +5,7 @@ import { handleProtocolCommands } from './handlers.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' -import { lpStream } from '@libp2p/utils' +import { LengthPrefixedStream, lpStream } from '@libp2p/utils' import type { Connection, Stream } from '@libp2p/interface' import { bootstrap } from '@libp2p/bootstrap' @@ -722,6 +722,28 @@ export class OceanP2P extends EventEmitter { return null } + async send( + lp: LengthPrefixedStream, + message: string, + options: { signal: AbortSignal } + ) { + await lp.write(uint8ArrayFromString(message), { signal: options.signal }) + const statusBytes = await lp.read({ signal: options.signal }) + return { + status: JSON.parse(uint8ArrayToString(statusBytes.subarray())), + stream: { + [Symbol.asyncIterator]: async function* () { + try { + while (true) { + const chunk = await lp.read() + yield chunk.subarray ? chunk.subarray() : chunk + } + } catch {} + } + } + } + } + async sendTo( peerName: string, message: string, @@ -751,14 +773,9 @@ export class OceanP2P extends EventEmitter { return { status: { httpStatus: 404, error: 'Invalid peer' } } } - let multiaddrs: Multiaddr[] = [] - if (!multiAddrs || multiAddrs.length < 1) { - multiaddrs = await this.getPeerMultiaddrs(peerName) - } else { - for (const addr of multiAddrs) { - multiaddrs.push(multiaddr(addr)) - } - } + const multiaddrs = multiAddrs?.length + ? multiAddrs.map((addr) => multiaddr(addr)) + : await this.getPeerMultiaddrs(peerName) if (multiaddrs.length < 1) { const error = `Cannot find any address to dial for peer: ${peerId}` @@ -780,25 +797,9 @@ export class OceanP2P extends EventEmitter { return { status: { httpStatus: 404, error } } } - const lp = lpStream(stream) let streamErr: Error | null = null try { - await lp.write(uint8ArrayFromString(message), { signal: options.signal }) - const statusBytes = await lp.read({ signal: options.signal }) - const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) - return { - status, - stream: { - [Symbol.asyncIterator]: async function* () { - try { - while (true) { - const chunk = await lp.read() - yield chunk.subarray ? chunk.subarray() : chunk - } - } catch {} - } - } - } + return await this.send(lpStream(stream), message, options) } catch (err) { try { stream.abort(err as Error) @@ -815,29 +816,14 @@ export class OceanP2P extends EventEmitter { } P2P_LOGGER.warn(`Stale connection to ${peerId}, retrying: ${streamErr.message}`) - try { await connection.close() } catch {} - + try { + await connection.close() + } catch {} connection = await this._libp2p.dial(multiaddrs, options) stream = await connection.newStream(this._protocol, options) - const retryLp = lpStream(stream) try { - await retryLp.write(uint8ArrayFromString(message), { signal: options.signal }) - const statusBytes = await retryLp.read({ signal: options.signal }) - const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) - return { - status, - stream: { - [Symbol.asyncIterator]: async function* () { - try { - while (true) { - const chunk = await retryLp.read() - yield chunk.subarray ? chunk.subarray() : chunk - } - } catch {} - } - } - } + return await this.send(lpStream(stream), message, options) } catch (retryErr) { try { stream.abort(retryErr as Error) From 669633cf6d9532119785c48e5543056ea5f3eee7 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 13 Mar 2026 12:22:12 +0200 Subject: [PATCH 26/27] increment abort time --- src/components/P2P/handleProtocolCommands.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 0d1991501..0750dec33 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -43,7 +43,8 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect stream.resume() const lp = lpStream(stream) - const readWriteSignal = () => AbortSignal.timeout(30_000) + const handshakeSignal = () => AbortSignal.timeout(30_000) + const dataWriteSignal = () => AbortSignal.timeout(30 * 60_000) const sendErrorAndClose = async ( httpStatus: number, @@ -59,7 +60,7 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect ? { httpStatus, error, errorDebug } : { httpStatus, error } await lp.write(uint8ArrayFromString(JSON.stringify(status)), { - signal: readWriteSignal() + signal: handshakeSignal() }) await stream.close() } catch (e) { @@ -75,7 +76,7 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect // Rate limiting checks happen after reading to maintain the write→read protocol order. let task: Command try { - const cmdBytes = await lp.read({ signal: readWriteSignal() }) + const cmdBytes = await lp.read({ signal: handshakeSignal() }) const str = uint8ArrayToString(cmdBytes.subarray()) task = JSON.parse(str) as Command } catch (err) { @@ -137,14 +138,14 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect // Send status first (length-prefixed) await lp.write(uint8ArrayFromString(JSON.stringify(response.status)), { - signal: readWriteSignal() + signal: handshakeSignal() }) // Stream data chunks as length-prefixed messages if (response.stream) { for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - await lp.write(bytes, { signal: readWriteSignal() }) + await lp.write(bytes, { signal: dataWriteSignal() }) } } From 7e637e543a3f01445a7c49ec22100a2521821101 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 13 Mar 2026 13:24:06 +0200 Subject: [PATCH 27/27] allow partial retry on download --- src/@types/commands.ts | 1 + src/components/c2d/compute_engine_base.ts | 3 ++- src/components/c2d/compute_engine_docker.ts | 6 ++++-- src/components/core/compute/getResults.ts | 3 ++- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/@types/commands.ts b/src/@types/commands.ts index fb3cd5d37..984c17b80 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -264,6 +264,7 @@ export interface ComputeGetResultCommand extends Command { nonce: string jobId: string index: number + offset?: number } export interface ComputeGetStreamableLogsCommand extends Command { consumerAddress: string diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 671eee947..17699b7df 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -112,7 +112,8 @@ export abstract class C2DEngine { public abstract getComputeJobResult( consumerAddress: string, jobId: string, - index: number + index: number, + offset?: number ): Promise<{ stream: Readable; headers: any }> public abstract cleanupExpiredStorage(job: DBComputeJob): Promise diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index c43f19658..8f293ebdf 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -1294,7 +1294,8 @@ export class C2DEngineDocker extends C2DEngine { public override async getComputeJobResult( consumerAddress: string, jobId: string, - index: number + index: number, + offset: number = 0 ): Promise<{ stream: Readable; headers: any }> { const jobs = await this.db.getJob(jobId, null, null) if (jobs.length === 0 || jobs.length > 1) { @@ -1358,7 +1359,8 @@ export class C2DEngineDocker extends C2DEngine { if (i.type === 'output') { return { stream: createReadStream( - this.getC2DConfig().tempFolder + '/' + jobId + '/data/outputs/outputs.tar' + this.getC2DConfig().tempFolder + '/' + jobId + '/data/outputs/outputs.tar', + offset > 0 ? { start: offset } : undefined ), headers: { 'Content-Type': 'application/octet-stream' diff --git a/src/components/core/compute/getResults.ts b/src/components/core/compute/getResults.ts index 888d6f61d..6c1143457 100644 --- a/src/components/core/compute/getResults.ts +++ b/src/components/core/compute/getResults.ts @@ -65,7 +65,8 @@ export class ComputeGetResultHandler extends CommandHandler { const respStream = await engine.getComputeJobResult( task.consumerAddress, jobId, - task.index + task.index, + task.offset ?? 0 ) const response: P2PCommandResponse = { stream: respStream?.stream,