diff --git a/handwritten/storage/src/file.ts b/handwritten/storage/src/file.ts index 1e62634e4c64..81b6fd9f44ff 100644 --- a/handwritten/storage/src/file.ts +++ b/handwritten/storage/src/file.ts @@ -1574,6 +1574,17 @@ class File extends ServiceObject { const shouldRunValidation = !rangeRequest && (crc32c || md5); + const cleanupRequest = () => { + if (request?.agent) { + request.agent.destroy(); + } + }; + + const cleanupRawResponse = (rawResponseStream: Readable) => { + rawResponseStream.destroy(); + cleanupRequest(); + }; + if (rangeRequest) { if ( typeof options.validation === 'string' || @@ -1590,9 +1601,7 @@ class File extends ServiceObject { if (err) { // There is an issue with node-fetch 2.x that if the stream errors the underlying socket connection is not closed. // This causes a memory leak, so cleanup the sockets manually here by destroying the agent. - if (request?.agent) { - request.agent.destroy(); - } + cleanupRequest(); throughStream.destroy(err); } }; @@ -1622,6 +1631,11 @@ class File extends ServiceObject { } request = (rawResponseStream as r.Response).request; + if (throughStream.destroyed) { + cleanupRawResponse(rawResponseStream as Readable); + return; + } + const headers = (rawResponseStream as ResponseBody).toJSON().headers; const isCompressed = headers['content-encoding'] === 'gzip'; const hashes: {crc32c?: string; md5?: string} = {}; @@ -2178,12 +2192,36 @@ class File extends ServiceObject { }); writeStream.once('writing', () => { + const onPrePipelineError = (error: Error) => { + pipelineCallback(error); + }; + fileWriteStream.once('error', onPrePipelineError); + if (options.resumable === false) { this.startSimpleUpload_(fileWriteStream, options); } else { this.startResumableUpload_(fileWriteStream, options); } + if ( + fileWriteStream.destroyed || + writeStream.destroyed || + emitStream.destroyed + ) { + // Destroying an upload stream can queue its terminal error before + // close, so keep the temporary listener until teardown completes. + fileWriteStream.once('close', () => { + fileWriteStream.removeListener('error', onPrePipelineError); + }); + if (!fileWriteStream.destroyed) { + fileWriteStream.destroy(); + } + emitStream.destroy(); + return; + } + + fileWriteStream.removeListener('error', onPrePipelineError); + // remove temporary noop listener as we now create a pipeline that handles the errors emitStream.removeListener('error', noop); diff --git a/handwritten/storage/test/file.ts b/handwritten/storage/test/file.ts index 311d5749582d..0d95aaf7243c 100644 --- a/handwritten/storage/test/file.ts +++ b/handwritten/storage/test/file.ts @@ -1165,6 +1165,49 @@ describe('File', () => { file.createReadStream().resume(); }); + it('should clean up if the returned stream is destroyed before the response is piped', done => { + const rawResponseStream = new PassThrough(); + const agentDestroy = sinon.spy(); + Object.assign(rawResponseStream, { + request: { + agent: { + destroy: agentDestroy, + }, + }, + toJSON() { + return {headers: {}}; + }, + }); + + handleRespOverride = ( + _err: Error, + _res: {}, + _body: {}, + callback: Function + ) => { + callback(null, null, rawResponseStream); + }; + + file.requestStream = () => { + const requestStream = new PassThrough(); + setImmediate(() => { + requestStream.emit('response', rawResponseStream); + }); + return requestStream; + }; + + const readStream = file.createReadStream({validation: false}); + readStream.once('response', () => { + readStream.destroy(); + }); + readStream.once('close', () => { + assert.strictEqual(rawResponseStream.destroyed, true); + assert.strictEqual(agentDestroy.calledOnce, true); + done(); + }); + readStream.resume(); + }); + describe('errors', () => { const ERROR = new Error('Error.'); @@ -2106,6 +2149,27 @@ describe('File', () => { writable.write('data'); }); + it('should clean up if the returned stream is destroyed during upload startup', done => { + const writable = file.createWriteStream(); + let fileWriteStream: duplexify.Duplexify | undefined; + + file.startResumableUpload_ = (stream: duplexify.Duplexify) => { + fileWriteStream = stream; + writable.destroy(); + }; + + writable.on('close', () => { + setImmediate(() => { + assert(fileWriteStream); + assert.strictEqual(fileWriteStream.destroyed, true); + assert.strictEqual(fileWriteStream.listenerCount('error'), 0); + done(); + }); + }); + + writable.write('data'); + }); + it('should alias contentType to metadata object', done => { const contentType = 'text/html'; const writable = file.createWriteStream({contentType});