Stop parquet exports bloating & OOMing on wide repeated columns #90
Open
bgmcmullen wants to merge 1 commit into
…olumns

`hyp sink force` produced multi-GB Parquet files and risked OOM. The `ai_gateway_messages` cache denormalizes request-level columns (`tools`, `system_text`) onto every message-part row; on export these collapsed to PLAIN encoding and were stored in full on every row (e.g. `tools` at ~1.3 GB / 88% of one file).

Three independent causes, all in the local-fs/s3 + format-parquet path:

1. JSON columns lost dictionary encoding. The cache stores JSON columns as Iceberg `variant`, so the reader hands them back as parsed objects. hyparquet-writer keys dictionary values by reference, so structurally identical objects counted as distinct and the column fell back to PLAIN. Intern identical-content objects to one shared reference so they dictionary-encode, while the JSON logical type still round-trips the original object to readers. (Stringifying instead would double-encode the object into a JSON-text string and change queryable output.)
2. Row groups spanned the whole partition, so even string columns exceeded the writer's ~1 MiB dictionary-page cap and fell back to PLAIN. Cluster row groups by the dataset's Iceberg partition columns (conversation id / date) so each group stays low-cardinality and the dictionary survives.
3. The encoder buffered the entire partition (collectRows + columnar copy) before writing. Stream row-group-by-row-group via ParquetWriter, flushing on a distinct-cluster-key budget, row cap, or a pre-add 32 MB byte cap, so peak heap is bounded to one group regardless of partition size.

Verified on real data: source=claude export 1.4 GB -> ~150 MB, `tools` 1.34 GB -> ~15 MB (RLE_DICTIONARY), no OOM.

Cluster-column derivation is shared (`clusterColumnsForDataset`) and wired into both the local-fs and s3 blob destinations.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Dual-agent review —
| Source | Finding (severity, evidence) | Intersects |
|---|---|---|
| Codex | Resource Lifecycle & Cleanup: encoder/S3 hold full compressed blob in memory; major, format-parquet/src/index.js:240, s3/src/index.js:288 | Concurrency surface (output-side heap), Risks bullet 1 |
| Claude | clusterColumnsForDataset undefined/parse branches untested; minor, src/core/sinks/encoder.js:22 | Direct callers (clustering-disable path), Config field chain, Risks bullet 2 |
Codex review
Fix Validations
JSON columns lost dictionary encoding
- Status: correct
- Evidence: hypaware-core/plugins-workspace/format-parquet/src/columns.js:25, hypaware-core/plugins-workspace/format-parquet/src/columns.js:35, hypaware-core/plugins-workspace/format-parquet/src/columns.js:57
- Assessment: JSON values are no longer stringified; serializable object/array values are interned by emitted JSON content so the writer can dictionary-encode repeated references while preserving object round-trip behavior.
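To make the interning idea concrete, here is a minimal sketch, not the code in `columns.js`. The name `createJsonInterner` is hypothetical, and it assumes that keying on `JSON.stringify` output is an acceptable notion of "identical content" (so objects with the same keys in a different order are treated as distinct).

```javascript
// Hypothetical helper: maps structurally identical JSON values to one shared
// reference, so a writer that keys its dictionary by reference sees repeats.
function createJsonInterner() {
  // Serialized JSON text -> first object seen with that content.
  const byContent = new Map();

  return function intern(value) {
    // Primitives and null already compare by value; pass them through.
    if (value === null || typeof value !== 'object') return value;

    let key;
    try {
      key = JSON.stringify(value);
    } catch {
      // Not serializable (e.g. circular reference): leave the value untouched.
      return value;
    }
    if (key === undefined) return value;

    const existing = byContent.get(key);
    if (existing !== undefined) return existing;
    byContent.set(key, value);
    return value;
  };
}

// Usage: three rows, two of which carry structurally identical `tools` arrays.
const intern = createJsonInterner();
const rows = [
  { tools: [{ name: 'search' }] },
  { tools: [{ name: 'search' }] },
  { tools: [{ name: 'fetch' }] },
];
const column = rows.map((row) => intern(row.tools));

// A Set counts distinct references, which is what a reference-keyed
// dictionary would see.
const distinctRefs = new Set(column).size;
```

The values stay objects, so a reader still gets the original structure back; only the number of distinct references changes.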
Row groups spanned the whole partition
- Status: correct
- Evidence: src/core/sinks/encoder.js:22, hypaware-core/plugins-workspace/local-fs/src/index.js:105, hypaware-core/plugins-workspace/s3/src/index.js:268, hypaware-core/plugins-workspace/format-parquet/src/index.js:225
- Assessment: Cluster columns are derived from the dataset Iceberg partition fields and forwarded through both affected blob sinks, and the parquet encoder uses those keys to split row groups before distinct-key cardinality grows too high.
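The splitting rule can be sketched as below, purely as an illustration. `splitRowGroups`, its option names, and the `conversation_id` column are hypothetical. The sketch assumes rows arrive already ordered by the cluster columns and omits the byte cap mentioned in the PR description.

```javascript
// Hypothetical splitter: starts a new row group when adding a row would
// exceed the distinct-cluster-key budget or when the row cap is reached.
function* splitRowGroups(rows, clusterColumns, { maxDistinctKeys = 4, maxRows = 1000 } = {}) {
  let group = [];
  let keys = new Set();

  for (const row of rows) {
    // Composite key built from the cluster column values of this row.
    const key = JSON.stringify(clusterColumns.map((column) => row[column]));
    const wouldAddKey = !keys.has(key);
    const overKeyBudget = wouldAddKey && keys.size >= maxDistinctKeys;
    const overRowCap = group.length >= maxRows;

    if (group.length > 0 && (overKeyBudget || overRowCap)) {
      yield group; // flush before adding the row that would break the budget
      group = [];
      keys = new Set();
    }
    group.push(row);
    keys.add(key);
  }
  if (group.length > 0) yield group;
}

// Usage: three conversations, at most two distinct keys per row group.
const sampleRows = [
  { conversation_id: 'a', part: 1 },
  { conversation_id: 'a', part: 2 },
  { conversation_id: 'b', part: 1 },
  { conversation_id: 'c', part: 1 },
  { conversation_id: 'c', part: 2 },
];
const groupSizes = [...splitRowGroups(sampleRows, ['conversation_id'], { maxDistinctKeys: 2 })]
  .map((group) => group.length);
```

Because each group holds only a few conversations, request-level values repeated within a conversation stay low-cardinality inside that group.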
Encoder buffered all source rows before writing
- Status: incomplete
- Evidence: hypaware-core/plugins-workspace/format-parquet/src/index.js:218, hypaware-core/plugins-workspace/format-parquet/src/index.js:238, hypaware-core/plugins-workspace/format-parquet/src/index.js:187, hypaware-core/plugins-workspace/format-parquet/src/index.js:240
- Assessment: Source rows are now consumed group-by-group, which removes the old `collectRows` partition buffer. The final Parquet output is still accumulated in a `ByteWriter` and returned as one `Uint8Array`, so peak heap is not actually bounded to one row group independent of final file size.
Findings
7) Resource Lifecycle & Cleanup
- Severity: major
- Confidence: high
- Evidence: hypaware-core/plugins-workspace/format-parquet/src/index.js:187, hypaware-core/plugins-workspace/format-parquet/src/index.js:240, hypaware-core/plugins-workspace/format-parquet/src/index.js:241, hypaware-core/plugins-workspace/s3/src/index.js:288
- Why it matters: Very large exports can still OOM because the encoder keeps the complete Parquet file in memory after row-group streaming, and S3 then materializes the blob before upload.
- Suggested fix: Stream encoded row-group bytes to the destination, or write to a temp file and return a stream/file handle; update local-fs/S3 to consume without `materializeBytes` for parquet exports.
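One possible shape for the suggested fix is sketched below under loud assumptions. `encodeRowGroups`, `drainToSink`, and the `sink.write` interface are hypothetical and do not reflect the plugin's actual API, and the stand-in encoder emits JSON text, not Parquet pages. The point is only that the consumer holds one chunk at a time.

```javascript
// Hypothetical encoder: yields one byte chunk per row group instead of
// concatenating everything into a single Uint8Array.
function* encodeRowGroups(rowGroups, encodeGroup) {
  for (const group of rowGroups) {
    yield encodeGroup(group); // bytes for this group only
  }
}

// Hypothetical destination adapter: forwards each chunk as it arrives and
// keeps only running totals, never the whole file.
function drainToSink(chunks, sink) {
  let totalBytes = 0;
  let largestChunk = 0;
  for (const chunk of chunks) {
    sink.write(chunk);
    totalBytes += chunk.byteLength;
    largestChunk = Math.max(largestChunk, chunk.byteLength);
  }
  return { totalBytes, largestChunk };
}

// Usage with a stand-in encoder (JSON text as bytes) and an in-memory sink
// that records chunk sizes only.
const textEncoder = new TextEncoder();
const standInEncode = (group) => textEncoder.encode(JSON.stringify(group));
const recordingSink = {
  chunkSizes: [],
  write(chunk) {
    this.chunkSizes.push(chunk.byteLength);
  },
};
const stats = drainToSink(
  encodeRowGroups([[{ a: 1 }], [{ a: 2 }, { a: 3 }]], standInEncode),
  recordingSink,
);
```

In a real destination the sink might be a file write stream or a multipart upload. A real Parquet file also needs its footer written after the last row group, which this sketch leaves out.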
No Finding
- Behavioral Correctness
- Contract & Interface Fidelity
- Change Impact / Blast Radius
- Concurrency, Ordering & State Safety
- Error Handling & Resilience
- Security Surface
- Release Safety
- Test Evidence Quality
- Architectural Consistency
- Debuggability & Operability
Evidence Bundle
- Changed hot paths: JSON coercion/interning, parquet row-group writer, sink encode context, local-fs export, S3 export.
- Impacted callers: hypaware-core/plugins-workspace/local-fs/src/index.js:105, hypaware-core/plugins-workspace/s3/src/index.js:268, src/core/sinks/encoder.js:71.
- Impacted tests: test/plugins/format-parquet-clustering.test.js:81, test/plugins/format-parquet-clustering.test.js:102, test/plugins/format-parquet-clustering.test.js:140, test/plugins/format-parquet-clustering.test.js:216, test/plugins/s3-export-batch.test.js:165.
- Unresolved uncertainty: I ran the two changed test files with `node --test`; I did not run full `npm test` or a real large-file heap/acceptance smoke.
Claude review
clusterColumnsForDataset parsing/undefined branches are untested
- Severity: minor
- Confidence: 82
- Evidence: src/core/sinks/encoder.js:373
- Why it matters: `clusterColumnsForDataset` is exactly the deterministic parsing CLAUDE.md says to unit-test, yet its `undefined`-return branches (no `cachePartitioning`, non-array `fields`, all-non-string columns filtered to empty), i.e. the "clustering disabled / default row grouping" path, are never exercised; only the two-valid-field happy path is covered indirectly via the s3 test, so a regression that returns `[]` instead of `undefined`, or throws on a dataset with no partition declaration, would ship green.
- Suggested fix: Add a direct unit test for `clusterColumnsForDataset(query, dataset)` covering: dataset with no `cachePartitioning` -> `undefined`; `fields` absent/non-array -> `undefined`; fields whose `column` values are all non-strings/empty -> `undefined`; and the valid case returning the column list in order.
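A table-driven sketch of the cases listed above might look like this. `clusterColumnsStandIn` is a hypothetical stand-in written only from the behavior described in this review: it takes just a dataset, whereas the real function is `clusterColumnsForDataset(query, dataset)`, and the `cachePartitioning.fields[].column` shape and column names are assumptions.

```javascript
// Hypothetical stand-in for the function under test; the real internals are
// not shown in this review.
function clusterColumnsStandIn(dataset) {
  const fields = dataset?.cachePartitioning?.fields;
  if (!Array.isArray(fields)) return undefined;
  const columns = fields
    .map((field) => field?.column)
    .filter((column) => typeof column === 'string' && column.length > 0);
  return columns.length > 0 ? columns : undefined;
}

// One entry per branch named in the suggested fix.
const cases = [
  { name: 'no cachePartitioning', dataset: {}, expected: undefined },
  { name: 'fields absent', dataset: { cachePartitioning: {} }, expected: undefined },
  { name: 'fields not an array', dataset: { cachePartitioning: { fields: 'date' } }, expected: undefined },
  {
    name: 'all columns non-string or empty',
    dataset: { cachePartitioning: { fields: [{ column: 1 }, { column: '' }, {}] } },
    expected: undefined,
  },
  {
    name: 'valid fields keep order',
    dataset: { cachePartitioning: { fields: [{ column: 'conversation_id' }, { column: 'date' }] } },
    expected: ['conversation_id', 'date'],
  },
];

// Returns the names of failing cases. JSON.stringify(undefined) is undefined
// while JSON.stringify([]) is '[]', so an empty array does not pass as undefined.
function runCases(fn) {
  const failures = [];
  for (const { name, dataset, expected } of cases) {
    const actual = fn(dataset);
    if (JSON.stringify(actual) !== JSON.stringify(expected)) failures.push(name);
  }
  return failures;
}

const failures = runCases(clusterColumnsStandIn);
```

The same table could be moved into a `node --test` file, with each case as its own test so a failure names the branch that regressed.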
Reports: /Users/phil/workspace/hypaware/.git/worktrees/dual-review-pr90/dual-review/pr-90