diff --git a/hypaware-core/plugins-workspace/ai-gateway/src/recorder.js b/hypaware-core/plugins-workspace/ai-gateway/src/recorder.js index 0d19d62..3e49055 100644 --- a/hypaware-core/plugins-workspace/ai-gateway/src/recorder.js +++ b/hypaware-core/plugins-workspace/ai-gateway/src/recorder.js @@ -1,6 +1,7 @@ // @ts-check import { randomBytes } from 'node:crypto' +import { brotliDecompressSync, gunzipSync, inflateRawSync, inflateSync } from 'node:zlib' import { isSseHeaders, SseParser } from './sse.js' @@ -116,6 +117,12 @@ export class Exchange { this.requestBytes = 0 /** @type {ResponseStart | undefined} */ this.response = undefined + /** + * Raw upstream `content-encoding`, captured before header redaction so + * body decoding still works when an operator redacts that header. + * @type {string | string[] | undefined} + */ + this.responseContentEncoding = undefined /** @type {Buffer[]} */ this.responseChunks = [] /** @type {number} */ @@ -178,6 +185,7 @@ export class Exchange { status: init.status, headers: redactHeaders(init.headers, this.redactSet), } + this.responseContentEncoding = headerValue(init.headers, 'content-encoding') this.isSse = isSseHeaders(init.headers) } @@ -256,10 +264,17 @@ export class Exchange { this.finished = true const tsEndMs = Date.now() - const requestBody = Buffer.concat(this.requestChunks).toString('utf8') + // The proxy is a pass-through, so a body carries whatever + // `content-encoding` the upstream (or client) applied. Decode it + // before stringifying or a gzip/br/deflate body lands in the cache + // as mojibake that no downstream projector can parse as JSON. + const requestBody = decodeBody( + Buffer.concat(this.requestChunks), + headerValue(this._rawRequestHeaders, 'content-encoding') + ) const responseBody = this.isSse ? 
/**
 * Default ceiling, in bytes, on the size a captured body may expand to
 * while it is being decoded. The captured bytes come from the client or
 * the upstream, so an unbounded synchronous decompress would let a tiny
 * "zip bomb" body block the event loop or exhaust memory. Callers can
 * pass a different cap as the third argument of `decodeBody`.
 */
const DEFAULT_MAX_DECODED_BYTES = 64 * 1024 * 1024

/**
 * Reverse the `content-encoding` applied to a captured body and decode
 * it as UTF-8.
 *
 * Best-effort by design, so a row is still written rather than the
 * exchange being dropped:
 * - an empty buffer yields `''`;
 * - an unknown codec stops decoding and returns whatever has been
 *   decoded so far;
 * - an undecodable body, or one that would expand past
 *   `maxOutputLength`, falls back to the raw captured bytes.
 *
 * @param {Buffer} buf captured body bytes
 * @param {string | string[] | undefined} encodingHeader raw `content-encoding` header value(s)
 * @param {number} [maxOutputLength] cap on the decoded size of each decoding step, in bytes
 * @returns {string}
 */
function decodeBody(buf, encodingHeader, maxOutputLength = DEFAULT_MAX_DECODED_BYTES) {
  if (buf.byteLength === 0) return ''
  const zlibOptions = { maxOutputLength }
  let current = buf
  // `content-encoding` lists transforms in the order they were applied;
  // walk the list backwards to undo them. `parseEncodings` returns a
  // fresh array, so reversing it in place is safe.
  for (const enc of parseEncodings(encodingHeader).reverse()) {
    try {
      switch (enc) {
        case 'gzip':
        case 'x-gzip':
          current = gunzipSync(current, zlibOptions)
          break
        case 'br':
          current = brotliDecompressSync(current, zlibOptions)
          break
        case 'deflate':
          current = inflateOrRaw(current, zlibOptions)
          break
        default:
          // Unknown codec: stop here and keep what we have.
          return current.toString('utf8')
      }
    } catch {
      // Corrupt, mislabeled or over-sized: fall back to the raw bytes.
      return buf.toString('utf8')
    }
  }
  return current.toString('utf8')
}

/**
 * Inflate a `deflate` body. Some servers emit raw DEFLATE without the
 * zlib header, so try the conformant decoder first and the headerless
 * variant second.
 *
 * @param {Buffer} buf
 * @param {import('node:zlib').ZlibOptions} [options] forwarded to both decoders
 * @returns {Buffer}
 * @throws when neither decoder accepts the input, or when the decoded
 *   output exceeds `options.maxOutputLength`
 */
function inflateOrRaw(buf, options = {}) {
  try {
    return inflateSync(buf, options)
  } catch (err) {
    // Hitting the size cap means the stream WAS valid zlib; retrying it
    // as headerless DEFLATE would only mask the real reason.
    if (/** @type {{ code?: string }} */ (err)?.code === 'ERR_BUFFER_TOO_LARGE') throw err
    return inflateRawSync(buf, options)
  }
}

/**
 * Normalize a `content-encoding` header into an ordered list of lowercase
 * codec tokens, dropping `identity` and empties. A missing header
 * (`undefined` or `null`) yields an empty list.
 *
 * @param {string | string[] | null | undefined} header
 * @returns {string[]}
 */
function parseEncodings(header) {
  if (header == null) return []
  // Repeated headers arrive as an array; joining them preserves the order
  // in which the encodings were applied.
  const raw = Array.isArray(header) ? header.join(',') : header
  return raw
    .split(',')
    .map((token) => token.trim().toLowerCase())
    .filter((token) => token.length > 0 && token !== 'identity')
}
{} }) + if (opts.responseBody) exchange.appendResponseChunk(opts.responseBody) + return exchange.finalize() +} + +const payload = JSON.stringify({ id: 'msg_1', content: [{ type: 'text', text: 'hello' }] }) + +test('decodes a gzip-encoded response body', () => { + const row = finishExchange({ + responseHeaders: { 'content-encoding': 'gzip' }, + responseBody: gzipSync(Buffer.from(payload)), + }) + assert.equal(row.response_body, payload) +}) + +test('decodes a brotli-encoded response body', () => { + const row = finishExchange({ + responseHeaders: { 'content-encoding': 'br' }, + responseBody: brotliCompressSync(Buffer.from(payload)), + }) + assert.equal(row.response_body, payload) +}) + +test('decodes a deflate-encoded response body (zlib and raw)', () => { + const zlib = finishExchange({ + responseHeaders: { 'content-encoding': 'deflate' }, + responseBody: deflateSync(Buffer.from(payload)), + }) + assert.equal(zlib.response_body, payload) + + const raw = finishExchange({ + responseHeaders: { 'content-encoding': 'deflate' }, + responseBody: deflateRawSync(Buffer.from(payload)), + }) + assert.equal(raw.response_body, payload) +}) + +test('passes through an identity / unencoded response body unchanged', () => { + const identity = finishExchange({ + responseHeaders: { 'content-encoding': 'identity' }, + responseBody: Buffer.from(payload), + }) + assert.equal(identity.response_body, payload) + + const none = finishExchange({ responseBody: Buffer.from(payload) }) + assert.equal(none.response_body, payload) +}) + +test('decodes a gzip-encoded request body', () => { + const row = finishExchange({ + requestHeaders: { 'content-encoding': 'gzip' }, + requestBody: gzipSync(Buffer.from(payload)), + }) + assert.equal(row.request_body, payload) +}) + +test('handles content-encoding header case-insensitively', () => { + const row = finishExchange({ + responseHeaders: { 'Content-Encoding': 'GZIP' }, + responseBody: gzipSync(Buffer.from(payload)), + }) + 
assert.equal(row.response_body, payload) +}) + +test('decodes the body even when content-encoding is a redacted header', () => { + // The raw upstream encoding is captured before redaction, so decoding + // still works when an operator adds content-encoding to redactHeaders. + const row = finishExchange({ + redactHeaders: ['content-encoding'], + responseHeaders: { 'content-encoding': 'gzip' }, + responseBody: gzipSync(Buffer.from(payload)), + }) + assert.equal(row.response_body, payload) + // The stored header is still redacted in the row. + const headers = JSON.parse(/** @type {string} */ (row.response_headers)) + assert.match(headers['content-encoding'], /^REDACTED:/) +}) + +test('falls back to raw bytes when the encoding is unknown', () => { + const row = finishExchange({ + responseHeaders: { 'content-encoding': 'snappy' }, + responseBody: Buffer.from(payload), + }) + assert.equal(row.response_body, payload) +}) + +test('falls back to raw bytes when a gzip body is corrupt', () => { + const corrupt = Buffer.from('not actually gzip') + const row = finishExchange({ + responseHeaders: { 'content-encoding': 'gzip' }, + responseBody: corrupt, + }) + assert.equal(row.response_body, corrupt.toString('utf8')) +})