Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a542295
timeout
giurgiur99 Feb 26, 2026
4b2e2f4
restart only when paused
giurgiur99 Feb 26, 2026
0579ca6
Revert "restart only when paused"
giurgiur99 Feb 26, 2026
89ad5e5
increase timeouts and frames
giurgiur99 Feb 26, 2026
47857a0
do not extra pause stream
giurgiur99 Feb 26, 2026
6a7b9fb
safe err
giurgiur99 Feb 27, 2026
d6ab8be
read data first
giurgiur99 Feb 27, 2026
2bc3d77
log err
giurgiur99 Feb 27, 2026
6316567
respect backpressure
giurgiur99 Feb 27, 2026
95bcd8c
flush on drain
giurgiur99 Feb 27, 2026
172ce88
Merge branch 'main' into debug-longer-timeout
giurgiur99 Feb 27, 2026
77742b7
Revert "respect backpressure"
giurgiur99 Feb 27, 2026
3559264
try bytestream
giurgiur99 Feb 27, 2026
c4c1a3b
Revert "try bytestream"
giurgiur99 Feb 27, 2026
295efd8
use lps
giurgiur99 Feb 27, 2026
fc1d09e
err msg log
giurgiur99 Feb 27, 2026
10aa46c
serialize error
giurgiur99 Feb 27, 2026
1dc500c
cleanup
giurgiur99 Feb 28, 2026
883a284
cleanup
giurgiur99 Feb 28, 2026
d69881b
system tests target
giurgiur99 Feb 28, 2026
f12a2dd
Merge branch 'main' into debug-longer-timeout
alexcos20 Mar 3, 2026
6186166
Merge branch 'main' into debug-longer-timeout
giurgiur99 Mar 11, 2026
c2326c0
fix abort
giurgiur99 Mar 11, 2026
2f38dd4
do not abore on close
giurgiur99 Mar 11, 2026
c27c255
rate limiting fixes
giurgiur99 Mar 11, 2026
35a648d
retry on stale connection
giurgiur99 Mar 11, 2026
bbec66f
cleanup
giurgiur99 Mar 11, 2026
80c579e
extract logic
giurgiur99 Mar 11, 2026
669633c
increment abort time
giurgiur99 Mar 13, 2026
7e637e5
allow partial retry on download
giurgiur99 Mar 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ export interface ComputeGetResultCommand extends Command {
nonce: string
jobId: string
index: number
offset?: number
}
export interface ComputeGetStreamableLogsCommand extends Command {
consumerAddress: string
Expand Down
85 changes: 42 additions & 43 deletions src/components/P2P/handleProtocolCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
checkGlobalConnectionsRateLimit,
checkRequestsRateLimit
} from '../../utils/validators.js'
import { lpStream } from '@libp2p/utils'
import type { Connection, Stream } from '@libp2p/interface'

export class ReadableString extends Readable {
Expand Down Expand Up @@ -40,28 +41,54 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect
P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true)
P2P_LOGGER.logMessage('Using ' + remoteAddr, true)

const sendErrorAndClose = async (httpStatus: number, error: string) => {
stream.resume()
const lp = lpStream(stream)
const handshakeSignal = () => AbortSignal.timeout(30_000)
const dataWriteSignal = () => AbortSignal.timeout(30 * 60_000)

const sendErrorAndClose = async (
httpStatus: number,
error: string,
errorDebug?: Record<string, unknown>
) => {
try {
// Check if stream is already closed
if (stream.status === 'closed' || stream.status === 'closing') {
P2P_LOGGER.warn('Stream already closed, cannot send error response')
return
}

// Resume stream in case it's paused - we need to write
stream.resume()
const status = { httpStatus, error }
stream.send(uint8ArrayFromString(JSON.stringify(status)))
const status = errorDebug
? { httpStatus, error, errorDebug }
: { httpStatus, error }
await lp.write(uint8ArrayFromString(JSON.stringify(status)), {
signal: handshakeSignal()
})
await stream.close()
} catch (e) {
P2P_LOGGER.error(`Error sending error response: ${e.message}`)
const msg = e instanceof Error ? e.message : e != null ? String(e) : 'Unknown error'
P2P_LOGGER.error(`Error sending error response: ${msg}`)
try {
stream.abort(e as Error)
} catch {}
}
}

// Rate limiting and deny list checks
// Read the command first so the client always gets a response after writing.
// Rate limiting checks happen after reading to maintain the write→read protocol order.
let task: Command
try {
const cmdBytes = await lp.read({ signal: handshakeSignal() })
const str = uint8ArrayToString(cmdBytes.subarray())
task = JSON.parse(str) as Command
} catch (err) {
P2P_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Unable to process P2P command: ${err?.message ?? err}`
)
await sendErrorAndClose(400, 'Invalid command')
return
}

// Rate limiting and deny list checks (after reading command)
const configuration = await getConfiguration()
const { denyList } = configuration

Expand Down Expand Up @@ -90,30 +117,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect
return
}

// Resume the stream. We can now write.
stream.resume()

// v3 streams are AsyncIterable
let task: Command
try {
for await (const chunk of stream) {
try {
const str = uint8ArrayToString(chunk.subarray())
task = JSON.parse(str) as Command
} catch (e) {
await sendErrorAndClose(400, 'Invalid command')
return
}
}
} catch (err) {
P2P_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Unable to process P2P command: ${err.message}`
)
await sendErrorAndClose(400, 'Invalid command')
return
}

if (!task) {
P2P_LOGGER.error('Invalid or missing task/command data!')
await sendErrorAndClose(400, 'Invalid command')
Expand All @@ -133,20 +136,16 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect
task.caller = remotePeer.toString()
const response: P2PCommandResponse = await handler.handle(task)

// Send status first
stream.send(uint8ArrayFromString(JSON.stringify(response.status)))
// Send status first (length-prefixed)
await lp.write(uint8ArrayFromString(JSON.stringify(response.status)), {
signal: handshakeSignal()
})

// Stream data chunks without buffering, with backpressure support
// Stream data chunks as length-prefixed messages
if (response.stream) {
for await (const chunk of response.stream as Readable) {
const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)

// Handle backpressure - if send returns false, wait for drain
if (!stream.send(bytes)) {
await stream.onDrain({
signal: AbortSignal.timeout(30000) // 30 second timeout for drain
})
}
await lp.write(bytes, { signal: dataWriteSignal() })
}
}

Expand Down
109 changes: 68 additions & 41 deletions src/components/P2P/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { handleProtocolCommands } from './handlers.js'

import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import type { Stream } from '@libp2p/interface'
import { LengthPrefixedStream, lpStream } from '@libp2p/utils'
import type { Connection, Stream } from '@libp2p/interface'

import { bootstrap } from '@libp2p/bootstrap'
import { noise } from '@chainsafe/libp2p-noise'
Expand Down Expand Up @@ -721,14 +722,45 @@ export class OceanP2P extends EventEmitter {
return null
}

/**
 * Performs one request/response exchange over a length-prefixed libp2p stream.
 *
 * Writes `message` as a single length-prefixed frame, then reads exactly one
 * frame back and parses it (JSON) as the peer's status object. Any further
 * frames are NOT read eagerly: they are exposed through the returned `stream`
 * async iterable, which pulls one length-prefixed frame per iteration until a
 * read fails.
 *
 * @param lp - length-prefixed wrapper around an already-open libp2p Stream
 * @param message - JSON-serialized command to send to the remote peer
 * @param options.signal - abort signal applied to the write and the status read
 * @returns `{ status, stream }` where `status` is the parsed first frame and
 *          `stream` lazily yields the remaining raw frame bytes
 *
 * NOTE(review): `options.signal` guards the write and the status read below,
 * but the per-frame reads inside the generator are issued with NO signal —
 * presumably so long-running downloads are not killed by the short dial
 * timeout, at the cost of a read that can hang indefinitely if the remote
 * stalls without closing. Confirm this is intentional.
 * NOTE(review): the bare `catch {}` treats ANY read failure — graceful close,
 * reset, or a mid-transfer error — as end-of-stream, so consumers cannot
 * distinguish a complete stream from a truncated one. Verify callers (e.g.
 * the offset-based partial-retry download path) tolerate silent truncation.
 */
async send(
lp: LengthPrefixedStream<Stream>,
message: string,
options: { signal: AbortSignal }
) {
// Send the command frame, then block for the single status frame reply.
await lp.write(uint8ArrayFromString(message), { signal: options.signal })
const statusBytes = await lp.read({ signal: options.signal })
return {
status: JSON.parse(uint8ArrayToString(statusBytes.subarray())),
stream: {
// Lazy generator: each `next()` pulls one more length-prefixed frame.
[Symbol.asyncIterator]: async function* () {
try {
while (true) {
const chunk = await lp.read()
// `lp.read()` returns a Uint8ArrayList; `.subarray()` flattens it
// to a plain Uint8Array. The fallback handles a raw-bytes shape.
yield chunk.subarray ? chunk.subarray() : chunk
}
} catch {}
}
}
}
}

async sendTo(
peerName: string,
message: string,
multiAddrs?: string[]
): Promise<{ status: any; stream?: AsyncIterable<any> }> {
P2P_LOGGER.logMessage('SendTo() node ' + peerName + ' task: ' + message, true)
const options = {
signal: AbortSignal.timeout(10_000),
priority: 100,
runOnLimitedConnection: true
}

let connection: Connection
let stream: Stream
let peerId

P2P_LOGGER.logMessage('SendTo() node ' + peerName + ' task: ' + message, true)

try {
peerId = peerIdFromString(peerName)
} catch (e) {
Expand All @@ -741,33 +773,22 @@ export class OceanP2P extends EventEmitter {
return { status: { httpStatus: 404, error: 'Invalid peer' } }
}

let multiaddrs: Multiaddr[] = []
if (!multiAddrs || multiAddrs.length < 1) {
multiaddrs = await this.getPeerMultiaddrs(peerName)
} else {
for (const addr of multiAddrs) {
multiaddrs.push(multiaddr(addr))
}
}
const multiaddrs = multiAddrs?.length
? multiAddrs.map((addr) => multiaddr(addr))
: await this.getPeerMultiaddrs(peerName)

if (multiaddrs.length < 1) {
const error = `Cannot find any address to dial for peer: ${peerId}`
P2P_LOGGER.error(error)
return { status: { httpStatus: 404, error } }
}

let stream: Stream
try {
const options = {
signal: AbortSignal.timeout(10000),
priority: 100,
runOnLimitedConnection: true
}
const connection = await this._libp2p.dial(multiaddrs, options)
connection = await this._libp2p.dial(multiaddrs, options)
if (connection.remotePeer.toString() !== peerId.toString()) {
const error = `Invalid peer on the other side: ${connection.remotePeer.toString()}`
P2P_LOGGER.error(error)
return { status: { httpStatus: 404, error } }
throw new Error(
`Invalid peer on the other side: ${connection.remotePeer.toString()}`
)
}
stream = await connection.newStream(this._protocol, options)
} catch (e) {
Expand All @@ -776,33 +797,39 @@ export class OceanP2P extends EventEmitter {
return { status: { httpStatus: 404, error } }
}

if (!stream) {
return { status: { httpStatus: 404, error: 'Unable to get remote P2P stream' } }
}

let streamErr: Error | null = null
try {
// Send message and close write side
stream.send(uint8ArrayFromString(message))
await stream.close()

// Read and parse status from first chunk
const iterator = stream[Symbol.asyncIterator]()
const { done, value } = await iterator.next()
return await this.send(lpStream(stream), message, options)
} catch (err) {
try {
stream.abort(err as Error)
} catch {}
streamErr = err
}

if (done || !value) {
return { status: { httpStatus: 500, error: 'No response from peer' } }
}
// abortConnectionOnPingFailure is disabled to keep long-running download streams alive,
// so stale connections are not evicted automatically. On a stale stream error, close the
// connection and retry once so the next dial establishes a fresh one.
if (!streamErr.message.includes('closed') && !streamErr.message.includes('reset')) {
P2P_LOGGER.error(`P2P communication error: ${streamErr.message}`)
return { status: { httpStatus: 500, error: `P2P error: ${streamErr.message}` } }
}

const status = JSON.parse(uint8ArrayToString(value.subarray()))
P2P_LOGGER.warn(`Stale connection to ${peerId}, retrying: ${streamErr.message}`)
try {
await connection.close()
} catch {}
connection = await this._libp2p.dial(multiaddrs, options)
stream = await connection.newStream(this._protocol, options)

// Return status and remaining stream
return { status, stream: { [Symbol.asyncIterator]: () => iterator } }
} catch (err) {
P2P_LOGGER.error(`P2P communication error: ${err.message}`)
try {
return await this.send(lpStream(stream), message, options)
} catch (retryErr) {
try {
stream.abort(err as Error)
stream.abort(retryErr as Error)
} catch {}
return { status: { httpStatus: 500, error: `P2P error: ${err.message}` } }
P2P_LOGGER.error(`P2P communication error on retry: ${retryErr.message}`)
return { status: { httpStatus: 500, error: `P2P error: ${retryErr.message}` } }
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/components/c2d/compute_engine_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ export abstract class C2DEngine {
public abstract getComputeJobResult(
consumerAddress: string,
jobId: string,
index: number
index: number,
offset?: number
): Promise<{ stream: Readable; headers: any }>

public abstract cleanupExpiredStorage(job: DBComputeJob): Promise<boolean>
Expand Down
6 changes: 4 additions & 2 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,8 @@ export class C2DEngineDocker extends C2DEngine {
public override async getComputeJobResult(
consumerAddress: string,
jobId: string,
index: number
index: number,
offset: number = 0
): Promise<{ stream: Readable; headers: any }> {
const jobs = await this.db.getJob(jobId, null, null)
if (jobs.length === 0 || jobs.length > 1) {
Expand Down Expand Up @@ -1358,7 +1359,8 @@ export class C2DEngineDocker extends C2DEngine {
if (i.type === 'output') {
return {
stream: createReadStream(
this.getC2DConfig().tempFolder + '/' + jobId + '/data/outputs/outputs.tar'
this.getC2DConfig().tempFolder + '/' + jobId + '/data/outputs/outputs.tar',
offset > 0 ? { start: offset } : undefined
),
headers: {
'Content-Type': 'application/octet-stream'
Expand Down
3 changes: 2 additions & 1 deletion src/components/core/compute/getResults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ export class ComputeGetResultHandler extends CommandHandler {
const respStream = await engine.getComputeJobResult(
task.consumerAddress,
jobId,
task.index
task.index,
task.offset ?? 0
)
const response: P2PCommandResponse = {
stream: respStream?.stream,
Expand Down
5 changes: 4 additions & 1 deletion src/components/httpRoutes/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ directCommandRoute.post(
}
} catch (err) {
HTTP_LOGGER.error(err.message)
res.status(500).send(err.message)
closedResponse = true
if (!res.headersSent) {
res.status(500).send(err.message)
}
}
}
)
Loading