diff --git a/collectivus-plugin-kernel-types.d.ts b/collectivus-plugin-kernel-types.d.ts index 2ed27bc..cb7246a 100644 --- a/collectivus-plugin-kernel-types.d.ts +++ b/collectivus-plugin-kernel-types.d.ts @@ -682,6 +682,15 @@ export interface SinkEncodeContext { * destination opens this stream from the kernel storage service. */ rows?: AsyncIterable> + /** + * Column names that group rows sharing identical wide, repeated values + * (e.g. the dataset's Iceberg partition fields like `conversation_id`). + * Encoders that build columnar files may use these to keep each row + * group low-cardinality so heavily-repeated columns stay dictionary- + * encoded instead of falling back to PLAIN. Optional; encoders that do + * not partition row groups ignore it. Empty/absent disables clustering. + */ + clusterColumns?: readonly string[] } export interface SinkEncodedBlob { diff --git a/hypaware-core/plugins-workspace/format-parquet/src/columns.js b/hypaware-core/plugins-workspace/format-parquet/src/columns.js index 7408f26..d821e50 100644 --- a/hypaware-core/plugins-workspace/format-parquet/src/columns.js +++ b/hypaware-core/plugins-workspace/format-parquet/src/columns.js @@ -23,12 +23,53 @@ * @returns {ColumnSource[]} */ export function rowsToColumnSources(columns, rows) { - return columns.map((spec) => ({ - name: spec.name, - type: spec.type, - nullable: spec.nullable, - data: rows.map((row) => coerce(spec, row[spec.name])), - })) + return columns.map((spec) => { + const data = rows.map((row) => coerce(spec, row[spec.name])) + // JSON columns arrive from the cache (Iceberg `variant`) as parsed + // objects/arrays. hyparquet-writer keys its dictionary by reference, so + // structurally-identical objects are otherwise distinct entries and the + // column collapses to PLAIN — re-storing the (denormalized) blob on every + // row. Interning by canonical content makes identical values share one + // reference so they dictionary-encode (stored once), while the JSON + // logical type still round-trips the original object to readers. + if (spec.type === 'JSON') internJsonValues(data) + return { name: spec.name, type: spec.type, nullable: spec.nullable, data } + }) +} + +/** + * Replace structurally-identical object/array values in `data` with a single + * shared reference, in place. Primitive values (already-stringified JSON, + * numbers, null) are left untouched — they dedupe by value already. + * + * The interning key is a plain `JSON.stringify` of the value, which is a + * faithful, injective rendering of what the JSON writer will emit (numbers + * vs strings stay distinct, no sentinel that real data could forge). Values + * that cannot be serialized — a nested BigInt is the only realistic case — + * throw and are simply left un-interned: keeping their own reference means a + * distinct value is never merged into another, at the cost of not deduping + * that one value. Such values do not occur in these columns in practice + * (they originate from `JSON.parse`, which never yields BigInt), so this + * costs no real dedup while staying correct if one ever does. + * + * @param {unknown[]} data + */ +function internJsonValues(data) { + /** @type {Map} */ + const seen = new Map() + for (let i = 0; i < data.length; i++) { + const v = data[i] + if (v === null || v === undefined || typeof v !== 'object') continue + let key + try { + key = JSON.stringify(v) + } catch { + continue // non-serializable (e.g. nested BigInt) — do not intern + } + const existing = seen.get(key) + if (existing !== undefined) data[i] = existing + else seen.set(key, v) + } } /** @@ -57,6 +98,10 @@ function coerce(spec, value) { case 'TIMESTAMP': return toTimestamp(value, spec.name) case 'JSON': + // Pass JSON objects/arrays through unchanged so the writer's JSON + // logical type round-trips them back to readers as objects. Dedup is + // handled by interning identical values (see internJsonValues) rather + // than by stringifying, which would double-encode (object -> JSON text). return value default: return value diff --git a/hypaware-core/plugins-workspace/format-parquet/src/index.js b/hypaware-core/plugins-workspace/format-parquet/src/index.js index e996481..17f2aa1 100644 --- a/hypaware-core/plugins-workspace/format-parquet/src/index.js +++ b/hypaware-core/plugins-workspace/format-parquet/src/index.js @@ -2,6 +2,8 @@ import zlib from 'node:zlib' +import { ByteWriter, ParquetWriter, schemaFromColumnData } from 'hyparquet-writer' + import { rowsToColumnSources } from './columns.js' import { getTracer, SpanStatusCode } from '../../../../src/core/observability/index.js' @@ -16,6 +18,27 @@ const EXTENSION = 'parquet' const DEFAULT_CODEC = 'SNAPPY' const DEFAULT_ZSTD_LEVEL = 3 +// Row-group clustering. hyparquet-writer keeps a column dictionary-encoded +// only while a row group's DISTINCT values fit under its ~1 MiB dictionary- +// page cap. Columns denormalized onto every row but constant per conversation +// (e.g. `tools`, `system_text`) explode to PLAIN — re-storing every copy in +// full — once a single row group spans the whole partition's distinct values. +// Bounding each row group to a small number of distinct cluster keys (and a +// max row count) keeps the dictionary alive. The dictionary decision depends +// on the distinct-value COUNT, not row order, and the source rows already +// arrive grouped by conversation, so no sort is needed. +const DEFAULT_MAX_CLUSTER_KEYS = 16 +const DEFAULT_MAX_ROWS_PER_GROUP = 50_000 +// Hard ceiling on how many estimated row bytes accumulate in one in-memory +// group before it is written out as a row group and freed. This is the knob +// that bounds peak heap during a sink force: the encoder never holds more than +// ~one group (plus its columnar copy) at once, instead of materializing the +// whole partition. Independent of blob size, so a fat-`tools` partition cannot +// push a group into the gigabytes. +const DEFAULT_MAX_GROUP_BYTES = 32 * 1024 * 1024 +// Mirrors hyparquet-writer's own default page size (1 MiB). +const DEFAULT_PAGE_SIZE = 1024 * 1024 + // Codecs this encoder can emit. SNAPPY is supplied by hyparquet-writer's // own default compressors; ZSTD is wired here via Node's built-in zlib // (Node >= 22.15 / 23.8). Reads are covered by `hyparquet-compressors`, @@ -153,19 +176,74 @@ async function encodePartition(partition, ctx, settings) { 'format-parquet: SinkEncodeContext.rows must be provided by the blob destination' ) } - const rows = await collectRows(ctx.rows) - const columnData = rowsToColumnSources(ctx.columns, rows) + const columns = ctx.columns + const sourceRows = ctx.rows + + // Derive a stable schema from the declared column types (not from the + // data) so we can write row groups incrementally — never holding more + // than one cluster group of rows (plus its columnar copy) in memory. + // This is what stops `hyp sink force` on a large partition from OOMing + // while materializing the whole partition at once. + const schema = schemaFromColumnData({ columnData: rowsToColumnSources(columns, []) }) + const writer = new ByteWriter() + const pq = new ParquetWriter({ + writer, + schema, + codec: settings.codec, + compressors: settings.compressors, + }) + const pageSize = settings.pageSize ?? DEFAULT_PAGE_SIZE + const clusterColumns = ctx.clusterColumns && ctx.clusterColumns.length > 0 ? ctx.clusterColumns : null + + let rowCount = 0 + let rowGroupCount = 0 + /** @type {Record[]} */ + let group = [] + let groupBytes = 0 + /** @type {Set | null} */ + let groupKeys = clusterColumns ? new Set() : null + + // Each flush is exactly one Parquet row group. useDictionary runs per + // row group, so a low-cardinality group keeps wide repeated columns + // (`tools`, `system_text`) dictionary-encoded rather than PLAIN. + const flushGroup = () => { + if (group.length === 0) return + pq.write({ columnData: rowsToColumnSources(columns, group), rowGroupSize: group.length, pageSize }) + rowCount += group.length + rowGroupCount++ + group = [] + groupBytes = 0 + if (groupKeys) groupKeys = new Set() + } + + for await (const row of sourceRows) { + // Estimate the row up front so the byte cap is checked *before* the + // row is added. Otherwise groupBytes only reflects rows already in the + // group, and a fat blob (20-30 MB) can push the group an entire row + // past DEFAULT_MAX_GROUP_BYTES before the next iteration flushes. + const rowBytes = estimateRowBytes(row) + const wouldOverflowBytes = group.length > 0 && groupBytes + rowBytes > DEFAULT_MAX_GROUP_BYTES + if (clusterColumns && groupKeys) { + const key = clusterKeyOf(row, clusterColumns) + const overflowKeys = !groupKeys.has(key) && groupKeys.size >= DEFAULT_MAX_CLUSTER_KEYS + if (group.length > 0 && (overflowKeys || group.length >= DEFAULT_MAX_ROWS_PER_GROUP || wouldOverflowBytes)) { + flushGroup() + } + groupKeys.add(key) + } else if (group.length > 0 && (group.length >= DEFAULT_MAX_ROWS_PER_GROUP || wouldOverflowBytes)) { + flushGroup() + } + group.push(row) + groupBytes += rowBytes + } + flushGroup() - const { parquetWriteBuffer } = await import('hyparquet-writer') - /** @type {{ columnData: any, codec: 'SNAPPY' | 'ZSTD', compressors?: Record Uint8Array>, pageSize?: number }} */ - const writeOpts = { columnData, codec: settings.codec } - if (settings.compressors) writeOpts.compressors = settings.compressors - if (settings.pageSize) writeOpts.pageSize = settings.pageSize - const arrayBuffer = parquetWriteBuffer(writeOpts) - const bytes = new Uint8Array(arrayBuffer) + pq.finish() + const bytes = new Uint8Array(writer.getBuffer()) const filename = `${partitionFilename(partition)}.${EXTENSION}` - span.setAttribute('row_count', rows.length) + span.setAttribute('row_count', rowCount) + span.setAttribute('row_group_count', rowGroupCount) span.setAttribute('bytes_written', bytes.byteLength) span.setAttribute('hyp_sink_filename', filename) span.setStatus({ code: SpanStatusCode.OK }) @@ -174,7 +252,7 @@ async function encodePartition(partition, ctx, settings) { filename, bytes, bytesWritten: bytes.byteLength, - rowCount: rows.length, + rowCount, } } catch (err) { const message = err instanceof Error ? err.message : String(err) @@ -197,18 +275,72 @@ async function encodePartition(partition, ctx, settings) { } /** - * Drain an async iterable of rows into an in-memory array. The Phase 8.3 - * smoke writes 50 rows, so a single-array buffer is fine; larger - * deployments lean on the row-group splitting inside hyparquet-writer. + * Cheap, allocation-free estimate of a row's in-memory footprint in bytes. + * Used only to bound how much a group accumulates before it is flushed as a + * row group, so precision matters less than never under-counting a fat blob. + * Mirrors the cache compactor's estimator. * - * @param {AsyncIterable>} source - * @returns {Promise[]>} + * @param {Record} row + * @returns {number} */ -async function collectRows(source) { - /** @type {Record[]} */ - const rows = [] - for await (const row of source) rows.push(row) - return rows +function estimateRowBytes(row) { + let total = 0 + for (const value of Object.values(row)) total += estimateValueBytes(value) + return total +} + +/** + * @param {unknown} value + * @returns {number} + */ +function estimateValueBytes(value) { + if (value === null || value === undefined) return 0 + switch (typeof value) { + case 'string': + return value.length * 2 // JS strings are UTF-16 internally + case 'number': + return 8 + case 'bigint': + return 16 + case 'boolean': + return 4 + case 'object': { + if (value instanceof Uint8Array) return value.byteLength + let total = 0 + if (Array.isArray(value)) { + for (const item of value) total += estimateValueBytes(item) + return total + } + for (const [k, v] of Object.entries(value)) { + total += k.length * 2 + estimateValueBytes(v) + } + return total + } + default: + return 0 + } +} + +/** + * Stable grouping key from a row's cluster columns. Consistent for equal + * tuples; never persisted. NUL-separated to keep distinct tuples distinct. + * + * @param {Record} row + * @param {readonly string[]} clusterColumns + * @returns {string} + */ +function clusterKeyOf(row, clusterColumns) { + let key = '' + for (const col of clusterColumns) { + const v = row[col] + const part = typeof v === 'string' + ? v + : typeof v === 'bigint' + ? v.toString() + : JSON.stringify(v ?? null) + key += part + '\u0000' + } + return key } /** diff --git a/hypaware-core/plugins-workspace/local-fs/src/index.js b/hypaware-core/plugins-workspace/local-fs/src/index.js index 0abe871..ce392f5 100644 --- a/hypaware-core/plugins-workspace/local-fs/src/index.js +++ b/hypaware-core/plugins-workspace/local-fs/src/index.js @@ -4,7 +4,7 @@ import { Buffer } from 'node:buffer' import fs from 'node:fs/promises' import path from 'node:path' -import { encodePartition } from 'hypaware/core/sinks' +import { encodePartition, clusterColumnsForDataset } from 'hypaware/core/sinks' import { createLocalFsBlobStore, resolveExportsBaseDir } from './blob-store.js' @@ -107,6 +107,7 @@ function buildSink({ baseDir, encoder, sinkCtx, query, storage }) { tempDir: sinkCtx.paths.tempDir, columns, rows, + clusterColumns: clusterColumnsForDataset(query, partition.dataset), sinkInstance: sinkCtx.name, plugin: PLUGIN_NAME, }) diff --git a/hypaware-core/plugins-workspace/s3/src/index.js b/hypaware-core/plugins-workspace/s3/src/index.js index f69a1c0..23dc403 100644 --- a/hypaware-core/plugins-workspace/s3/src/index.js +++ b/hypaware-core/plugins-workspace/s3/src/index.js @@ -2,7 +2,7 @@ import { Buffer } from 'node:buffer' -import { encodePartition } from 'hypaware/core/sinks' +import { encodePartition, clusterColumnsForDataset } from 'hypaware/core/sinks' import { createS3BlobStore, @@ -270,6 +270,7 @@ function buildSink({ config, client, encoder, sinkCtx, query, storage }) { tempDir: sinkCtx.paths.tempDir, columns, rows, + clusterColumns: clusterColumnsForDataset(query, partition.dataset), sinkInstance: sinkCtx.name, plugin: PLUGIN_NAME, }) diff --git a/src/core/sinks/encoder.js b/src/core/sinks/encoder.js index 5e363b8..5c92338 100644 --- a/src/core/sinks/encoder.js +++ b/src/core/sinks/encoder.js @@ -3,9 +3,34 @@ import { Attr, withSpan } from '../observability/index.js' /** - * @import { ColumnSpec, QueryPartition, SinkEncodeContext, SinkEncodedBlob, SinkEncoder } from '../../../collectivus-plugin-kernel-types.d.ts' + * @import { ColumnSpec, QueryPartition, QueryRegistry, SinkEncodeContext, SinkEncodedBlob, SinkEncoder } from '../../../collectivus-plugin-kernel-types.d.ts' */ +/** + * Derive the cluster columns for a dataset from its Iceberg partition fields + * (e.g. `conversation_id`/`cwd`/`date`). Blob destinations pass these into the + * encode context so columnar encoders can keep each row group low-cardinality + * — which keeps wide, heavily-repeated columns (`tools`, `system_text`) + * dictionary-encoded instead of falling back to PLAIN. Returns undefined for + * datasets without a partition declaration (encoders then use default row + * grouping). Shared by the `local-fs` and `s3` blob destinations. + * + * @param {QueryRegistry} query + * @param {string} dataset + * @returns {string[] | undefined} + */ +export function clusterColumnsForDataset(query, dataset) { + const reg = /** @type {{ cachePartitioning?: { iceberg?: { fields?: Array<{ column?: unknown }> } } }} */ ( + query.getDataset(dataset) + ) + const fields = reg?.cachePartitioning?.iceberg?.fields + if (!Array.isArray(fields)) return undefined + const cols = fields + .map((f) => f.column) + .filter(/** @returns {c is string} */ (c) => typeof c === 'string' && c.length > 0) + return cols.length > 0 ? cols : undefined +} + /** * Wrap a single `encoder.encodePartition` call in a * `sink.encode_partition` span. Blob-sink `Sink.exportBatch` @@ -48,6 +73,7 @@ export async function encodePartition(encoder, partition, ctx) { tempDir: ctx.tempDir, columns: ctx.columns, rows: ctx.rows, + clusterColumns: ctx.clusterColumns, }) const rowCount = blob.rowCount ?? 0 const bytesWritten = blob.bytesWritten ?? bytesLengthOf(blob.bytes) diff --git a/src/core/sinks/index.js b/src/core/sinks/index.js index 9f5ccab..07991c2 100644 --- a/src/core/sinks/index.js +++ b/src/core/sinks/index.js @@ -12,4 +12,4 @@ * ``` */ -export { encodePartition } from './encoder.js' +export { encodePartition, clusterColumnsForDataset } from './encoder.js' diff --git a/test/plugins/format-parquet-clustering.test.js b/test/plugins/format-parquet-clustering.test.js new file mode 100644 index 0000000..8671d60 --- /dev/null +++ b/test/plugins/format-parquet-clustering.test.js @@ -0,0 +1,278 @@ +// @ts-check + +import test from 'node:test' +import assert from 'node:assert/strict' + +import { parquetMetadata, parquetReadObjects } from 'hyparquet' + +import { activate } from '../../hypaware-core/plugins-workspace/format-parquet/src/index.js' + +/** + * @import { ColumnSpec } from '../../collectivus-plugin-kernel-types.d.ts' + */ + +const noopLog = { debug() {}, info() {}, warn() {}, error() {} } + +/** Build the format-parquet encoder via the plugin's activate(). */ +async function makeEncoder() { + /** @type {any} */ + let encoder + await activate(/** @type {any} */ ({ + config: undefined, + log: noopLog, + provideCapability: (_name, _version, value) => { encoder = value }, + })) + return encoder +} + +/** @type {readonly ColumnSpec[]} */ +const COLUMNS = [ + { name: 'conversation_id', type: 'STRING', nullable: false }, + { name: 'tools', type: 'STRING', nullable: true }, +] + +/** + * Conversation-contiguous rows (the order `readRows` yields). Each + * conversation repeats one wide `tools` blob; the blob differs per + * conversation, so the column has `nConvs` distinct values total — enough + * distinct ~20 KB blobs to exceed hyparquet-writer's 1 MiB dictionary cap + * when they all land in one row group. + * + * @param {number} nConvs + * @param {number} rowsPerConv + */ +async function* genRows(nConvs, rowsPerConv) { + for (let c = 0; c < nConvs; c++) { + const conversation_id = `conv-${c}` + const tools = JSON.stringify({ schema: 'x'.repeat(20_000), conv: conversation_id }) + for (let r = 0; r < rowsPerConv; r++) { + yield { conversation_id, tools } + } + } +} + +/** + * Collect, per column, the set of Parquet encodings across all row groups. + * + * @param {Uint8Array} bytes + * @returns {{ rowGroups: number, encodingsByColumn: Map> }} + */ +function columnEncodings(bytes) { + const ab = /** @type {ArrayBuffer} */ (bytes.buffer.slice(bytes.byteOffset, bytes.byteOffset + bytes.byteLength)) + const md = parquetMetadata(ab) + /** @type {Map>} */ + const encodingsByColumn = new Map() + for (const rg of md.row_groups) { + for (const col of rg.columns) { + const name = col.meta_data?.path_in_schema.join('.') ?? '' + const set = encodingsByColumn.get(name) ?? new Set() + for (const e of col.meta_data?.encodings ?? []) set.add(String(e)) + encodingsByColumn.set(name, set) + } + } + return { rowGroups: md.row_groups.length, encodingsByColumn } +} + +const N_CONVS = 70 +const ROWS_PER_CONV = 8 + +test('without clustering, a wide per-conversation column falls back to PLAIN (the bug)', async () => { + const encoder = await makeEncoder() + const blob = await encoder.encodePartition( + { dataset: 'ai_gateway_messages', partition: {} }, + { log: noopLog, tempDir: '/tmp', columns: COLUMNS, rows: genRows(N_CONVS, ROWS_PER_CONV) } + // no clusterColumns + ) + const { rowGroups, encodingsByColumn } = columnEncodings(blob.bytes) + assert.equal(rowGroups, 1, 'all rows land in a single default row group') + const tools = encodingsByColumn.get('tools') ?? new Set() + assert.ok( + !tools.has('RLE_DICTIONARY') && !tools.has('PLAIN_DICTIONARY'), + `expected tools to be PLAIN (dictionary busted), got ${[...tools].join(',')}` + ) +}) + +test('with clustering, the wide column stays dictionary-encoded and the file shrinks', async () => { + const encoder = await makeEncoder() + + const plain = await encoder.encodePartition( + { dataset: 'ai_gateway_messages', partition: {} }, + { log: noopLog, tempDir: '/tmp', columns: COLUMNS, rows: genRows(N_CONVS, ROWS_PER_CONV) } + ) + const clustered = await encoder.encodePartition( + { dataset: 'ai_gateway_messages', partition: {} }, + { + log: noopLog, + tempDir: '/tmp', + columns: COLUMNS, + rows: genRows(N_CONVS, ROWS_PER_CONV), + clusterColumns: ['conversation_id'], + } + ) + + const { rowGroups, encodingsByColumn } = columnEncodings(clustered.bytes) + assert.ok(rowGroups > 1, `clustered output should split into multiple row groups, got ${rowGroups}`) + const tools = encodingsByColumn.get('tools') ?? new Set() + assert.ok( + tools.has('RLE_DICTIONARY'), + `expected tools to be dictionary-encoded under clustering, got ${[...tools].join(',')}` + ) + + // Same rows, same codec — the only difference is row-group clustering, which + // keeps the repeated blob stored once per group instead of once per row. + assert.ok( + clustered.bytesWritten * 3 < plain.bytesWritten, + `clustered export should be far smaller: clustered=${clustered.bytesWritten} plain=${plain.bytesWritten}` + ) +}) + +test('the per-group byte cap splits a single high-volume conversation into multiple row groups', async () => { + // One conversation -> cluster-key splitting can never trigger (1 distinct + // key). The only thing that can split this is the per-group BYTE cap, which + // is the memory bound: the encoder writes a row group and frees it rather + // than buffering the whole partition. Total estimated bytes (~40 MB) exceed + // DEFAULT_MAX_GROUP_BYTES (32 MB), so we must see >1 row group. + const encoder = await makeEncoder() + async function* oneBigConversation() { + const tools = JSON.stringify({ schema: 'x'.repeat(20_000) }) + for (let i = 0; i < 1000; i++) yield { conversation_id: 'conv-0', tools } + } + const blob = await encoder.encodePartition( + { dataset: 'ai_gateway_messages', partition: {} }, + { + log: noopLog, + tempDir: '/tmp', + columns: COLUMNS, + rows: oneBigConversation(), + clusterColumns: ['conversation_id'], + } + ) + const { rowGroups, encodingsByColumn } = columnEncodings(blob.bytes) + assert.ok(rowGroups >= 2, `byte cap should split into multiple row groups, got ${rowGroups}`) + assert.equal(blob.rowCount, 1000, 'all rows written exactly once across the split') + // The split groups each see one distinct blob, so dictionary survives. + assert.ok((encodingsByColumn.get('tools') ?? new Set()).has('RLE_DICTIONARY')) +}) + +test('a fat row flushes the group before it is added, so no group overshoots the byte cap', async () => { + // Each row is ~18 MB estimated, so two rows (~36 MB) exceed the 32 MB cap but + // one fits. The byte check must run *before* the row is added: otherwise the + // group accumulates one row, fails the `groupBytes >= cap` test (it is still + // under), then takes a second fat row to ~36 MB before flushing on the next + // iteration — overshooting the heap bound by a whole row. With the pre-add + // check, every fat row lands in its own row group. + const encoder = await makeEncoder() + const ROWS = 4 + async function* fatRows() { + for (let i = 0; i < ROWS; i++) { + // ~9M UTF-16 chars -> ~18 MB by the estimator. Distinct per row, same + // conversation so cluster-key splitting can never be the cause. + yield { conversation_id: 'conv-0', tools: 'x'.repeat(9_000_000) + i } + } + } + const blob = await encoder.encodePartition( + { dataset: 'ai_gateway_messages', partition: {} }, + { + log: noopLog, + tempDir: '/tmp', + columns: COLUMNS, + rows: fatRows(), + clusterColumns: ['conversation_id'], + } + ) + assert.equal(blob.rowCount, ROWS, 'all rows written exactly once') + const ab = /** @type {ArrayBuffer} */ ( + blob.bytes.buffer.slice(blob.bytes.byteOffset, blob.bytes.byteOffset + blob.bytes.byteLength) + ) + const md = parquetMetadata(ab) + assert.equal(md.row_groups.length, ROWS, `each fat row should get its own row group, got ${md.row_groups.length}`) + for (const rg of md.row_groups) { + assert.equal(Number(rg.num_rows), 1, 'no row group holds two fat rows (which would overshoot the cap)') + } +}) + +test('JSON object columns are interned so they dictionary-encode AND round-trip as objects', async () => { + // The cache stores JSON columns as Iceberg `variant`, so the reader hands + // them back as parsed objects — a fresh object reference per row even when + // the content repeats. The writer keys its dictionary by reference, so + // without help it sees every row as distinct and bails to PLAIN (this is + // what kept the real `tools` column at >1 GB even with clustering active). + // Interning identical-content objects to one shared reference lets them + // dictionary-encode WITHOUT stringifying — so the JSON logical type still + // round-trips the original object to readers (no double-encoding). + const encoder = await makeEncoder() + const cols = [ + { name: 'conversation_id', type: 'STRING', nullable: false }, + { name: 'tools', type: 'JSON', nullable: true }, + ] + const content = (/** @type {string} */ id) => ({ schema: 'x'.repeat(1000), conv: id, nested: { a: 1 } }) + async function* rows() { + for (let c = 0; c < 40; c++) { + const conversation_id = `conv-${c}` + // Fresh object reference each row, identical content — mirrors the reader. + for (let r = 0; r < 10; r++) yield { conversation_id, tools: content(conversation_id) } + } + } + const blob = await encoder.encodePartition( + { dataset: 'ai_gateway_messages', partition: {} }, + { log: noopLog, tempDir: '/tmp', columns: cols, rows: rows(), clusterColumns: ['conversation_id'] } + ) + // Dedup: dictionary-encoded despite fresh refs. + const { encodingsByColumn } = columnEncodings(blob.bytes) + assert.ok( + (encodingsByColumn.get('tools') ?? new Set()).has('RLE_DICTIONARY'), + `object-valued JSON column must dictionary-encode after interning, got ${[...(encodingsByColumn.get('tools') ?? [])].join(',')}` + ) + // No double-encoding: readers get the original object back, not a JSON string. + const ab = /** @type {ArrayBuffer} */ ( + blob.bytes.buffer.slice(blob.bytes.byteOffset, blob.bytes.byteOffset + blob.bytes.byteLength) + ) + const read = await parquetReadObjects({ file: ab }) + assert.equal(typeof read[0].tools, 'object', 'JSON column must round-trip as an object, not a JSON-text string') + assert.deepEqual(read[0].tools, content(String(read[0].conversation_id)), 'round-tripped object must match the original content') +}) + +test('interning never merges distinct values: BigInt vs same-text string vs sentinel-shaped object', async () => { + // Three values that a naive key could collapse but which the writer emits + // differently, so none may be merged: + // - {a: 1n} -> number 1 + // - {a: "1"} -> string "1" (a string-coercing key would merge with 1n) + // - {a: {__hyp_bigint__: "1"}} -> that literal object (a sentinel-tagged key would merge with 1n) + // The plain-JSON.stringify key (with BigInt values left un-interned) keeps + // all three distinct. + const encoder = await makeEncoder() + const cols = [{ name: 'j', type: 'JSON', nullable: true }] + async function* rows() { + yield { j: { a: 1n } } + yield { j: { a: '1' } } + yield { j: { a: { __hyp_bigint__: '1' } } } + } + const blob = await encoder.encodePartition( + { dataset: 'd', partition: {} }, + { log: noopLog, tempDir: '/tmp', columns: cols, rows: rows() } + ) + const ab = /** @type {ArrayBuffer} */ ( + blob.bytes.buffer.slice(blob.bytes.byteOffset, blob.bytes.byteOffset + blob.bytes.byteLength) + ) + const read = await parquetReadObjects({ file: ab }) + assert.equal(typeof read[0].j.a, 'number', 'BigInt row stays numeric') + assert.equal(read[0].j.a, 1) + assert.equal(typeof read[1].j.a, 'string', 'string row stays a string') + assert.equal(read[1].j.a, '1') + assert.deepEqual(read[2].j.a, { __hyp_bigint__: '1' }, 'sentinel-shaped object stays itself, not merged into the BigInt') +}) + +test('clusterColumns leaves row counts and schema unchanged', async () => { + const encoder = await makeEncoder() + const blob = await encoder.encodePartition( + { dataset: 'ai_gateway_messages', partition: {} }, + { + log: noopLog, + tempDir: '/tmp', + columns: COLUMNS, + rows: genRows(5, 3), + clusterColumns: ['conversation_id'], + } + ) + assert.equal(blob.rowCount, 15, 'every row is still written exactly once') +}) diff --git a/test/plugins/s3-export-batch.test.js b/test/plugins/s3-export-batch.test.js index 2aa9830..b68e513 100644 --- a/test/plugins/s3-export-batch.test.js +++ b/test/plugins/s3-export-batch.test.js @@ -131,6 +131,65 @@ test('exportBatch partial failure: retryPartitions has only the failed partition await sink.close() }) +test('exportBatch forwards dataset cluster columns to the encoder', async () => { + // The s3 sink must derive cluster columns from the dataset's Iceberg + // partition fields and pass them to the encoder — same as local-fs — so the + // Parquet encoder keeps wide repeated columns dictionary-encoded. + /** @type {any} */ + let registered + /** @type {any} */ + const ctx = { + provideCapability() {}, + sinks: { register(/** @type {any} */ d) { registered = d } }, + log: { debug() {}, info() {}, warn() {}, error() {} }, + query: { + getDataset: (/** @type {string} */ name) => + name === 'ai_gateway_messages' + ? { cachePartitioning: { iceberg: { fields: [{ column: 'conversation_id' }, { column: 'date' }] } } } + : undefined, + listDatasets: () => [], + }, + storage: { + tableExists: () => false, + readRows: () => ({ async *[Symbol.asyncIterator]() {} }), + }, + } + await activate(ctx) + + /** @type {any} */ + let seenCtx + const spyEncoder = { + format: 'parquet', + extension: 'parquet', + supports: ['queryable'], + async encodePartition(/** @type {any} */ _p, /** @type {any} */ encodeCtx) { + seenCtx = encodeCtx + const bytes = new TextEncoder().encode('x') + return { filename: 'f.parquet', bytes, bytesWritten: 1, rowCount: 0 } + }, + } + const fakeClient = { async putObject() { return {} }, destroy() {} } + const sinkCtx = { + name: 'test', + config: { + bucket: 'b', + prefix: 'p', + __clientFactory: async () => ({ client: fakeClient, credential_source_kind: 'injected' }), + }, + encoder: spyEncoder, + log: { debug() {}, info() {}, warn() {}, error() {} }, + paths: { tempDir: '/tmp' }, + } + const sink = await registered.create(sinkCtx) + await sink.exportBatch( + { batchId: 'b1', partitions: [{ dataset: 'ai_gateway_messages', partition: {}, tablePath: '' }] }, + {} + ) + assert.deepEqual(seenCtx?.clusterColumns, ['conversation_id', 'date'], + 's3 sink must forward derived cluster columns to the encoder') + await sink.close() +}) + test('exportBatch all-success: no retryPartitions field', async () => { const registration = await activatePlugin() const fakeClient = {