From b8c5823b9ad0feb51ac500ded6df17f4a0e3c4b3 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Wed, 17 Jun 2026 08:54:11 -0500 Subject: [PATCH 1/2] fix(storage): handle early read stream destruction File#createReadStream emits the response event before attaching the raw response pipeline. A consumer can destroy the returned stream from that event, leaving pipeline() to receive a destroyed destination and throw ERR_STREAM_UNABLE_TO_PIPE synchronously. Check the public stream after the response arrives and discard the raw response body when the consumer has already closed it. Reuse the existing request-agent cleanup so the abandoned response does not retain its underlying socket resources. Add a regression test using the real stream event ordering. The test destroys the returned stream from its response handler and verifies that the delayed raw response and request agent are cleaned up. This adapts the core approach from PR #7604 by Michael Latman, which was closed before merge. --- handwritten/storage/src/file.ts | 20 ++++++++++++--- handwritten/storage/test/file.ts | 43 ++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/handwritten/storage/src/file.ts b/handwritten/storage/src/file.ts index 1e62634e4c64..469af2b4858c 100644 --- a/handwritten/storage/src/file.ts +++ b/handwritten/storage/src/file.ts @@ -1574,6 +1574,17 @@ class File extends ServiceObject { const shouldRunValidation = !rangeRequest && (crc32c || md5); + const cleanupRequest = () => { + if (request?.agent) { + request.agent.destroy(); + } + }; + + const cleanupRawResponse = (rawResponseStream: Readable) => { + rawResponseStream.destroy(); + cleanupRequest(); + }; + if (rangeRequest) { if ( typeof options.validation === 'string' || @@ -1590,9 +1601,7 @@ class File extends ServiceObject { if (err) { // There is an issue with node-fetch 2.x that if the stream errors the underlying socket connection is not closed. // This causes a memory leak, so cleanup the sockets manually here by destroying the agent. - if (request?.agent) { - request.agent.destroy(); - } + cleanupRequest(); throughStream.destroy(err); } }; @@ -1622,6 +1631,11 @@ class File extends ServiceObject { } request = (rawResponseStream as r.Response).request; + if (throughStream.destroyed) { + cleanupRawResponse(rawResponseStream as Readable); + return; + } + const headers = (rawResponseStream as ResponseBody).toJSON().headers; const isCompressed = headers['content-encoding'] === 'gzip'; const hashes: {crc32c?: string; md5?: string} = {}; diff --git a/handwritten/storage/test/file.ts b/handwritten/storage/test/file.ts index 311d5749582d..0fcd3dfad1b1 100644 --- a/handwritten/storage/test/file.ts +++ b/handwritten/storage/test/file.ts @@ -1165,6 +1165,49 @@ describe('File', () => { file.createReadStream().resume(); }); + it('should clean up if the returned stream is destroyed before the response is piped', done => { + const rawResponseStream = new PassThrough(); + const agentDestroy = sinon.spy(); + Object.assign(rawResponseStream, { + request: { + agent: { + destroy: agentDestroy, + }, + }, + toJSON() { + return {headers: {}}; + }, + }); + + handleRespOverride = ( + _err: Error, + _res: {}, + _body: {}, + callback: Function + ) => { + callback(null, null, rawResponseStream); + }; + + file.requestStream = () => { + const requestStream = new PassThrough(); + setImmediate(() => { + requestStream.emit('response', rawResponseStream); + }); + return requestStream; + }; + + const readStream = file.createReadStream({validation: false}); + readStream.once('response', () => { + readStream.destroy(); + }); + readStream.once('close', () => { + assert.strictEqual(rawResponseStream.destroyed, true); + assert.strictEqual(agentDestroy.calledOnce, true); + done(); + }); + readStream.resume(); + }); + describe('errors', () => { const ERROR = new Error('Error.'); From fd9edf1b983f7c786a25f8fac46dc79d895126c4 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Wed, 17 Jun 2026 08:45:04 -0500 Subject: [PATCH 2/2] fix(storage): handle upload errors before pipeline setup createWriteStream starts the simple or resumable upload before installing the pipeline that normally owns fileWriteStream errors. If startup fails synchronously, fileWriteStream and the public write stream can be destroyed before pipeline() runs. Node then throws ERR_STREAM_UNABLE_TO_PIPE while attaching invalid streams, and the original upload error can escape uncaught. Install a temporary error listener before upload startup and forward any early failure through the existing pipeline callback. If startup has already torn down any stream, skip pipeline setup and destroy the remaining internal streams. Keep the temporary listener until the upload stream closes so a queued terminal error is still handled, then remove it to avoid retaining the callback. Add coverage for a consumer destroying the returned stream during upload startup. The test verifies that the internal upload stream is destroyed and its temporary error listener is removed. Existing synchronous upload-error and pipeline-failure coverage continues to pass. --- handwritten/storage/src/file.ts | 24 ++++++++++++++++++++++++ handwritten/storage/test/file.ts | 21 +++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/handwritten/storage/src/file.ts b/handwritten/storage/src/file.ts index 469af2b4858c..81b6fd9f44ff 100644 --- a/handwritten/storage/src/file.ts +++ b/handwritten/storage/src/file.ts @@ -2192,12 +2192,36 @@ class File extends ServiceObject { }); writeStream.once('writing', () => { + const onPrePipelineError = (error: Error) => { + pipelineCallback(error); + }; + fileWriteStream.once('error', onPrePipelineError); + if (options.resumable === false) { this.startSimpleUpload_(fileWriteStream, options); } else { this.startResumableUpload_(fileWriteStream, options); } + if ( + fileWriteStream.destroyed || + writeStream.destroyed || + emitStream.destroyed + ) { + // Destroying an upload stream can queue its terminal error before + // close, so keep the temporary listener until teardown completes. + fileWriteStream.once('close', () => { + fileWriteStream.removeListener('error', onPrePipelineError); + }); + if (!fileWriteStream.destroyed) { + fileWriteStream.destroy(); + } + emitStream.destroy(); + return; + } + + fileWriteStream.removeListener('error', onPrePipelineError); + // remove temporary noop listener as we now create a pipeline that handles the errors emitStream.removeListener('error', noop); diff --git a/handwritten/storage/test/file.ts b/handwritten/storage/test/file.ts index 0fcd3dfad1b1..0d95aaf7243c 100644 --- a/handwritten/storage/test/file.ts +++ b/handwritten/storage/test/file.ts @@ -2149,6 +2149,27 @@ describe('File', () => { writable.write('data'); }); + it('should clean up if the returned stream is destroyed during upload startup', done => { + const writable = file.createWriteStream(); + let fileWriteStream: duplexify.Duplexify | undefined; + + file.startResumableUpload_ = (stream: duplexify.Duplexify) => { + fileWriteStream = stream; + writable.destroy(); + }; + + writable.on('close', () => { + setImmediate(() => { + assert(fileWriteStream); + assert.strictEqual(fileWriteStream.destroyed, true); + assert.strictEqual(fileWriteStream.listenerCount('error'), 0); + done(); + }); + }); + + writable.write('data'); + }); + it('should alias contentType to metadata object', done => { const contentType = 'text/html'; const writable = file.createWriteStream({contentType});