Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 79 additions & 2 deletions hypaware-core/plugins-workspace/ai-gateway/src/recorder.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// @ts-check

import { randomBytes } from 'node:crypto'
import { brotliDecompressSync, gunzipSync, inflateRawSync, inflateSync } from 'node:zlib'

import { isSseHeaders, SseParser } from './sse.js'

Expand Down Expand Up @@ -116,6 +117,12 @@ export class Exchange {
this.requestBytes = 0
/** @type {ResponseStart | undefined} */
this.response = undefined
/**
* Raw upstream `content-encoding`, captured before header redaction so
* body decoding still works when an operator redacts that header.
* @type {string | string[] | undefined}
*/
this.responseContentEncoding = undefined
/** @type {Buffer[]} */
this.responseChunks = []
/** @type {number} */
Expand Down Expand Up @@ -178,6 +185,7 @@ export class Exchange {
status: init.status,
headers: redactHeaders(init.headers, this.redactSet),
}
this.responseContentEncoding = headerValue(init.headers, 'content-encoding')
this.isSse = isSseHeaders(init.headers)
}

Expand Down Expand Up @@ -256,10 +264,17 @@ export class Exchange {
this.finished = true

const tsEndMs = Date.now()
const requestBody = Buffer.concat(this.requestChunks).toString('utf8')
// The proxy is a pass-through, so a body carries whatever
// `content-encoding` the upstream (or client) applied. Decode it
// before stringifying or a gzip/br/deflate body lands in the cache
// as mojibake that no downstream projector can parse as JSON.
const requestBody = decodeBody(
Buffer.concat(this.requestChunks),
headerValue(this._rawRequestHeaders, 'content-encoding')
)
const responseBody = this.isSse
? null
: Buffer.concat(this.responseChunks).toString('utf8')
: decodeBody(Buffer.concat(this.responseChunks), this.responseContentEncoding)
const devRunId = this.devRunIdFromHeaders()
/** @type {Record<string, unknown>} */
const metadata = {}
Expand Down Expand Up @@ -294,6 +309,68 @@ export class Exchange {
}
}

/**
* Reverse the `content-encoding` applied to a captured body and decode
* it as UTF-8. Best-effort: an empty buffer yields `''`, an unknown or
* undecodable encoding falls back to the raw bytes so a (possibly
* garbled) row is still written rather than the exchange being dropped.
*
* @param {Buffer} buf
* @param {string | string[] | undefined} encodingHeader
* @returns {string}
*/
function decodeBody(buf, encodingHeader) {
if (buf.byteLength === 0) return ''
const encodings = parseEncodings(encodingHeader)
if (encodings.length === 0) return buf.toString('utf8')
let current = buf
// `content-encoding` lists transforms in the order they were applied;
// decode in reverse to undo them.
for (let i = encodings.length - 1; i >= 0; i--) {
const enc = encodings[i]
try {
if (enc === 'gzip' || enc === 'x-gzip') current = gunzipSync(current)
else if (enc === 'br') current = brotliDecompressSync(current)
else if (enc === 'deflate') current = inflateOrRaw(current)
else return current.toString('utf8') // unknown codec — stop, keep what we have
} catch {
return buf.toString('utf8') // undecodable — fall back to the raw bytes
}
}
return current.toString('utf8')
}

/**
* Some servers emit raw DEFLATE without the zlib header. Try the
* conformant decoder first, then the headerless variant.
*
* @param {Buffer} buf
* @returns {Buffer}
*/
function inflateOrRaw(buf) {
try {
return inflateSync(buf)
} catch {
return inflateRawSync(buf)
}
}

/**
* Normalize a `content-encoding` header into an ordered list of lowercase
* codec tokens, dropping `identity` and empties.
*
* @param {string | string[] | undefined} header
* @returns {string[]}
*/
function parseEncodings(header) {
if (header === undefined) return []
const raw = Array.isArray(header) ? header.join(',') : header
return raw
.split(',')
.map((token) => token.trim().toLowerCase())
.filter((token) => token.length > 0 && token !== 'identity')
}

/**
* @param {readonly string[] | undefined} extra
*/
Expand Down
115 changes: 115 additions & 0 deletions test/plugins/ai-gateway-recorder-encoding.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// @ts-check

import assert from 'node:assert/strict'
import test from 'node:test'
import { brotliCompressSync, deflateRawSync, deflateSync, gzipSync } from 'node:zlib'

import { createRecorder } from '../../hypaware-core/plugins-workspace/ai-gateway/src/recorder.js'

/**
* @param {{ responseHeaders?: Record<string, string | string[]>, responseBody?: Buffer, requestHeaders?: Record<string, string>, requestBody?: Buffer, redactHeaders?: string[] }} opts
*/
function finishExchange(opts) {
const recorder = createRecorder({ redactHeaders: opts.redactHeaders })
const exchange = recorder.startExchange({
upstream: 'test',
provider: 'test',
method: 'POST',
path: '/v1/messages',
requestHeaders: opts.requestHeaders ?? {},
})
if (opts.requestBody) exchange.appendRequestChunk(opts.requestBody)
exchange.setResponseStart({ status: 200, headers: opts.responseHeaders ?? {} })
if (opts.responseBody) exchange.appendResponseChunk(opts.responseBody)
return exchange.finalize()
}

const payload = JSON.stringify({ id: 'msg_1', content: [{ type: 'text', text: 'hello' }] })

test('decodes a gzip-encoded response body', () => {
const row = finishExchange({
responseHeaders: { 'content-encoding': 'gzip' },
responseBody: gzipSync(Buffer.from(payload)),
})
assert.equal(row.response_body, payload)
})

test('decodes a brotli-encoded response body', () => {
const row = finishExchange({
responseHeaders: { 'content-encoding': 'br' },
responseBody: brotliCompressSync(Buffer.from(payload)),
})
assert.equal(row.response_body, payload)
})

test('decodes a deflate-encoded response body (zlib and raw)', () => {
const zlib = finishExchange({
responseHeaders: { 'content-encoding': 'deflate' },
responseBody: deflateSync(Buffer.from(payload)),
})
assert.equal(zlib.response_body, payload)

const raw = finishExchange({
responseHeaders: { 'content-encoding': 'deflate' },
responseBody: deflateRawSync(Buffer.from(payload)),
})
assert.equal(raw.response_body, payload)
})

test('passes through an identity / unencoded response body unchanged', () => {
const identity = finishExchange({
responseHeaders: { 'content-encoding': 'identity' },
responseBody: Buffer.from(payload),
})
assert.equal(identity.response_body, payload)

const none = finishExchange({ responseBody: Buffer.from(payload) })
assert.equal(none.response_body, payload)
})

test('decodes a gzip-encoded request body', () => {
const row = finishExchange({
requestHeaders: { 'content-encoding': 'gzip' },
requestBody: gzipSync(Buffer.from(payload)),
})
assert.equal(row.request_body, payload)
})

test('handles content-encoding header case-insensitively', () => {
const row = finishExchange({
responseHeaders: { 'Content-Encoding': 'GZIP' },
responseBody: gzipSync(Buffer.from(payload)),
})
assert.equal(row.response_body, payload)
})

test('decodes the body even when content-encoding is a redacted header', () => {
// The raw upstream encoding is captured before redaction, so decoding
// still works when an operator adds content-encoding to redactHeaders.
const row = finishExchange({
redactHeaders: ['content-encoding'],
responseHeaders: { 'content-encoding': 'gzip' },
responseBody: gzipSync(Buffer.from(payload)),
})
assert.equal(row.response_body, payload)
// The stored header is still redacted in the row.
const headers = JSON.parse(/** @type {string} */ (row.response_headers))
assert.match(headers['content-encoding'], /^REDACTED:/)
})

test('falls back to raw bytes when the encoding is unknown', () => {
const row = finishExchange({
responseHeaders: { 'content-encoding': 'snappy' },
responseBody: Buffer.from(payload),
})
assert.equal(row.response_body, payload)
})

test('falls back to raw bytes when a gzip body is corrupt', () => {
const corrupt = Buffer.from('not actually gzip')
const row = finishExchange({
responseHeaders: { 'content-encoding': 'gzip' },
responseBody: corrupt,
})
assert.equal(row.response_body, corrupt.toString('utf8'))
})