Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions collectivus-plugin-kernel-types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,15 @@ export interface SinkEncodeContext {
* destination opens this stream from the kernel storage service.
*/
rows?: AsyncIterable<Record<string, unknown>>
/**
* Column names that group rows sharing identical wide, repeated values
* (e.g. the dataset's Iceberg partition fields like `conversation_id`).
* Encoders that build columnar files may use these to keep each row
* group low-cardinality so heavily-repeated columns stay dictionary-
* encoded instead of falling back to PLAIN. Optional; encoders that do
* not partition row groups ignore it. Empty/absent disables clustering.
*/
clusterColumns?: readonly string[]
}

export interface SinkEncodedBlob {
Expand Down
57 changes: 51 additions & 6 deletions hypaware-core/plugins-workspace/format-parquet/src/columns.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,53 @@
* @returns {ColumnSource[]}
*/
export function rowsToColumnSources(columns, rows) {
return columns.map((spec) => ({
name: spec.name,
type: spec.type,
nullable: spec.nullable,
data: rows.map((row) => coerce(spec, row[spec.name])),
}))
return columns.map((spec) => {
const data = rows.map((row) => coerce(spec, row[spec.name]))
// JSON columns arrive from the cache (Iceberg `variant`) as parsed
// objects/arrays. hyparquet-writer keys its dictionary by reference, so
// structurally-identical objects are otherwise distinct entries and the
// column collapses to PLAIN — re-storing the (denormalized) blob on every
// row. Interning by canonical content makes identical values share one
// reference so they dictionary-encode (stored once), while the JSON
// logical type still round-trips the original object to readers.
if (spec.type === 'JSON') internJsonValues(data)
return { name: spec.name, type: spec.type, nullable: spec.nullable, data }
})
}

/**
* Replace structurally-identical object/array values in `data` with a single
* shared reference, in place. Primitive values (already-stringified JSON,
* numbers, null) are left untouched — they dedupe by value already.
*
* The interning key is a plain `JSON.stringify` of the value, which is a
* faithful, injective rendering of what the JSON writer will emit (numbers
* vs strings stay distinct, no sentinel that real data could forge). Values
* that cannot be serialized — a nested BigInt is the only realistic case —
* throw and are simply left un-interned: keeping their own reference means a
* distinct value is never merged into another, at the cost of not deduping
* that one value. Such values do not occur in these columns in practice
* (they originate from `JSON.parse`, which never yields BigInt), so this
* costs no real dedup while staying correct if one ever does.
*
* @param {unknown[]} data
*/
function internJsonValues(data) {
/** @type {Map<string, unknown>} */
const seen = new Map()
for (let i = 0; i < data.length; i++) {
const v = data[i]
if (v === null || v === undefined || typeof v !== 'object') continue
let key
try {
key = JSON.stringify(v)
} catch {
continue // non-serializable (e.g. nested BigInt) — do not intern
}
const existing = seen.get(key)
if (existing !== undefined) data[i] = existing
else seen.set(key, v)
}
}

/**
Expand Down Expand Up @@ -57,6 +98,10 @@ function coerce(spec, value) {
case 'TIMESTAMP':
return toTimestamp(value, spec.name)
case 'JSON':
// Pass JSON objects/arrays through unchanged so the writer's JSON
// logical type round-trips them back to readers as objects. Dedup is
// handled by interning identical values (see internJsonValues) rather
// than by stringifying, which would double-encode (object -> JSON text).
return value
default:
return value
Expand Down
173 changes: 152 additions & 21 deletions hypaware-core/plugins-workspace/format-parquet/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,27 @@ const EXTENSION = 'parquet'
const DEFAULT_CODEC = 'SNAPPY'
const DEFAULT_ZSTD_LEVEL = 3

// Row-group clustering. hyparquet-writer keeps a column dictionary-encoded
// only while a row group's DISTINCT values fit under its ~1 MiB dictionary-
// page cap. Columns denormalized onto every row but constant per conversation
// (e.g. `tools`, `system_text`) explode to PLAIN — re-storing every copy in
// full — once a single row group spans the whole partition's distinct values.
// Bounding each row group to a small number of distinct cluster keys (and a
// max row count) keeps the dictionary alive. The dictionary decision depends
// on the distinct-value COUNT, not row order, and the source rows already
// arrive grouped by conversation, so no sort is needed.
const DEFAULT_MAX_CLUSTER_KEYS = 16
const DEFAULT_MAX_ROWS_PER_GROUP = 50_000
// Hard ceiling on how many estimated row bytes accumulate in one in-memory
// group before it is written out as a row group and freed. This is the knob
// that bounds peak heap during a sink force: the encoder never holds more than
// ~one group (plus its columnar copy) at once, instead of materializing the
// whole partition. Independent of blob size, so a fat-`tools` partition cannot
// push a group into the gigabytes.
const DEFAULT_MAX_GROUP_BYTES = 32 * 1024 * 1024
// Mirrors hyparquet-writer's own default page size (1 MiB).
const DEFAULT_PAGE_SIZE = 1024 * 1024

// Codecs this encoder can emit. SNAPPY is supplied by hyparquet-writer's
// own default compressors; ZSTD is wired here via Node's built-in zlib
// (Node >= 22.15 / 23.8). Reads are covered by `hyparquet-compressors`,
Expand Down Expand Up @@ -153,19 +174,75 @@ async function encodePartition(partition, ctx, settings) {
'format-parquet: SinkEncodeContext.rows must be provided by the blob destination'
)
}
const rows = await collectRows(ctx.rows)
const columnData = rowsToColumnSources(ctx.columns, rows)
const columns = ctx.columns
const sourceRows = ctx.rows
const { ByteWriter, ParquetWriter, schemaFromColumnData } = await import('hyparquet-writer')

// Derive a stable schema from the declared column types (not from the
// data) so we can write row groups incrementally — never holding more
// than one cluster group of rows (plus its columnar copy) in memory.
// This is what stops `hyp sink force` on a large partition from OOMing
// while materializing the whole partition at once.
const schema = schemaFromColumnData({ columnData: rowsToColumnSources(columns, []) })
const writer = new ByteWriter()
const pq = new ParquetWriter({
writer,
schema,
codec: settings.codec,
compressors: settings.compressors,
})
const pageSize = settings.pageSize ?? DEFAULT_PAGE_SIZE
const clusterColumns = ctx.clusterColumns && ctx.clusterColumns.length > 0 ? ctx.clusterColumns : null

const { parquetWriteBuffer } = await import('hyparquet-writer')
/** @type {{ columnData: any, codec: 'SNAPPY' | 'ZSTD', compressors?: Record<string, (bytes: Uint8Array) => Uint8Array>, pageSize?: number }} */
const writeOpts = { columnData, codec: settings.codec }
if (settings.compressors) writeOpts.compressors = settings.compressors
if (settings.pageSize) writeOpts.pageSize = settings.pageSize
const arrayBuffer = parquetWriteBuffer(writeOpts)
const bytes = new Uint8Array(arrayBuffer)
let rowCount = 0
let rowGroupCount = 0
/** @type {Record<string, unknown>[]} */
let group = []
let groupBytes = 0
/** @type {Set<string> | null} */
let groupKeys = clusterColumns ? new Set() : null

// Each flush is exactly one Parquet row group. useDictionary runs per
// row group, so a low-cardinality group keeps wide repeated columns
// (`tools`, `system_text`) dictionary-encoded rather than PLAIN.
const flushGroup = () => {
if (group.length === 0) return
pq.write({ columnData: rowsToColumnSources(columns, group), rowGroupSize: group.length, pageSize })
rowCount += group.length
rowGroupCount++
group = []
groupBytes = 0
if (groupKeys) groupKeys = new Set()
}

for await (const row of sourceRows) {
// Estimate the row up front so the byte cap is checked *before* the
// row is added. Otherwise groupBytes only reflects rows already in the
// group, and a fat blob (20-30 MB) can push the group an entire row
// past DEFAULT_MAX_GROUP_BYTES before the next iteration flushes.
const rowBytes = estimateRowBytes(row)
const wouldOverflowBytes = group.length > 0 && groupBytes + rowBytes > DEFAULT_MAX_GROUP_BYTES
if (clusterColumns && groupKeys) {
const key = clusterKeyOf(row, clusterColumns)
const overflowKeys = !groupKeys.has(key) && groupKeys.size >= DEFAULT_MAX_CLUSTER_KEYS
if (group.length > 0 && (overflowKeys || group.length >= DEFAULT_MAX_ROWS_PER_GROUP || wouldOverflowBytes)) {
flushGroup()
}
groupKeys.add(key)
} else if (group.length > 0 && (group.length >= DEFAULT_MAX_ROWS_PER_GROUP || wouldOverflowBytes)) {
flushGroup()
}
group.push(row)
groupBytes += rowBytes
}
flushGroup()

pq.finish()
const bytes = new Uint8Array(writer.getBuffer())

const filename = `${partitionFilename(partition)}.${EXTENSION}`
span.setAttribute('row_count', rows.length)
span.setAttribute('row_count', rowCount)
span.setAttribute('row_group_count', rowGroupCount)
span.setAttribute('bytes_written', bytes.byteLength)
span.setAttribute('hyp_sink_filename', filename)
span.setStatus({ code: SpanStatusCode.OK })
Expand All @@ -174,7 +251,7 @@ async function encodePartition(partition, ctx, settings) {
filename,
bytes,
bytesWritten: bytes.byteLength,
rowCount: rows.length,
rowCount,
}
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
Expand All @@ -197,18 +274,72 @@ async function encodePartition(partition, ctx, settings) {
}

/**
* Drain an async iterable of rows into an in-memory array. The Phase 8.3
* smoke writes 50 rows, so a single-array buffer is fine; larger
* deployments lean on the row-group splitting inside hyparquet-writer.
* Cheap, allocation-free estimate of a row's in-memory footprint in bytes.
* Used only to bound how much a group accumulates before it is flushed as a
* row group, so precision matters less than never under-counting a fat blob.
* Mirrors the cache compactor's estimator.
*
* @param {Record<string, unknown>} row
* @returns {number}
*/
function estimateRowBytes(row) {
let total = 0
for (const value of Object.values(row)) total += estimateValueBytes(value)
return total
}

/**
* @param {unknown} value
* @returns {number}
*/
function estimateValueBytes(value) {
if (value === null || value === undefined) return 0
switch (typeof value) {
case 'string':
return value.length * 2 // JS strings are UTF-16 internally
case 'number':
return 8
case 'bigint':
return 16
case 'boolean':
return 4
case 'object': {
if (value instanceof Uint8Array) return value.byteLength
let total = 0
if (Array.isArray(value)) {
for (const item of value) total += estimateValueBytes(item)
return total
}
for (const [k, v] of Object.entries(value)) {
total += k.length * 2 + estimateValueBytes(v)
}
return total
}
default:
return 0
}
}

/**
* Stable grouping key from a row's cluster columns. Consistent for equal
* tuples; never persisted. NUL-separated to keep distinct tuples distinct.
*
* @param {AsyncIterable<Record<string, unknown>>} source
* @returns {Promise<Record<string, unknown>[]>}
* @param {Record<string, unknown>} row
* @param {readonly string[]} clusterColumns
* @returns {string}
*/
async function collectRows(source) {
/** @type {Record<string, unknown>[]} */
const rows = []
for await (const row of source) rows.push(row)
return rows
function clusterKeyOf(row, clusterColumns) {
let key = ''
for (const col of clusterColumns) {
const v = row[col]
const part = typeof v === 'string'
? v
: typeof v === 'bigint'
? v.toString()
: JSON.stringify(v ?? null)
key += part + '\u0000'
}
return key
}

/**
Expand Down
3 changes: 2 additions & 1 deletion hypaware-core/plugins-workspace/local-fs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Buffer } from 'node:buffer'
import fs from 'node:fs/promises'
import path from 'node:path'

import { encodePartition } from 'hypaware/core/sinks'
import { encodePartition, clusterColumnsForDataset } from 'hypaware/core/sinks'

import { createLocalFsBlobStore, resolveExportsBaseDir } from './blob-store.js'

Expand Down Expand Up @@ -107,6 +107,7 @@ function buildSink({ baseDir, encoder, sinkCtx, query, storage }) {
tempDir: sinkCtx.paths.tempDir,
columns,
rows,
clusterColumns: clusterColumnsForDataset(query, partition.dataset),
sinkInstance: sinkCtx.name,
plugin: PLUGIN_NAME,
})
Expand Down
3 changes: 2 additions & 1 deletion hypaware-core/plugins-workspace/s3/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import { Buffer } from 'node:buffer'

import { encodePartition } from 'hypaware/core/sinks'
import { encodePartition, clusterColumnsForDataset } from 'hypaware/core/sinks'

import {
createS3BlobStore,
Expand Down Expand Up @@ -270,6 +270,7 @@ function buildSink({ config, client, encoder, sinkCtx, query, storage }) {
tempDir: sinkCtx.paths.tempDir,
columns,
rows,
clusterColumns: clusterColumnsForDataset(query, partition.dataset),
sinkInstance: sinkCtx.name,
plugin: PLUGIN_NAME,
})
Expand Down
28 changes: 27 additions & 1 deletion src/core/sinks/encoder.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,34 @@
import { Attr, withSpan } from '../observability/index.js'

/**
* @import { ColumnSpec, QueryPartition, SinkEncodeContext, SinkEncodedBlob, SinkEncoder } from '../../../collectivus-plugin-kernel-types.d.ts'
* @import { ColumnSpec, QueryPartition, QueryRegistry, SinkEncodeContext, SinkEncodedBlob, SinkEncoder } from '../../../collectivus-plugin-kernel-types.d.ts'
*/

/**
* Derive the cluster columns for a dataset from its Iceberg partition fields
* (e.g. `conversation_id`/`cwd`/`date`). Blob destinations pass these into the
* encode context so columnar encoders can keep each row group low-cardinality
* — which keeps wide, heavily-repeated columns (`tools`, `system_text`)
* dictionary-encoded instead of falling back to PLAIN. Returns undefined for
* datasets without a partition declaration (encoders then use default row
* grouping). Shared by the `local-fs` and `s3` blob destinations.
*
* @param {QueryRegistry} query
* @param {string} dataset
* @returns {string[] | undefined}
*/
export function clusterColumnsForDataset(query, dataset) {
const reg = /** @type {{ cachePartitioning?: { iceberg?: { fields?: Array<{ column?: unknown }> } } }} */ (
query.getDataset(dataset)
)
const fields = reg?.cachePartitioning?.iceberg?.fields
if (!Array.isArray(fields)) return undefined
const cols = fields
.map((f) => f.column)
.filter(/** @returns {c is string} */ (c) => typeof c === 'string' && c.length > 0)
return cols.length > 0 ? cols : undefined
}

/**
* Wrap a single `encoder.encodePartition` call in a
* `sink.encode_partition` span. Blob-sink `Sink.exportBatch`
Expand Down Expand Up @@ -48,6 +73,7 @@ export async function encodePartition(encoder, partition, ctx) {
tempDir: ctx.tempDir,
columns: ctx.columns,
rows: ctx.rows,
clusterColumns: ctx.clusterColumns,
})
const rowCount = blob.rowCount ?? 0
const bytesWritten = blob.bytesWritten ?? bytesLengthOf(blob.bytes)
Expand Down
2 changes: 1 addition & 1 deletion src/core/sinks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
* ```
*/

export { encodePartition } from './encoder.js'
export { encodePartition, clusterColumnsForDataset } from './encoder.js'
Loading