Skip to content

Stop parquet exports bloating & OOMing on wide repeated columns#90

Open
bgmcmullen wants to merge 1 commit into
masterfrom
fix/parquet-sink-export-bloat-oom
Open

Stop parquet exports bloating & OOMing on wide repeated columns#90
bgmcmullen wants to merge 1 commit into
masterfrom
fix/parquet-sink-export-bloat-oom

Conversation

@bgmcmullen

Copy link
Copy Markdown
Contributor

Problem

hyp sink force produced multi-GB Parquet files and risked OOM. The ai_gateway_messages cache denormalizes request-level columns (tools, system_text) onto every message-part row; on export these collapsed to PLAIN encoding and were stored in full on every row (e.g. tools at ~1.3 GB / 88% of one file).

Three independent causes, all in the local-fs/s3 + format-parquet path:

  1. JSON columns lost dictionary encoding. The cache stores JSON columns as Iceberg variant, so the reader hands them back as parsed objects. hyparquet-writer keys dictionary values by reference, so structurally- identical objects counted as distinct and the column fell back to PLAIN. Intern identical-content objects to one shared reference so they dictionary-encode, while the JSON logical type still round-trips the original object to readers. (Stringifying instead would double-encode the object into a JSON-text string and change queryable output.)

  2. Row groups spanned the whole partition, so even string columns exceeded the writer's ~1 MiB dictionary-page cap and fell back to PLAIN. Cluster row groups by the dataset's Iceberg partition columns (conversation id / date) so each group stays low-cardinality and the dictionary survives.

  3. The encoder buffered the entire partition (collectRows + columnar copy) before writing. Stream row-group-by-row-group via ParquetWriter, flushing on a distinct-cluster-key budget, row cap, or a pre-add 32 MB byte cap, so peak heap is bounded to one group regardless of partition size.

Verified on real data: source=claude export 1.4 GB -> ~150 MB, tools 1.34 GB -> ~15 MB (RLE_DICTIONARY), no OOM.

Cluster-column derivation is shared (clusterColumnsForDataset) and wired into both the local-fs and s3 blob destinations.

…olumns

`hyp sink force` produced multi-GB Parquet files and risked OOM. The
`ai_gateway_messages` cache denormalizes request-level columns (`tools`,
`system_text`) onto every message-part row; on export these collapsed to
PLAIN encoding and were stored in full on every row (e.g. `tools` at
~1.3 GB / 88% of one file).

Three independent causes, all in the local-fs/s3 + format-parquet path:

1. JSON columns lost dictionary encoding. The cache stores JSON columns as
   Iceberg `variant`, so the reader hands them back as parsed objects.
   hyparquet-writer keys dictionary values by reference, so structurally-
   identical objects counted as distinct and the column fell back to PLAIN.
   Intern identical-content objects to one shared reference so they
   dictionary-encode, while the JSON logical type still round-trips the
   original object to readers. (Stringifying instead would double-encode the
   object into a JSON-text string and change queryable output.)

2. Row groups spanned the whole partition, so even string columns exceeded
   the writer's ~1 MiB dictionary-page cap and fell back to PLAIN. Cluster
   row groups by the dataset's Iceberg partition columns (conversation id /
   date) so each group stays low-cardinality and the dictionary survives.

3. The encoder buffered the entire partition (collectRows + columnar copy)
   before writing. Stream row-group-by-row-group via ParquetWriter, flushing
   on a distinct-cluster-key budget, row cap, or a pre-add 32 MB byte cap, so
   peak heap is bounded to one group regardless of partition size.

Verified on real data: source=claude export 1.4 GB -> ~150 MB, `tools`
1.34 GB -> ~15 MB (RLE_DICTIONARY), no OOM.

Cluster-column derivation is shared (`clusterColumnsForDataset`) and wired
into both the local-fs and s3 blob destinations.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@bgmcmullen bgmcmullen requested a review from philcunliffe June 8, 2026 23:28
@philcunliffe

Copy link
Copy Markdown
Contributor

Dual-agent review — request_changes

  • Verdict: request_changes
  • Risk class: medium
  • Auto-merge advisory: 👎 thumbs down — verdict is request_changes; needs human-gated follow-up

Advisory only: no merge was attempted.

Risk capstone

Cross-reference: reviewer findings vs high-risk surfaces

Source Finding (severity, evidence) Intersects
Codex Resource Lifecycle & Cleanup — encoder/S3 hold full compressed blob in memory; major, format-parquet/src/index.js:240, s3/src/index.js:288 Concurrency surface (output-side heap), Risks bullet 1
Claude clusterColumnsForDataset undefined/parse branches untested; minor, src/core/sinks/encoder.js:22 Direct callers (clustering-disable path), Config field chain, Risks bullet 2
Codex review

Fix Validations

JSON columns lost dictionary encoding

  • Status: correct
  • Evidence: hypaware-core/plugins-workspace/format-parquet/src/columns.js:25, hypaware-core/plugins-workspace/format-parquet/src/columns.js:35, hypaware-core/plugins-workspace/format-parquet/src/columns.js:57
  • Assessment: JSON values are no longer stringified; serializable object/array values are interned by emitted JSON content so the writer can dictionary-encode repeated references while preserving object round-trip behavior.

Row groups spanned the whole partition

  • Status: correct
  • Evidence: src/core/sinks/encoder.js:22, hypaware-core/plugins-workspace/local-fs/src/index.js:105, hypaware-core/plugins-workspace/s3/src/index.js:268, hypaware-core/plugins-workspace/format-parquet/src/index.js:225
  • Assessment: Cluster columns are derived from the dataset Iceberg partition fields and forwarded through both affected blob sinks, and the parquet encoder uses those keys to split row groups before distinct-key cardinality grows too high.

Encoder buffered all source rows before writing

  • Status: incomplete
  • Evidence: hypaware-core/plugins-workspace/format-parquet/src/index.js:218, hypaware-core/plugins-workspace/format-parquet/src/index.js:238, hypaware-core/plugins-workspace/format-parquet/src/index.js:187, hypaware-core/plugins-workspace/format-parquet/src/index.js:240
  • Assessment: Source rows are now consumed group-by-group, which removes the old collectRows partition buffer. The final Parquet output is still accumulated in a ByteWriter and returned as one Uint8Array, so peak heap is not actually bounded to one row group independent of final file size.

Findings

7) Resource Lifecycle & Cleanup

  • Severity: major
  • Confidence: high
  • Evidence: hypaware-core/plugins-workspace/format-parquet/src/index.js:187, hypaware-core/plugins-workspace/format-parquet/src/index.js:240, hypaware-core/plugins-workspace/format-parquet/src/index.js:241, hypaware-core/plugins-workspace/s3/src/index.js:288
  • Why it matters: Very large exports can still OOM because the encoder keeps the complete Parquet file in memory after row-group streaming, and S3 then materializes the blob before upload.
  • Suggested fix: Stream encoded row-group bytes to the destination, or write to a temp file and return a stream/file handle; update local-fs/S3 to consume without materializeBytes for parquet exports.

No Finding

  1. Behavioral Correctness
  2. Contract & Interface Fidelity
  3. Change Impact / Blast Radius
  4. Concurrency, Ordering & State Safety
  5. Error Handling & Resilience
  6. Security Surface
  7. Release Safety
  8. Test Evidence Quality
  9. Architectural Consistency
  10. Debuggability & Operability

Evidence Bundle

  • Changed hot paths: JSON coercion/interning, parquet row-group writer, sink encode context, local-fs export, S3 export.
  • Impacted callers: hypaware-core/plugins-workspace/local-fs/src/index.js:105, hypaware-core/plugins-workspace/s3/src/index.js:268, src/core/sinks/encoder.js:71.
  • Impacted tests: test/plugins/format-parquet-clustering.test.js:81, test/plugins/format-parquet-clustering.test.js:102, test/plugins/format-parquet-clustering.test.js:140, test/plugins/format-parquet-clustering.test.js:216, test/plugins/s3-export-batch.test.js:165.
  • Unresolved uncertainty: I ran the two changed test files with node --test; I did not run full npm test or a real large-file heap/acceptance smoke.
Claude review

Claude review

clusterColumnsForDataset parsing/undefined branches are untested

  • Severity: minor
  • Confidence: 82
  • Evidence: src/core/sinks/encoder.js:373
  • Why it matters: clusterColumnsForDataset is exactly the deterministic parsing CLAUDE.md says to unit-test, yet its undefined-return branches (no cachePartitioning, non-array fields, all-non-string columns filtered to empty) — the "clustering disabled / default row grouping" path — are never exercised; only the two-valid-field happy path is covered indirectly via the s3 test, so a regression that returns [] instead of undefined, or throws on a dataset with no partition declaration, would ship green.
  • Suggested fix: Add a direct unit test for clusterColumnsForDataset(query, dataset) covering: dataset with no cachePartitioning -> undefined; fields absent/non-array -> undefined; fields whose column values are all non-strings/empty -> undefined; and the valid case returning the column list in order.

Reports: /Users/phil/workspace/hypaware/.git/worktrees/dual-review-pr90/dual-review/pr-90

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants