diff --git a/src/@types/commands.ts b/src/@types/commands.ts index fb3cd5d37..984c17b80 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -264,6 +264,7 @@ export interface ComputeGetResultCommand extends Command { nonce: string jobId: string index: number + offset?: number } export interface ComputeGetStreamableLogsCommand extends Command { consumerAddress: string diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 91b5abefb..0750dec33 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -12,6 +12,7 @@ import { checkGlobalConnectionsRateLimit, checkRequestsRateLimit } from '../../utils/validators.js' +import { lpStream } from '@libp2p/utils' import type { Connection, Stream } from '@libp2p/interface' export class ReadableString extends Readable { @@ -40,28 +41,54 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) - const sendErrorAndClose = async (httpStatus: number, error: string) => { + stream.resume() + const lp = lpStream(stream) + const handshakeSignal = () => AbortSignal.timeout(30_000) + const dataWriteSignal = () => AbortSignal.timeout(30 * 60_000) + + const sendErrorAndClose = async ( + httpStatus: number, + error: string, + errorDebug?: Record + ) => { try { - // Check if stream is already closed if (stream.status === 'closed' || stream.status === 'closing') { P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - - // Resume stream in case it's paused - we need to write - stream.resume() - const status = { httpStatus, error } - stream.send(uint8ArrayFromString(JSON.stringify(status))) + const status = errorDebug + ? { httpStatus, error, errorDebug } + : { httpStatus, error } + await lp.write(uint8ArrayFromString(JSON.stringify(status)), { + signal: handshakeSignal() + }) await stream.close() } catch (e) { - P2P_LOGGER.error(`Error sending error response: ${e.message}`) + const msg = e instanceof Error ? e.message : e != null ? String(e) : 'Unknown error' + P2P_LOGGER.error(`Error sending error response: ${msg}`) try { stream.abort(e as Error) } catch {} } } - // Rate limiting and deny list checks + // Read the command first so the client always gets a response after writing. + // Rate limiting checks happen after reading to maintain the write→read protocol order. + let task: Command + try { + const cmdBytes = await lp.read({ signal: handshakeSignal() }) + const str = uint8ArrayToString(cmdBytes.subarray()) + task = JSON.parse(str) as Command + } catch (err) { + P2P_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Unable to process P2P command: ${err?.message ?? err}` + ) + await sendErrorAndClose(400, 'Invalid command') + return + } + + // Rate limiting and deny list checks (after reading command) const configuration = await getConfiguration() const { denyList } = configuration @@ -90,30 +117,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume the stream. We can now write. - stream.resume() - - // v3 streams are AsyncIterable - let task: Command - try { - for await (const chunk of stream) { - try { - const str = uint8ArrayToString(chunk.subarray()) - task = JSON.parse(str) as Command - } catch (e) { - await sendErrorAndClose(400, 'Invalid command') - return - } - } - } catch (err) { - P2P_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${err.message}` - ) - await sendErrorAndClose(400, 'Invalid command') - return - } - if (!task) { P2P_LOGGER.error('Invalid or missing task/command data!') await sendErrorAndClose(400, 'Invalid command') @@ -133,20 +136,16 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect task.caller = remotePeer.toString() const response: P2PCommandResponse = await handler.handle(task) - // Send status first - stream.send(uint8ArrayFromString(JSON.stringify(response.status))) + // Send status first (length-prefixed) + await lp.write(uint8ArrayFromString(JSON.stringify(response.status)), { + signal: handshakeSignal() + }) - // Stream data chunks without buffering, with backpressure support + // Stream data chunks as length-prefixed messages if (response.stream) { for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - - // Handle backpressure - if send returns false, wait for drain - if (!stream.send(bytes)) { - await stream.onDrain({ - signal: AbortSignal.timeout(30000) // 30 second timeout for drain - }) - } + await lp.write(bytes, { signal: dataWriteSignal() }) } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 87ef1f5c1..793a036aa 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -5,7 +5,8 @@ import { handleProtocolCommands } from './handlers.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' -import type { Stream } from '@libp2p/interface' +import { LengthPrefixedStream, lpStream } from '@libp2p/utils' +import type { Connection, Stream } from '@libp2p/interface' import { bootstrap } from '@libp2p/bootstrap' import { noise } from '@chainsafe/libp2p-noise' @@ -721,14 +722,45 @@ export class OceanP2P extends EventEmitter { return null } + async send( + lp: LengthPrefixedStream, + message: string, + options: { signal: AbortSignal } + ) { + await lp.write(uint8ArrayFromString(message), { signal: options.signal }) + const statusBytes = await lp.read({ signal: options.signal }) + return { + status: JSON.parse(uint8ArrayToString(statusBytes.subarray())), + stream: { + [Symbol.asyncIterator]: async function* () { + try { + while (true) { + const chunk = await lp.read() + yield chunk.subarray ? chunk.subarray() : chunk + } + } catch {} + } + } + } + } + async sendTo( peerName: string, message: string, multiAddrs?: string[] ): Promise<{ status: any; stream?: AsyncIterable }> { - P2P_LOGGER.logMessage('SendTo() node ' + peerName + ' task: ' + message, true) + const options = { + signal: AbortSignal.timeout(10_000), + priority: 100, + runOnLimitedConnection: true + } + let connection: Connection + let stream: Stream let peerId + + P2P_LOGGER.logMessage('SendTo() node ' + peerName + ' task: ' + message, true) + try { peerId = peerIdFromString(peerName) } catch (e) { @@ -741,14 +773,9 @@ export class OceanP2P extends EventEmitter { return { status: { httpStatus: 404, error: 'Invalid peer' } } } - let multiaddrs: Multiaddr[] = [] - if (!multiAddrs || multiAddrs.length < 1) { - multiaddrs = await this.getPeerMultiaddrs(peerName) - } else { - for (const addr of multiAddrs) { - multiaddrs.push(multiaddr(addr)) - } - } + const multiaddrs = multiAddrs?.length + ? multiAddrs.map((addr) => multiaddr(addr)) + : await this.getPeerMultiaddrs(peerName) if (multiaddrs.length < 1) { const error = `Cannot find any address to dial for peer: ${peerId}` @@ -756,18 +783,12 @@ export class OceanP2P extends EventEmitter { return { status: { httpStatus: 404, error } } } - let stream: Stream try { - const options = { - signal: AbortSignal.timeout(10000), - priority: 100, - runOnLimitedConnection: true - } - const connection = await this._libp2p.dial(multiaddrs, options) + connection = await this._libp2p.dial(multiaddrs, options) if (connection.remotePeer.toString() !== peerId.toString()) { - const error = `Invalid peer on the other side: ${connection.remotePeer.toString()}` - P2P_LOGGER.error(error) - return { status: { httpStatus: 404, error } } + throw new Error( + `Invalid peer on the other side: ${connection.remotePeer.toString()}` + ) } stream = await connection.newStream(this._protocol, options) } catch (e) { @@ -776,33 +797,39 @@ export class OceanP2P extends EventEmitter { return { status: { httpStatus: 404, error } } } - if (!stream) { - return { status: { httpStatus: 404, error: 'Unable to get remote P2P stream' } } - } - + let streamErr: Error | null = null try { - // Send message and close write side - stream.send(uint8ArrayFromString(message)) - await stream.close() - - // Read and parse status from first chunk - const iterator = stream[Symbol.asyncIterator]() - const { done, value } = await iterator.next() + return await this.send(lpStream(stream), message, options) + } catch (err) { + try { + stream.abort(err as Error) + } catch {} + streamErr = err + } - if (done || !value) { - return { status: { httpStatus: 500, error: 'No response from peer' } } - } + // abortConnectionOnPingFailure is disabled to keep long-running download streams alive, + // so stale connections are not evicted automatically. On a stale stream error, close the + // connection and retry once so the next dial establishes a fresh one. + if (!streamErr.message.includes('closed') && !streamErr.message.includes('reset')) { + P2P_LOGGER.error(`P2P communication error: ${streamErr.message}`) + return { status: { httpStatus: 500, error: `P2P error: ${streamErr.message}` } } + } - const status = JSON.parse(uint8ArrayToString(value.subarray())) + P2P_LOGGER.warn(`Stale connection to ${peerId}, retrying: ${streamErr.message}`) + try { + await connection.close() + } catch {} + connection = await this._libp2p.dial(multiaddrs, options) + stream = await connection.newStream(this._protocol, options) - // Return status and remaining stream - return { status, stream: { [Symbol.asyncIterator]: () => iterator } } - } catch (err) { - P2P_LOGGER.error(`P2P communication error: ${err.message}`) + try { + return await this.send(lpStream(stream), message, options) + } catch (retryErr) { try { - stream.abort(err as Error) + stream.abort(retryErr as Error) } catch {} - return { status: { httpStatus: 500, error: `P2P error: ${err.message}` } } + P2P_LOGGER.error(`P2P communication error on retry: ${retryErr.message}`) + return { status: { httpStatus: 500, error: `P2P error: ${retryErr.message}` } } } } diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 671eee947..17699b7df 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -112,7 +112,8 @@ export abstract class C2DEngine { public abstract getComputeJobResult( consumerAddress: string, jobId: string, - index: number + index: number, + offset?: number ): Promise<{ stream: Readable; headers: any }> public abstract cleanupExpiredStorage(job: DBComputeJob): Promise diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index c43f19658..8f293ebdf 100755 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -1294,7 +1294,8 @@ export class C2DEngineDocker extends C2DEngine { public override async getComputeJobResult( consumerAddress: string, jobId: string, - index: number + index: number, + offset: number = 0 ): Promise<{ stream: Readable; headers: any }> { const jobs = await this.db.getJob(jobId, null, null) if (jobs.length === 0 || jobs.length > 1) { @@ -1358,7 +1359,8 @@ export class C2DEngineDocker extends C2DEngine { if (i.type === 'output') { return { stream: createReadStream( - this.getC2DConfig().tempFolder + '/' + jobId + '/data/outputs/outputs.tar' + this.getC2DConfig().tempFolder + '/' + jobId + '/data/outputs/outputs.tar', + offset > 0 ? { start: offset } : undefined ), headers: { 'Content-Type': 'application/octet-stream' diff --git a/src/components/core/compute/getResults.ts b/src/components/core/compute/getResults.ts index 888d6f61d..6c1143457 100644 --- a/src/components/core/compute/getResults.ts +++ b/src/components/core/compute/getResults.ts @@ -65,7 +65,8 @@ export class ComputeGetResultHandler extends CommandHandler { const respStream = await engine.getComputeJobResult( task.consumerAddress, jobId, - task.index + task.index, + task.offset ?? 0 ) const response: P2PCommandResponse = { stream: respStream?.stream, diff --git a/src/components/httpRoutes/commands.ts b/src/components/httpRoutes/commands.ts index 746d4caf7..779340638 100644 --- a/src/components/httpRoutes/commands.ts +++ b/src/components/httpRoutes/commands.ts @@ -135,7 +135,10 @@ directCommandRoute.post( } } catch (err) { HTTP_LOGGER.error(err.message) - res.status(500).send(err.message) + closedResponse = true + if (!res.headersSent) { + res.status(500).send(err.message) + } } } )