From 9a1f6681e9d0ce596aeab0efae43ac0dc8b1f8b0 Mon Sep 17 00:00:00 2001 From: Phillip Cunliffe Date: Tue, 9 Jun 2026 15:35:11 -0700 Subject: [PATCH] feat(sinks): partition iceberg exports by day with conversation sort MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lay out @hypaware/format-iceberg exports for an archive's job, not the cache's: partition by day(primaryTimestampColumn) — a writer-owned default, not the cache's conversation_id-identity cachePartitioning, which sets an unbounded ~1-file-per-conversation floor compaction can't beat — and sort each day partition by the dataset's lookup columns (conversation_id-led) so a conversation lookup prunes row groups by min/max instead of needing a partition per conversation. - Promote partitionSpecForDeclaration + validatePartitionSpecStability (and the declaration type) from src/core/cache/iceberg to a shared src/core/iceberg home, re-exported from src/core/index.js: they are core surface consumed by the registry, cache, plugin types, and now the export (LLP 0003). - format-iceberg derives the day grain + sort order per dataset at commit time, creates the table with both, and rejects partition-spec drift on append (iceberg_partition_spec_drift). Emits hyp_partition_spec and hyp_sort_order on commit spans. - Reframe maintenance compaction: available via icebergRewrite but not run in-daemon and not needed for a day grain (was "blocked by icebird"). Spec: LLP 0022 (rewritten from the abandoned cache-parity decision); xrefs in LLP 0014 and 0003. Tests: 10 (derivation + drift through the real icebird write path) plus a passing iceberg_export_partitioned_local_fs smoke asserting the layout and hyp_partition_spec. Clustering (icebird #22) and read pruning (#20/#21) require a published icebird containing commit 3edb15b; the package.json pin must move off 0.8.5 before those benefits land. The code degrades gracefully on 0.8.5 — partitioning and drift work; the sort order is recorded but inert. Co-Authored-By: Claude Opus 4.8 (1M context) --- collectivus-plugin-kernel-types.d.ts | 2 +- .../format-iceberg/src/commit.js | 45 ++- .../format-iceberg/src/maintenance.js | 9 +- .../format-iceberg/src/partitioning.js | 98 +++++++ .../format-iceberg/src/table-format.js | 12 +- .../format-iceberg/src/types.d.ts | 23 +- .../smoke/flows/iceberg_export_local_fs.js | 2 +- .../iceberg_export_partitioned_local_fs.js | 224 +++++++++++++++ llp/0003-core-vs-plugin-surface.spec.md | 13 + llp/0014-sinks.spec.md | 9 + llp/0022-iceberg-export-partitioning.spec.md | 270 ++++++++++++++++++ src/core/cache/iceberg/schema.js | 89 +----- src/core/cache/iceberg/store.js | 6 +- src/core/cache/types.d.ts | 21 +- src/core/iceberg/partition-spec.js | 96 +++++++ src/core/iceberg/types.d.ts | 27 ++ src/core/index.d.ts | 4 + src/core/index.js | 3 + test/core/cache-iceberg-schema.test.js | 4 +- test/plugins/iceberg-partitioning.test.js | 232 +++++++++++++++ 20 files changed, 1072 insertions(+), 117 deletions(-) create mode 100644 hypaware-core/plugins-workspace/format-iceberg/src/partitioning.js create mode 100644 hypaware-core/smoke/flows/iceberg_export_partitioned_local_fs.js create mode 100644 llp/0022-iceberg-export-partitioning.spec.md create mode 100644 src/core/iceberg/partition-spec.js create mode 100644 src/core/iceberg/types.d.ts create mode 100644 test/plugins/iceberg-partitioning.test.js diff --git a/collectivus-plugin-kernel-types.d.ts b/collectivus-plugin-kernel-types.d.ts index 416a38c..8385c64 100644 --- a/collectivus-plugin-kernel-types.d.ts +++ b/collectivus-plugin-kernel-types.d.ts @@ -16,7 +16,7 @@ */ import type { AsyncDataSource, ScanOptions, ScanResults } from 'squirreling' -import type { CachePartitioningDeclaration } from './src/core/cache/types.d.ts' +import type { CachePartitioningDeclaration } from './src/core/iceberg/types.d.ts' export type { AsyncDataSource, ScanOptions, ScanResults } diff --git a/hypaware-core/plugins-workspace/format-iceberg/src/commit.js b/hypaware-core/plugins-workspace/format-iceberg/src/commit.js index e86fd5b..ff5fae3 100644 --- a/hypaware-core/plugins-workspace/format-iceberg/src/commit.js +++ b/hypaware-core/plugins-workspace/format-iceberg/src/commit.js @@ -7,12 +7,14 @@ import { loadLatestFileCatalogMetadata, } from 'icebird' +import { validatePartitionSpecStability } from '../../../../src/core/iceberg/partition-spec.js' + import { icebergSchemaForColumns, mergeFieldIdsFromTable, rowsToIcebergRecords } from './schema.js' /** * @import { ColumnSpec } from '../../../../collectivus-plugin-kernel-types.d.ts' - * @import { CommitInput, CommitResult, TableState } from './types.d.ts' - * @import { Lister, Resolver, Snapshot, TableMetadata } from 'icebird/src/types.js' + * @import { CommitInput, CommitResult, DatasetPartitioning, TableState } from './types.d.ts' + * @import { Lister, PartitionSpec, Resolver, Snapshot, TableMetadata } from 'icebird/src/types.js' */ /** @@ -80,15 +82,34 @@ export async function commitBatch(input, priorState) { if (!priorState.exists) { try { + // @ref LLP 0022#partition-derivation — create with the writer-owned + // day-grain partitionSpec + conversation sort order. Both default to + // unpartitioned/unsorted in icebird when `partitioning` is absent. await icebergCreateTable({ catalog, tableUrl: input.tableUrl, schema: targetSchema, formatVersion: 3, + partitionSpec: input.partitioning?.partitionSpec, + sortOrder: input.partitioning?.sortOrder, }) } catch (err) { throw wrapCommitError(err, 'iceberg_commit_failed', `create table failed at '${input.tableUrl}'`) } + } else if (input.partitioning && priorState.metadata) { + // @ref LLP 0022#drift-rejection — an existing table whose partition spec no + // longer matches the dataset's derived day grain is rejected; the export + // cannot retroactively repartition object-store data files. [constrained-by] + const existingSpec = currentPartitionSpec(priorState.metadata) ?? { 'spec-id': 0, fields: [] } + try { + validatePartitionSpecStability(input.partitioning.declaration, existingSpec, targetSchema) + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + throw newError( + 'iceberg_partition_spec_drift', + `iceberg-format: partition spec drift at '${input.tableUrl}': ${message}` + ) + } } /** @type {TableMetadata} */ @@ -142,6 +163,7 @@ const DEFAULT_STREAM_ROW_LIMIT = 100_000 * rows: AsyncIterable>, * resolver: Resolver, * lister: Lister, + * partitioning?: DatasetPartitioning | null, * }} input * @param {{ exists: boolean, metadata: TableMetadata | null }} priorState * @param {{ batchByteLimit?: number, batchRowLimit?: number }} [opts] @@ -167,7 +189,7 @@ export async function commitRowStream(input, priorState, opts = {}) { async function flushBatch() { if (batch.length === 0) return const result = await commitBatch( - { tableUrl: input.tableUrl, columns: input.columns, rows: batch, resolver: input.resolver, lister: input.lister }, + { tableUrl: input.tableUrl, columns: input.columns, rows: batch, resolver: input.resolver, lister: input.lister, partitioning: input.partitioning }, state ) state = { exists: true, metadata: result.metadata } @@ -225,6 +247,23 @@ function schemaFromExistingMetadata(columns, metadata) { return mergeFieldIdsFromTable(columns, existing) } +/** + * Resolve the table's current `PartitionSpec` from metadata (the default spec, + * falling back to the last). Used for the on-append drift check. + * + * @param {TableMetadata} metadata + * @returns {PartitionSpec | undefined} + */ +function currentPartitionSpec(metadata) { + const specId = metadata['default-spec-id'] + const specs = metadata['partition-specs'] + if (specs?.length) { + const match = specs.find((s) => s['spec-id'] === specId) + return match ?? specs[specs.length - 1] + } + return undefined +} + /** * @param {unknown} value */ diff --git a/hypaware-core/plugins-workspace/format-iceberg/src/maintenance.js b/hypaware-core/plugins-workspace/format-iceberg/src/maintenance.js index 6d4c7e6..d03efaa 100644 --- a/hypaware-core/plugins-workspace/format-iceberg/src/maintenance.js +++ b/hypaware-core/plugins-workspace/format-iceberg/src/maintenance.js @@ -117,9 +117,12 @@ export async function discoverExportDatasets(blobStore, prefix) { * Run export maintenance on all datasets under a prefix: snapshot * expiration per dataset, plus a compaction status report. * - * icebird V1 does not expose `rewrite-data-files` or `delete-data-files`, - * so compaction is not supported. The report signals - * `compactionSupported: false` so the CLI can surface a clear message. + * @ref LLP 0022#compaction — icebird now exposes `icebergRewrite` + * (read-rewrite compaction), but the export deliberately does not run it: + * day-grain partitioning already yields large files, and an in-daemon + * read-rewrite risks the OOM/blocking failure seen with the parquet sink. + * `compactionSupported: false` here means "not run by this sink" (out-of-band + * only), not "impossible". * * @param {{ * blobStore: BlobStore diff --git a/hypaware-core/plugins-workspace/format-iceberg/src/partitioning.js b/hypaware-core/plugins-workspace/format-iceberg/src/partitioning.js new file mode 100644 index 0000000..c42e229 --- /dev/null +++ b/hypaware-core/plugins-workspace/format-iceberg/src/partitioning.js @@ -0,0 +1,98 @@ +// @ts-check + +import { partitionSpecForDeclaration } from '../../../../src/core/iceberg/partition-spec.js' + +import { icebergSchemaForColumns } from './schema.js' + +/** + * @import { ColumnSpec, DatasetRegistration } from '../../../../collectivus-plugin-kernel-types.d.ts' + * @import { CachePartitioningDeclaration } from '../../../../src/core/iceberg/types.d.ts' + * @import { Schema, SortField, SortOrder } from 'icebird/src/types.js' + * @import { DatasetPartitioning } from './types.d.ts' + */ + +// @ref LLP 0022#partition-derivation — the export partitions by a writer-owned +// day grain on the dataset's primaryTimestampColumn, derived independently of +// the cache's `cachePartitioning` (which would impose an unbounded +// per-conversation file count on an archive). [implements] +/** + * Derive the export table's layout for a dataset: a `day(primaryTimestampColumn)` + * partition plus a within-partition sort on the dataset's lookup columns. + * Returns `null` when the dataset declares no `primaryTimestampColumn` present + * in its schema — that dataset exports unpartitioned (V1 behavior unchanged). + * + * @param {DatasetRegistration | undefined} reg + * @param {readonly ColumnSpec[]} columns + * @returns {DatasetPartitioning | null} + */ +export function derivePartitioning(reg, columns) { + if (!reg) return null + const tsColumn = typeof reg.primaryTimestampColumn === 'string' ? reg.primaryTimestampColumn : '' + if (!tsColumn) return null + // A primaryTimestampColumn that isn't in the exported schema can't anchor a + // day grain; fall back to unpartitioned rather than synthesize a bad spec. + if (!columns.some((c) => c.name === tsColumn)) return null + + const schema = icebergSchemaForColumns(columns) + /** @type {CachePartitioningDeclaration} */ + const declaration = { + source: { columns: [tsColumn] }, + iceberg: { fields: [{ column: tsColumn, transform: 'day', required: true }] }, + } + const partitionSpec = partitionSpecForDeclaration(declaration, schema) + const sortOrder = sortOrderForLookup(reg, schema) + return { + declaration, + partitionSpec, + sortOrder, + partitionSpecLabel: `day(${tsColumn})`, + sortOrderLabel: sortOrder.fields.map((f) => nameForSourceId(schema, f)).join(','), + } +} + +// @ref LLP 0022#within-partition-sort — cluster each day partition by the +// dataset's declared identity (lookup) columns so a conversation lookup prunes +// row groups by min/max, without the file-count cost of partitioning on it. +// This is the one place the export reads `cachePartitioning` — sort axis only. +// [implements] +/** + * Build a sort order from the dataset's declared identity columns + * (`cachePartitioning.iceberg.fields`, transform `identity`), in declared order. + * Returns an empty (unsorted) order when none apply — icebird treats that as a + * no-op, so an undeclared dataset is day-partitioned but unsorted. + * + * @param {DatasetRegistration} reg + * @param {Schema} schema + * @returns {SortOrder} + */ +function sortOrderForLookup(reg, schema) { + /** @type {Map} */ + const idByName = new Map(schema.fields.map((f) => [f.name, f.id])) + const declared = reg.cachePartitioning?.iceberg?.fields ?? [] + /** @type {SortField[]} */ + const fields = [] + for (const f of declared) { + if (f.transform !== 'identity') continue + const id = idByName.get(f.column) + if (id === undefined) continue + fields.push({ + 'source-id': id, + transform: 'identity', + direction: 'asc', + 'null-order': 'nulls-last', + }) + } + // order-id 0 is conventionally "unsorted"; a real order uses 1. + return fields.length > 0 ? { 'order-id': 1, fields } : { 'order-id': 0, fields: [] } +} + +/** + * @param {Schema} schema + * @param {SortField} field + * @returns {string} + */ +function nameForSourceId(schema, field) { + const id = field['source-id'] + const match = schema.fields.find((f) => f.id === id) + return match ? match.name : String(id) +} diff --git a/hypaware-core/plugins-workspace/format-iceberg/src/table-format.js b/hypaware-core/plugins-workspace/format-iceberg/src/table-format.js index f2e0830..9fdbb61 100644 --- a/hypaware-core/plugins-workspace/format-iceberg/src/table-format.js +++ b/hypaware-core/plugins-workspace/format-iceberg/src/table-format.js @@ -5,6 +5,7 @@ import { getTracer, SpanStatusCode } from '../../../../src/core/observability/in import { createBlobStoreIO, pathToKey, tableUrlForBlobPrefix } from './blob-io.js' import { commitBatch, commitRowStream, probeTable } from './commit.js' import { expireExportSnapshots, normalizeExportRetentionConfig } from './maintenance.js' +import { derivePartitioning } from './partitioning.js' import { loadMarker, markerKey, markerSubsumedBySnapshot, writeMarker } from './state.js' /** @@ -195,6 +196,11 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log, mai return { partitionsExported: partitions.length, bytesWritten: 0, status: 'skipped' } } + // @ref LLP 0022#partition-derivation — derived per dataset at commit time + // because `createSink` runs once for a sink that exports many datasets, so + // the spec cannot be resolved up front. [implements] + const partitioning = derivePartitioning(ctx.query.getDataset(dataset), columns) + const blobPrefix = joinKeys(pathToKey(prefix), sanitizeDataset(dataset)) const tableUrl = tableUrlForBlobPrefix(blobPrefix) // Track the most recent metadata.json write so the commit span can @@ -283,6 +289,10 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log, mai hyp_dataset: dataset, hyp_batch_id: batch.batchId, encoder_format: ctx.encoder.format, + // @ref LLP 0022#observability — surface the resolved layout so a smoke + // can assert what was written, not just that rows landed. + hyp_partition_spec: partitioning?.partitionSpecLabel ?? 'unpartitioned', + hyp_sort_order: partitioning?.sortOrderLabel ?? '', status: 'ok', ...destinationAttrs, }, @@ -290,7 +300,7 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log, mai async (span) => { try { const result = await commitRowStream( - { tableUrl, columns, rows: rowStream(), resolver, lister }, + { tableUrl, columns, rows: rowStream(), resolver, lister, partitioning }, { exists: priorState.exists, metadata: priorState.metadata } ) span.setAttribute('snapshot_id', result.snapshotId) diff --git a/hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts b/hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts index e8d3623..8986e39 100644 --- a/hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts +++ b/hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts @@ -1,5 +1,6 @@ import type { ColumnSpec } from '../../../../collectivus-plugin-kernel-types.d.ts' -import type { TableMetadata, Resolver, Lister } from 'icebird/src/types.js' +import type { TableMetadata, Resolver, Lister, PartitionSpec, SortOrder } from 'icebird/src/types.js' +import type { CachePartitioningDeclaration } from '../../../../src/core/iceberg/types.d.ts' export interface TableState { /** True when at least one metadata file is visible. */ @@ -8,6 +9,24 @@ export interface TableState { currentSnapshotId: string | undefined } +/** + * Writer-owned export layout for one dataset (LLP 0022): a day-grain partition + * derived from `primaryTimestampColumn`, plus a within-partition sort on the + * dataset's declared identity (lookup) columns. + */ +export interface DatasetPartitioning { + /** Synthesized day-grain declaration — kept for the on-append drift check. */ + declaration: CachePartitioningDeclaration + /** Iceberg partition spec passed to `icebergCreateTable`. */ + partitionSpec: PartitionSpec + /** Within-partition sort order; an empty order means unsorted (no-op). */ + sortOrder: SortOrder + /** Span label, e.g. `day(message_created_at)`. */ + partitionSpecLabel: string + /** Span label, e.g. `conversation_id,cwd,date` (empty when unsorted). */ + sortOrderLabel: string +} + export interface CommitInput { /** Table URL the resolver understands. */ tableUrl: string @@ -17,6 +36,8 @@ export interface CommitInput { rows: readonly Record[] resolver: Resolver lister: Lister + /** Day-grain partition + sort layout; absent ⇒ unpartitioned table. */ + partitioning?: DatasetPartitioning | null } export interface CommitResult { diff --git a/hypaware-core/smoke/flows/iceberg_export_local_fs.js b/hypaware-core/smoke/flows/iceberg_export_local_fs.js index 056701e..79a018b 100644 --- a/hypaware-core/smoke/flows/iceberg_export_local_fs.js +++ b/hypaware-core/smoke/flows/iceberg_export_local_fs.js @@ -272,7 +272,7 @@ export async function run({ harness, expect }) { (ds) => Array.isArray(ds) && ds.some((d) => d.dataset === DATASET) ) expect.that( - 'maintain: compactionSupported is false (icebird V1 limitation)', + 'maintain: compactionSupported is false (not run by this sink — out-of-band only, LLP 0022)', maintainReport.compactionSupported, (v) => v === false ) diff --git a/hypaware-core/smoke/flows/iceberg_export_partitioned_local_fs.js b/hypaware-core/smoke/flows/iceberg_export_partitioned_local_fs.js new file mode 100644 index 0000000..edb459b --- /dev/null +++ b/hypaware-core/smoke/flows/iceberg_export_partitioned_local_fs.js @@ -0,0 +1,224 @@ +// @ts-check + +import fs from 'node:fs/promises' +import fsSync from 'node:fs' +import path from 'node:path' +import { fileURLToPath } from 'node:url' + +import { loadLatestFileCatalogMetadata } from 'icebird' + +import { + Attr, + installObservability, + runRoot, +} from '../../../src/core/observability/index.js' +import { appendRowsToTable } from '../../../src/core/cache/iceberg/store.js' +import { createCommandRegistry } from '../../../src/core/registry/commands.js' +import { createKernelRuntime } from '../../../src/core/runtime/activation.js' +import { activatePlugins } from '../../../src/core/runtime/loader.js' +import { loadManifests } from '../../../src/core/manifest.js' +import { createBlobStoreIO, tableUrlForBlobPrefix } from '../../plugins-workspace/format-iceberg/src/blob-io.js' + +/** + * @import { ActivePlugin, BlobStore, ColumnSpec, DatasetRegistration, SinkEncoder, TableFormatProvider } from '../../../collectivus-plugin-kernel-types.d.ts' + */ + +const SMOKE_DIR = path.dirname(fileURLToPath(import.meta.url)) +const PLUGINS_WORKSPACE = path.resolve(SMOKE_DIR, '../../plugins-workspace') +const DATASET = 'iceberg_partitioned_rows' +const SINK_INSTANCE = 'iceberg_lake' + +/** @type {ColumnSpec[]} */ +const COLUMNS = [ + { name: 'conversation_id', type: 'STRING', nullable: false }, + { name: 'cwd', type: 'STRING', nullable: true }, + { name: 'message_created_at', type: 'TIMESTAMP', nullable: false }, + { name: 'date', type: 'STRING', nullable: false }, + { name: 'value', type: 'STRING', nullable: false }, +] + +// Four rows over two days and two conversations, interleaved so the +// within-partition conversation sort has work to do. +const ROWS = [ + { conversation_id: 'cB', cwd: '/x', message_created_at: '2026-06-04T10:00:00Z', date: '2026-06-04', value: 'b1' }, + { conversation_id: 'cA', cwd: '/x', message_created_at: '2026-06-04T09:00:00Z', date: '2026-06-04', value: 'a1' }, + { conversation_id: 'cB', cwd: '/x', message_created_at: '2026-06-05T10:00:00Z', date: '2026-06-05', value: 'b2' }, + { conversation_id: 'cA', cwd: '/x', message_created_at: '2026-06-05T09:00:00Z', date: '2026-06-05', value: 'a2' }, +] + +/** + * Hermetic smoke for `@hypaware/format-iceberg` day-grain partitioning + * (LLP 0022). Brings up the real `@hypaware/local-fs`, + * `@hypaware/format-parquet`, and `@hypaware/format-iceberg` plugin trees, + * registers a timestamped + lookup-keyed dataset, lands 4 rows over 2 days + * directly in the kernel cache, drives the table-format sink through + * `kernel.sinks.instantiate`, and asserts the exported table is laid out + * for an archive's job: + * + * - export succeeds (`status='exported'`, `bytesWritten > 0`). + * - the committed metadata carries a `day(message_created_at)` partition + * spec and a `conversation_id`-led sort order. + * - the 4 rows land in exactly 2 data files — one per day partition. + * - the `iceberg.table.create` span carries `hyp_partition_spec` and + * `hyp_sort_order` so the layout is observable, not just inferred. + * + * Note: rows are pre-populated via the direct cache write + * (`appendRowsToTable`) rather than `ctx.storage.appendRows`, so this flow + * exercises the export layout without depending on the spool-flush path. + * + * @param {{ harness: any, expect: any }} args + */ +export async function run({ harness, expect }) { + const obs = installObservability() + if (!obs.tracer.provider) { + throw new Error('iceberg_export_partitioned_local_fs: tracer not installed — expected HYP_DEV_TELEMETRY=1') + } + + const cacheRoot = path.join(harness.stateDir, 'cache') + const registry = createCommandRegistry() + const kernel = createKernelRuntime({ commandRegistry: registry, cacheRoot }) + + const destinationDir = path.join(harness.tmpDir, 'iceberg-out') + await fs.mkdir(destinationDir, { recursive: true }) + + const parquetDir = path.join(PLUGINS_WORKSPACE, 'format-parquet') + const localFsDir = path.join(PLUGINS_WORKSPACE, 'local-fs') + const icebergDir = path.join(PLUGINS_WORKSPACE, 'format-iceberg') + const tmpRoot = path.join(harness.tmpDir, 'plugin-temp') + await fs.mkdir(tmpRoot, { recursive: true }) + + await runRoot( + 'kernel.boot', + { + [Attr.COMPONENT]: 'kernel', + [Attr.OPERATION]: 'boot', + [Attr.SMOKE_NAME]: harness.smokeName, + [Attr.SMOKE_STEP]: 'plugin_activate', + [Attr.DEV_RUN_ID]: harness.devRunId, + status: 'ok', + }, + async () => { + const { loaded, failed } = await loadManifests([parquetDir, localFsDir, icebergDir]) + if (failed.length > 0) { + throw new Error(`manifest failures — ${failed.map((f) => `${f.manifestPath}: ${f.message}`).join('; ')}`) + } + const entries = loaded.map((l) => ({ + manifest: l.manifest, + rootDir: l.rootDir, + config: l.manifest.name === '@hypaware/local-fs' ? { exports_dir: destinationDir } : undefined, + })) + const result = await activatePlugins({ + plugins: entries, stateRoot: harness.stateDir, runId: harness.devRunId, runtime: kernel, tmpRoot, + }) + for (const r of result.results) { + if (!r.ok) throw new Error(`activate ${r.plugin.name} failed (${r.errorKind}): ${r.message}`) + } + } + ) + + // Register the dataset directly on the kernel registry and pre-populate + // the cache table the export will read. The day grain + sort are derived + // by the writer from this registration at commit time. + const tablePath = kernel.storage.cacheTablePath(DATASET) + /** @type {DatasetRegistration} */ + const dataset = { + name: DATASET, + plugin: '@hypaware/test-fixture', + schema: { columns: COLUMNS }, + primaryTimestampColumn: 'message_created_at', + cachePartitioning: { + source: { columns: ['conversation_id'], fallback: 'unknown' }, + iceberg: { + fields: [ + { column: 'conversation_id', transform: 'identity', required: true }, + { column: 'cwd', transform: 'identity' }, + { column: 'date', transform: 'identity', required: true }, + ], + }, + }, + discoverPartitions: () => [{ dataset: DATASET, partition: { partition: 'all' }, tablePath }], + createDataSource: () => ({ columns: COLUMNS.map((c) => c.name), numRows: 0, scan: () => ({ appliedWhere: false, appliedLimitOffset: false, async *rows() {} }) }), + } + kernel.query.registerDataset(dataset) + await appendRowsToTable(tablePath, COLUMNS, ROWS) + + const blobStore = /** @type {BlobStore} */ (kernel.capabilities.require('@hypaware/format-iceberg', 'hypaware.blob-store', '^1.0.0')) + const encoder = /** @type {SinkEncoder} */ (kernel.capabilities.require('@hypaware/format-iceberg', 'hypaware.encoder', '^1.0.0')) + const tableFormat = /** @type {TableFormatProvider} */ (kernel.capabilities.require('@hypaware/format-iceberg', 'hypaware.table-format', '^1.0.0')) + + /** @type {ActivePlugin} */ + const icebergPlugin = { + name: '@hypaware/format-iceberg', + version: '1.0.0', + manifest: { schema_version: 1, name: '@hypaware/format-iceberg', version: '1.0.0', hypaware_api: '^1.0.0', runtime: 'node', entrypoint: './src/index.js' }, + rootDir: icebergDir, + } + const handle = await kernel.sinks.instantiate({ + kind: 'table-format', instanceName: SINK_INSTANCE, tableFormat, + writerPlugin: '@hypaware/format-iceberg', destinationPlugin: '@hypaware/local-fs', + blobStore, encoder, + config: { schedule: '* * * * *', encoder: '@hypaware/format-parquet' }, + plugin: icebergPlugin, + paths: { + rootDir: icebergDir, + stateDir: path.join(harness.stateDir, 'plugins', '@hypaware/format-iceberg'), + cacheDir: path.join(harness.stateDir, 'cache', 'plugins', '@hypaware/format-iceberg'), + tempDir: path.join(tmpRoot, 'format-iceberg'), + }, + log: makeNoopLogger(), query: kernel.query, storage: kernel.storage, + }) + + const partitions = await dataset.discoverPartitions(/** @type {any} */ ({ cacheDir: kernel.storage.cacheRoot, scope: {} })) + const exportResult = await handle.sink.exportBatch( + { batchId: 'partitioned-batch-1', partitions }, + { format: 'iceberg', schedule: '* * * * *' } + ) + expect.that('export: status=exported', exportResult.status, (v) => v === 'exported') + expect.that('export: bytesWritten > 0', exportResult.bytesWritten, (v) => typeof v === 'number' && v > 0) + + // Read the committed table metadata back and assert the archive layout. + const tableUrl = tableUrlForBlobPrefix(`iceberg/datasets/${DATASET}`) + const { resolver, lister } = await createBlobStoreIO(blobStore) + const { metadata } = await loadLatestFileCatalogMetadata({ tableUrl, resolver, lister }) + + const spec = metadata['partition-specs'].find((s) => s['spec-id'] === metadata['default-spec-id']) + expect.that( + 'layout: partition spec is day(message_created_at)', + (spec?.fields ?? []).map((f) => `${f.transform}(${f.name})`).join(','), + (v) => v === 'day(message_created_at)' + ) + const order = (metadata['sort-orders'] ?? []).find((o) => o['order-id'] === metadata['default-sort-order-id']) + expect.that( + 'layout: default sort order leads with conversation_id', + order?.fields?.[0]?.['source-id'], + (v) => v === 1 + ) + + // 4 rows over 2 days ⇒ exactly 2 data files (one per day partition). + const dataDir = path.join(destinationDir, 'iceberg', 'datasets', DATASET, 'data') + const dataFiles = fsSync.readdirSync(dataDir).filter((f) => f.endsWith('.parquet')) + expect.that('layout: 4 rows landed in 2 day-partition files', dataFiles.length, (v) => v === 2) + + await obs.shutdown() + + // The layout is observable on the create span. + const traces = await expect.traces() + const createSpan = traces.find( + (t) => t.name === 'iceberg.table.create' && t.attributes?.hyp_sink_instance === SINK_INSTANCE + ) + expect.that('traces: iceberg.table.create span exists', createSpan, (v) => v !== undefined) + expect.that( + 'traces: hyp_partition_spec = day(message_created_at)', + createSpan?.attributes?.hyp_partition_spec, + (v) => v === 'day(message_created_at)' + ) + expect.that( + 'traces: hyp_sort_order clusters by conversation_id', + createSpan?.attributes?.hyp_sort_order, + (v) => typeof v === 'string' && v.startsWith('conversation_id') + ) +} + +function makeNoopLogger() { + return { debug() {}, info() {}, warn() {}, error() {} } +} diff --git a/llp/0003-core-vs-plugin-surface.spec.md b/llp/0003-core-vs-plugin-surface.spec.md index 4383064..fbbcc10 100644 --- a/llp/0003-core-vs-plugin-surface.spec.md +++ b/llp/0003-core-vs-plugin-surface.spec.md @@ -37,6 +37,19 @@ dataset gets the same query and formatting behavior for free. The local query cache ([LLP 0013](./0013-local-query-cache.decision.md)) is not a plugin and never appears in `plugins[]`. +**Partition-spec derivation is core surface.** The helpers that turn a dataset's +partitioning declaration into an Iceberg `PartitionSpec` and guard its stability +— `partitionSpecForDeclaration` and `validatePartitionSpecStability`, with the +declaration type — began life under `src/core/cache/iceberg/` but are pure +functions of `(declaration, schema)` consumed across the boundary: the dataset +registry validates declarations, the public plugin surface types them +(`DatasetRegistration.cachePartitioning`), the intrinsic cache derives its spec, +and the `@hypaware/format-iceberg` export derives its own +([LLP 0022](./0022-iceberg-export-partitioning.spec.md#shared-core-helpers)). +They are therefore promoted to a neutral core home re-exported from +`src/core/index.js`, not buried in the cache — the cache is one consumer, not the +owner. + ## Plugins own Domain behavior only, expressed through what they `require`, `provide`, and diff --git a/llp/0014-sinks.spec.md b/llp/0014-sinks.spec.md index a85184d..ea36cbf 100644 --- a/llp/0014-sinks.spec.md +++ b/llp/0014-sinks.spec.md @@ -47,6 +47,15 @@ plugin doesn't know whether it holds Parquet, JSONL, or an Iceberg manifest. Iceberg-on-S3 and Parquet-on-S3 share one S3 plugin. Adding GCS later is one plugin and every existing writer works. +## Export layout is the writer's, not the cache's + +A table-format writer lays data out for the **archive's** job, which is not the +cache's job. The `@hypaware/format-iceberg` writer partitions exported tables by +a writer-owned **day grain** and sorts each partition by the dataset's lookup +key — deliberately *not* inheriting the cache's `cachePartitioning`, which is +tuned for recent-query lookups and would impose an unbounded per-conversation +file count on an archive. See [LLP 0022](./0022-iceberg-export-partitioning.spec.md). + ## Export contract A sink implements an export contract — not a per-row writer: diff --git a/llp/0022-iceberg-export-partitioning.spec.md b/llp/0022-iceberg-export-partitioning.spec.md new file mode 100644 index 0000000..6c3d68d --- /dev/null +++ b/llp/0022-iceberg-export-partitioning.spec.md @@ -0,0 +1,270 @@ +# LLP 0022: Iceberg export partitioning (day grain + conversation sort) + +**Type:** Spec +**Status:** Draft +**Systems:** Sinks +**Author:** Phil / Claude +**Date:** 2026-06-09 +**Related:** LLP 0014, LLP 0013, LLP 0003, LLP 0015, LLP 0021 + +> Lay out exported Iceberg tables for an archive's job, not the cache's: +> **partition by day** (a writer-owned default from `primaryTimestampColumn`, +> *not* the cache's `cachePartitioning`) and **sort each partition by the +> dataset's lookup key** (`conversation_id`). Day grain bounds file count by +> time; the within-partition sort preserves conversation-lookup speed via +> row-group pruning instead of one-file-per-conversation partitioning. + +## Summary + +The intrinsic Iceberg-backed cache (`src/core/cache/iceberg/`) and the +`@hypaware/format-iceberg` export writer sit on the **same `icebird` engine** and +encode data files **identically**. They diverge on the read-side layout: the +cache **partitions** its tables (`icebergCreateTable({ …, partitionSpec })`, +`src/core/cache/iceberg/store.js:105`); the export creates tables with **no** +`partitionSpec` (`format-iceberg/src/commit.js:83`), so an exported table is one +flat, ever-growing set of data files. + +This spec gives the export a layout fit for an archive: + +1. **Partition by a writer-owned day grain** — `day(primaryTimestampColumn)`, + e.g. `day(message_created_at)` for ai-gateway. Independent of + `cachePartitioning`. Bounds file count by *time* (~58 partitions today, +1/day) + instead of by *conversation cardinality* (unbounded — see + [§Why day, not conversation_id](#why-day-not-conversation-id)). +2. **Sort each day partition by the dataset's lookup key** — `conversation_id` + for ai-gateway. A conversation's rows cluster inside the day file(s), so a + `conversation_id` lookup prunes row groups by their min/max bounds without + making `conversation_id` a partition boundary (and its file-count floor). See + [§Within-partition sort](#within-partition-sort). + +The cache and sinks have **different jobs** — the cache is the "recent-query +story," sinks are the "long-term-storage and downstream-integration story" +([LLP 0014](./0014-sinks.spec.md#sinks-are-export-targets-not-the-write-path)) — +so they *should* lay out differently. Parity-by-construction (the earlier, +**abandoned** design — inherit `cachePartitioning` so the export mirrors the +cache's `conversation_id:identity` partitioning) is rejected: it reproduces the +file-count pathology below. The export instead borrows `conversation_id` only as +a **sort** key, where it helps and costs nothing. + +## Why day, not conversation_id + +Measured from the local cache (`hyp query` over `ai_gateway_messages`, +2026-06-09): + +- **517,752 rows / 3,799 conversations / 58 distinct days**, ~1.8 KB/row, 3 + sources, ~3,917 `(conversation_id, cwd, date)` cache partitions. +- The cache's `conversation_id:identity` partition field sets a **hard floor of + ~1 data file per conversation**: ~4,523 live files at ~120–480 KB each + (another ~4,080 were an orphaned table generation — `du` over the directory + overcounts; trust snapshot `total-data-files`). +- **Compaction cannot beat that floor.** Iceberg compaction (icebird's + `icebergRewrite`) only merges files *within* a partition. A + conversation_id-identity partition holds one conversation, so the floor is one + object per conversation — **unbounded** in the number of conversations. +- `date` is **day-grained** (`2026-06-04`): ~25–39K rows/day. A day-grain + partition yields **dozens of multi-MB files per day**, with time-range pruning + and no compaction pressure. + +Conversation_id is the wrong *partition* axis for an archive on file-count +grounds alone. But it is the right *lookup* axis — recovered here as a sort key +(below), not a partition. + +## Partition derivation + +The export derives its partition spec **per dataset, at commit time** (a +`format-iceberg` sink has no single dataset — `createSink` runs once for many), +from the dataset's `DatasetRegistration` +(`collectivus-plugin-kernel-types.d.ts`): + +- If the dataset declares a `primaryTimestampColumn` present in its schema, + partition by **`day(primaryTimestampColumn)`**. For `ai_gateway_messages`, + `day(message_created_at)`. +- Otherwise export **unpartitioned** (V1 behavior unchanged for that dataset). A + dataset with no primary timestamp has no defensible day grain. + +The writer **synthesizes** the partition declaration — it does *not* read +`cachePartitioning` for the partition axis. It builds the declaration shape the +shared helper consumes, with one day-grained field, and feeds it to +`partitionSpecForDeclaration(declaration, schema)` (the exact cache helper, now +promoted to core — [§Shared core helpers](#shared-core-helpers)): + +```js +{ iceberg: { fields: [{ column: primaryTimestampColumn, transform: 'day', required: true }] } } +``` + +**Why `day(timestamp)`, not `identity(date)`.** ai-gateway carries a precomputed +day-grained `date` string, so `identity(date)` was on the table. It is rejected +as the rule: `day(primaryTimestampColumn)` is **general** to any timestamped +dataset, whereas `identity(date)` only works where a day column is precomputed +and would couple the export to a cache-ism. icebird buckets `day` correctly on +append (`groupByPartition` + `dayTransform`, verified). The cost is partition +values are UTC **day ordinals** (e.g. `…_day=20608`), not `date=2026-06-04` +strings — cosmetic in object keys; engines prune on values, and HypAware prunes +at the file/row-group level regardless ([§What this buys](#what-this-buys)). + +## Within-partition sort + +Conversation lookup is preserved by **clustering** a conversation's rows together +inside each day partition — a table **sort order** on the dataset's lookup +columns — not by partitioning on `conversation_id`. The export creates the table +with a `sortOrder`; icebird then **sorts every append by the table's default sort +order automatically** (`icebergAppend` → `prepareAppend` resolves +`default-sort-order-id`, sorts each partition group, and records the real +`sort_order_id`). No per-call threading. A `conversation_id` lookup then prunes +data files / row groups by their `conversation_id` min/max bounds +([§What this buys](#what-this-buys)), which a one-file-per-conversation partition +would have achieved only at the unbounded file-count cost above. + +**Sort-key derivation.** The dataset's lookup columns are exactly the identity +columns it already declares in `cachePartitioning.iceberg.fields`. The export +sorts by those identity columns, in declared order — for ai-gateway, +`conversation_id, cwd, date`, so `conversation_id` leads and dominates the +clustering. This is the one place the export **does** read `cachePartitioning`: +for the *sort* axis, where reusing the dataset's declared identity columns is +beneficial and carries none of the partition file-count cost. The *partition* +axis stays writer-owned and independent (see +[§Partition derivation](#partition-derivation)). A dataset with no +`cachePartitioning` is day-partitioned but unsorted — still file-count-bounded +and time-pruneable. + +**Files are locally, not globally, sorted.** Each append sorts only its own +rows, so a day partition touched by N export batches holds N internally-sorted +files whose `conversation_id` ranges may overlap. Row-group pruning still skips +most of them; a tighter global sort is available out-of-band via `icebergRewrite` +([§Compaction](#compaction)), not run in V1. + +Sort order is mutable table metadata, **not** partition spec, so introducing or +evolving it later is not partition-spec drift +([§Drift rejection](#drift-rejection)). + +## Shared core helpers + +`partitionSpecForDeclaration` and `validatePartitionSpecStability` (plus the +`CachePartitioningDeclaration` type) live under `src/core/cache/iceberg/schema.js` +and `src/core/cache/types.d.ts`, but are pure functions of `(declaration, schema)` +already consumed beyond the cache: the dataset registry validates declarations +(`src/core/registry/datasets.js`), the public plugin surface types them +(`DatasetRegistration.cachePartitioning`), and now the export derives a spec with +them. They are **core surface**. Promote both functions and the type to a neutral +core home re-exported from `src/core/index.js`; cache and export import from +there. Move + re-export, behavior unchanged. Recorded as an amendment to +[LLP 0003](./0003-core-vs-plugin-surface.spec.md), not a new Decision LLP. + +The `Cache…` prefix is a historical misnomer once the export consumes the type; +it keeps its name in V1 to avoid churning the dataset-registration surface. + +## Drift rejection + +On append to an existing table, the export validates that the dataset's derived +day-grain spec still matches the table's current `PartitionSpec` via +`validatePartitionSpecStability(declaration, existingSpec, schema)`, mirroring the +cache (`src/core/cache/iceberg/store.js:122-125`). A mismatch (a dataset that +gained/changed `primaryTimestampColumn`, or a table written before this spec) +fails with the stable `error_kind` **`iceberg_partition_spec_drift`**. V1 +**rejects** drift; partition *evolution* is unsupported. The operator path is to +export to a new table prefix; auto-roll is a possible future increment. (Sort- +order changes are not drift — see [§Within-partition sort](#within-partition-sort).) + +## No sink override + +A `format-iceberg` sink exports **multiple** datasets — each its own table, per +the "one source, one table" invariant +([LLP 0000](./0000-hypaware.explainer.md#cross-cutting-invariants)) — with +different schemas and timestamp columns. A single sink-level partition +declaration would be ambiguous across them; a per-dataset override map was +considered and dropped. Partitioning and clustering are **writer-owned defaults** +keyed off the dataset's own registration, not operator config. The export adds +**no new config** in V1. + +## What this buys + +With the partition + sort layout on an icebird carrying the read-pruning work +(#20/#21, see [§icebird dependency](#icebird-dependency)), day partitioning buys +HypAware's **own** read path — not just external engines: + +1. **Partition pruning** — `partitionMightMatch` skips data files whose day + ordinal can't match a time predicate (`icebergDataSource` applies it inside + `scan()`; transforms applied to filter literals). +2. **Row-group pruning** — WHERE is pushed to hyparquet, skipping row groups / + pages by column statistics. With the `conversation_id` sort, a + `conversation_id` predicate prunes most row groups in each day file. +3. **File-count control** — the immediate, layout-level win: bounded large files + instead of one-per-conversation sprawl, independent of any read path. +4. **External engines** (Trino/Spark/Snowflake) — real partition pruning on the + day ordinal from Iceberg manifest data. + +The same pin bump gives the **cache's** read path (`store.js`) the same pruning +for free — a bonus beyond this spec's scope. + +## Compaction + +icebird now exposes `icebergRewrite` (reads live rows, sorts globally under the +target spec, writes consolidated files, commits a replace snapshot). So +compaction is **available**, reframing the prior +`format-iceberg/src/maintenance.js:120` "blocked by icebird" status. But for a +day-partitioned archive it is **not needed** (partitions already hold large +files), and it is **not run in the daemon** — a full read-rewrite risks the +OOM/blocking failure already seen with the parquet sink (the encoder OOMed/blocked +the daemon; exports run manually with a large heap). V1 leaves it as an out-of-band tool that +would tighten the local-vs-global sort, nothing more. The maintenance report +should say "not needed / out-of-band," not "blocked." + +## Observability + +Emit the resolved partition spec and sort order on the iceberg sink's +`iceberg.table.create` / `iceberg.snapshot.commit` spans as **`hyp_partition_spec`** +(e.g. `day(message_created_at)`) and **`hyp_sort_order`** (e.g. +`conversation_id,cwd,date`), so a smoke can assert the intended layout was written +([LLP 0021](./0021-observability.spec.md)). On drift rejection, emit +`error_kind=iceberg_partition_spec_drift`. + +## icebird dependency + +All three enablers are implemented in icebird `master` (commit `3edb15b`, +"Scan pruning and sort-on-write (#20, #21, #22)"), **as yet unpublished** (npm +tops out at `0.8.5` pinned / `0.8.8` latest): + +- **#20** data-file pruning via partition values + manifest bounds — `prune.js` + `partitionMightMatch` / `fileMightMatch`. +- **#21** row-group/page pruning via column stats + bloom filters — WHERE + pushdown in `icebergDataSource.scan()`. +- **#22** sort-on-append — `prepareAppend` sorts each partition group by the + table's `default-sort-order-id`; `icebergRewrite` for global compaction. + +**Landing requirement:** this work depends on a published icebird containing +`3edb15b` (e.g. `0.8.9`/`0.9.0`); the committed `package.json` pin moves from +`0.8.5` to that version. The bump is a **shared-engine** change — the cache rides +the same icebird — so the cache's tests and hermetic smokes must be re-run to +confirm no regression across the changed `create.js` / `commit.js` / `read.js` / +`transform.js`. During development this worktree builds against a local checkout +of icebird `master`; the pin is updated to the published version before merge. + +## Out of scope + +- **Daemon-run compaction** — available via `icebergRewrite` but deliberately not + run in-process (memory landmine). Out-of-band only. +- **Non-parquet data files** — Iceberg V1 is parquet-only. +- **External catalog integration** (REST/Glue/Nessie) — file-catalog only. +- **Renaming `cachePartitioning` / `CachePartitioningDeclaration`** — a separate + breaking change. + +## References + +- [LLP 0014](./0014-sinks.spec.md) — Sinks; iceberg as a table-format writer; the + cache-vs-sink jobs split this spec leans on. +- [LLP 0013](./0013-local-query-cache.decision.md) — the cache whose layout this + spec deliberately *diverges* from. +- [LLP 0015](./0015-query-and-datasets.spec.md) — datasets, + `primaryTimestampColumn`, queryable read. +- [LLP 0003](./0003-core-vs-plugin-surface.spec.md) — core/plugin boundary; + amended for the helper promotion ([§Shared core helpers](#shared-core-helpers)). +- [LLP 0021](./0021-observability.spec.md) — the span attributes above. +- Code: `src/core/cache/iceberg/store.js:100-126` (cache create+partitionSpec, + drift guard), `src/core/cache/iceberg/schema.js` + (`partitionSpecForDeclaration`, `validatePartitionSpecStability`), + `format-iceberg/src/commit.js:81-107` (export create+append), + `format-iceberg/src/table-format.js:184` (per-dataset `exportDataset`), + `format-iceberg/src/maintenance.js:120` (compaction framing). +- icebird `master` `3edb15b` — `src/write/sort.js`, `src/write/stage.js`, + `src/prune.js`, `src/write/rewrite.js`. + diff --git a/src/core/cache/iceberg/schema.js b/src/core/cache/iceberg/schema.js index ab54fec..4cf43d7 100644 --- a/src/core/cache/iceberg/schema.js +++ b/src/core/cache/iceberg/schema.js @@ -2,8 +2,7 @@ /** * @import { ColumnSpec } from '../../../../collectivus-plugin-kernel-types.d.ts' - * @import { CachePartitioningDeclaration } from '../types.d.ts' - * @import { Field, IcebergType, PartitionSpec, PartitionTransform, Schema } from 'icebird/src/types.js' + * @import { Field, IcebergType, Schema } from 'icebird/src/types.js' */ /** @@ -33,47 +32,6 @@ export function icebergSchemaForColumns(columns) { return { type: 'struct', 'schema-id': 0, fields } } -const PARTITION_FIELD_ID_BASE = 1000 - -/** - * Translate a dataset's `CachePartitioningDeclaration` into the Iceberg - * `PartitionSpec` passed to `icebergCreateTable`. Partition field IDs - * start at `PARTITION_FIELD_ID_BASE` (1000) to stay distinct from schema - * field IDs, which start at 1. - * - * @param {CachePartitioningDeclaration} declaration - * @param {Schema} schema - * @returns {PartitionSpec} - */ -export function partitionSpecForDeclaration(declaration, schema) { - /** @type {Map} */ - const fieldsByName = new Map() - for (const f of schema.fields) { - fieldsByName.set(f.name, f) - } - /** @type {PartitionSpec['fields']} */ - const fields = [] - let partitionFieldId = PARTITION_FIELD_ID_BASE - for (const pf of declaration.iceberg.fields) { - const sf = fieldsByName.get(pf.column) - if (!sf) { - if (pf.required) { - throw new Error( - `cache-iceberg: required partition field "${pf.column}" not found in schema` - ) - } - continue - } - fields.push({ - 'source-id': sf.id, - 'field-id': partitionFieldId++, - name: pf.column, - transform: /** @type {PartitionTransform} */ (pf.transform), - }) - } - return { 'spec-id': 0, fields } -} - /** * Reconcile a `ColumnSpec[]` with an existing Iceberg table schema so * subsequent appends keep field IDs stable. The result is the schema to @@ -154,51 +112,6 @@ export function mergeFieldIdsFromTable(columns, existing, partitionColumns) { return { type: 'struct', 'schema-id': schemaId, fields } } -/** - * Validate that a `CachePartitioningDeclaration` still describes the - * existing `PartitionSpec`. Adding, removing, or changing a partition - * field is partition-spec evolution and must be a deliberate migration, - * not an accidental side effect. - * - * @param {CachePartitioningDeclaration} declaration - * @param {PartitionSpec} existingSpec - * @param {Schema} [schema] - */ -export function validatePartitionSpecStability(declaration, existingSpec, schema) { - const expectedSpec = schema - ? partitionSpecForDeclaration(declaration, schema) - : { - 'spec-id': existingSpec['spec-id'], - fields: declaration.iceberg.fields.map((field, index) => ({ - 'source-id': 0, - 'field-id': PARTITION_FIELD_ID_BASE + index, - name: field.column, - transform: /** @type {PartitionTransform} */ (field.transform), - })), - } - const expectedNames = new Set(expectedSpec.fields.map(f => f.name)) - for (const expected of expectedSpec.fields) { - const existing = existingSpec.fields.find(f => f.name === expected.name) - if (!existing) { - throw new Error( - `cache-iceberg: partition field "${expected.name}" is new — adding a partition field is spec evolution and requires an explicit migration` - ) - } - if (existing.transform !== expected.transform) { - throw new Error( - `cache-iceberg: partition field "${expected.name}" changed transform from ${existing.transform} to ${expected.transform} — partition spec changes require an explicit migration` - ) - } - } - for (const existing of existingSpec.fields) { - if (!expectedNames.has(existing.name)) { - throw new Error( - `cache-iceberg: partition field "${existing.name}" was removed — removing a partition field is spec evolution and requires an explicit migration` - ) - } - } -} - /** * Coerce each row into the Iceberg type system. The intrinsic cache * is strict: a `null` for a non-nullable column is a programmer diff --git a/src/core/cache/iceberg/store.js b/src/core/cache/iceberg/store.js index 8bb9f12..b807d91 100644 --- a/src/core/cache/iceberg/store.js +++ b/src/core/cache/iceberg/store.js @@ -16,10 +16,12 @@ import { createLocalIcebergIO, tableUrlForDir } from './resolver.js' import { icebergSchemaForColumns, mergeFieldIdsFromTable, - partitionSpecForDeclaration, rowsToIcebergRecords, - validatePartitionSpecStability, } from './schema.js' +import { + partitionSpecForDeclaration, + validatePartitionSpecStability, +} from '../../iceberg/partition-spec.js' /** * @import { ColumnSpec } from '../../../../collectivus-plugin-kernel-types.d.ts' diff --git a/src/core/cache/types.d.ts b/src/core/cache/types.d.ts index f8cb182..5842b27 100644 --- a/src/core/cache/types.d.ts +++ b/src/core/cache/types.d.ts @@ -1,6 +1,11 @@ import type { ColumnSpec, QueryScope, QueryStorageService } from '../../../collectivus-plugin-kernel-types.d.ts' import type { PartitionSpec } from 'icebird/src/types.js' import type { AsyncDataSource } from 'squirreling' +// Partitioning declaration promoted to a neutral core home +// (LLP 0003 / LLP 0022#shared-core-helpers). Re-exported here so existing +// cache importers keep their `../types.d.ts` path. +import type { CachePartitioningDeclaration, CachePartitionField } from '../iceberg/types.d.ts' +export type { CachePartitioningDeclaration, CachePartitionField } export interface PartitionCursor { epoch: number @@ -17,22 +22,6 @@ export interface PartitionCursor { } } -export interface CachePartitioningDeclaration { - source: { - columns: string[] - fallback?: string - } - iceberg: { - fields: CachePartitionField[] - } -} - -export interface CachePartitionField { - column: string - transform: 'identity' | 'day' | 'month' | 'year' | string - required?: boolean -} - export interface CachePartitionMeta { dataset: string partition: Record diff --git a/src/core/iceberg/partition-spec.js b/src/core/iceberg/partition-spec.js new file mode 100644 index 0000000..dd9100c --- /dev/null +++ b/src/core/iceberg/partition-spec.js @@ -0,0 +1,96 @@ +// @ts-check + +// @ref LLP 0022#shared-core-helpers — partition-spec derivation promoted out of +// the cache; consumed by the cache and the @hypaware/format-iceberg export +// alike, so it is core surface, not a cache internal (LLP 0003). [implements] + +/** + * @import { CachePartitioningDeclaration } from './types.d.ts' + * @import { Field, PartitionSpec, PartitionTransform, Schema } from 'icebird/src/types.js' + */ + +const PARTITION_FIELD_ID_BASE = 1000 + +/** + * Translate a dataset's `CachePartitioningDeclaration` into the Iceberg + * `PartitionSpec` passed to `icebergCreateTable`. Partition field IDs + * start at `PARTITION_FIELD_ID_BASE` (1000) to stay distinct from schema + * field IDs, which start at 1. + * + * @param {CachePartitioningDeclaration} declaration + * @param {Schema} schema + * @returns {PartitionSpec} + */ +export function partitionSpecForDeclaration(declaration, schema) { + /** @type {Map} */ + const fieldsByName = new Map() + for (const f of schema.fields) { + fieldsByName.set(f.name, f) + } + /** @type {PartitionSpec['fields']} */ + const fields = [] + let partitionFieldId = PARTITION_FIELD_ID_BASE + for (const pf of declaration.iceberg.fields) { + const sf = fieldsByName.get(pf.column) + if (!sf) { + if (pf.required) { + throw new Error( + `cache-iceberg: required partition field "${pf.column}" not found in schema` + ) + } + continue + } + fields.push({ + 'source-id': sf.id, + 'field-id': partitionFieldId++, + name: pf.column, + transform: /** @type {PartitionTransform} */ (pf.transform), + }) + } + return { 'spec-id': 0, fields } +} + +/** + * Validate that a `CachePartitioningDeclaration` still describes the + * existing `PartitionSpec`. Adding, removing, or changing a partition + * field is partition-spec evolution and must be a deliberate migration, + * not an accidental side effect. + * + * @param {CachePartitioningDeclaration} declaration + * @param {PartitionSpec} existingSpec + * @param {Schema} [schema] + */ +export function validatePartitionSpecStability(declaration, existingSpec, schema) { + const expectedSpec = schema + ? partitionSpecForDeclaration(declaration, schema) + : { + 'spec-id': existingSpec['spec-id'], + fields: declaration.iceberg.fields.map((field, index) => ({ + 'source-id': 0, + 'field-id': PARTITION_FIELD_ID_BASE + index, + name: field.column, + transform: /** @type {PartitionTransform} */ (field.transform), + })), + } + const expectedNames = new Set(expectedSpec.fields.map(f => f.name)) + for (const expected of expectedSpec.fields) { + const existing = existingSpec.fields.find(f => f.name === expected.name) + if (!existing) { + throw new Error( + `cache-iceberg: partition field "${expected.name}" is new — adding a partition field is spec evolution and requires an explicit migration` + ) + } + if (existing.transform !== expected.transform) { + throw new Error( + `cache-iceberg: partition field "${expected.name}" changed transform from ${existing.transform} to ${expected.transform} — partition spec changes require an explicit migration` + ) + } + } + for (const existing of existingSpec.fields) { + if (!expectedNames.has(existing.name)) { + throw new Error( + `cache-iceberg: partition field "${existing.name}" was removed — removing a partition field is spec evolution and requires an explicit migration` + ) + } + } +} diff --git a/src/core/iceberg/types.d.ts b/src/core/iceberg/types.d.ts new file mode 100644 index 0000000..1bcb744 --- /dev/null +++ b/src/core/iceberg/types.d.ts @@ -0,0 +1,27 @@ +// Shared Iceberg partitioning types. Promoted out of `src/core/cache/` because +// the declaration is dataset-owned core surface, not a cache internal: the +// dataset registry validates it, the public plugin surface types it +// (`DatasetRegistration.cachePartitioning`), the intrinsic cache derives a spec +// from it, and the `@hypaware/format-iceberg` export reuses it for its sort +// axis. See LLP 0003 (core vs plugin surface) and LLP 0022#shared-core-helpers. + +/** + * A dataset's declared partitioning. Consumed by the cache (to partition its + * tables) and by the iceberg export (to derive a within-partition sort key from + * the identity columns). The name is cache-historical; the type is core surface. + */ +export interface CachePartitioningDeclaration { + source: { + columns: string[] + fallback?: string + } + iceberg: { + fields: CachePartitionField[] + } +} + +export interface CachePartitionField { + column: string + transform: 'identity' | 'day' | 'month' | 'year' | string + required?: boolean +} diff --git a/src/core/index.d.ts b/src/core/index.d.ts index 576cdb8..560bca6 100644 --- a/src/core/index.d.ts +++ b/src/core/index.d.ts @@ -15,3 +15,7 @@ export type { PluginPermission, PluginRuntime, } from '../../collectivus-plugin-kernel-types.d.ts' + +// Iceberg partitioning declaration — core surface consumed by the cache and the +// @hypaware/format-iceberg export (LLP 0003 / LLP 0022#shared-core-helpers). +export type { CachePartitioningDeclaration, CachePartitionField } from './iceberg/types.d.ts' diff --git a/src/core/index.js b/src/core/index.js index 93d2eb3..a3ae47e 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -9,3 +9,6 @@ export * from './observability/index.js' export { createConfigRegistry, defaultConfigPath, loadConfigFile, parseConfigShape, CONFIG_BASENAME } from './config/schema.js' export { validateConfig, firstPartyPluginMetadata, mergeInstalledManifestsIntoKnown, isCronExpression, CAP_ENCODER, CAP_BLOB_STORE, CAP_HTTP_ENDPOINT } from './config/validate.js' export { buildPluginCatalog } from './plugin_catalog.js' +// Iceberg partition-spec helpers — core surface consumed by the cache and the +// @hypaware/format-iceberg export (LLP 0003 / LLP 0022#shared-core-helpers). +export { partitionSpecForDeclaration, validatePartitionSpecStability } from './iceberg/partition-spec.js' diff --git a/test/core/cache-iceberg-schema.test.js b/test/core/cache-iceberg-schema.test.js index 5f10309..e61ff1f 100644 --- a/test/core/cache-iceberg-schema.test.js +++ b/test/core/cache-iceberg-schema.test.js @@ -12,9 +12,11 @@ import { columnsFromIcebergSchema, icebergSchemaForColumns, mergeFieldIdsFromTable, +} from '../../src/core/cache/iceberg/schema.js' +import { partitionSpecForDeclaration, validatePartitionSpecStability, -} from '../../src/core/cache/iceberg/schema.js' +} from '../../src/core/iceberg/partition-spec.js' import { appendRowsToTable, tableExists } from '../../src/core/cache/iceberg/store.js' /** diff --git a/test/plugins/iceberg-partitioning.test.js b/test/plugins/iceberg-partitioning.test.js new file mode 100644 index 0000000..e6c7931 --- /dev/null +++ b/test/plugins/iceberg-partitioning.test.js @@ -0,0 +1,232 @@ +// @ts-check + +import test from 'node:test' +import assert from 'node:assert/strict' +import fs from 'node:fs/promises' +import fsSync from 'node:fs' +import os from 'node:os' +import path from 'node:path' + +import { derivePartitioning } from '../../hypaware-core/plugins-workspace/format-iceberg/src/partitioning.js' +import { commitBatch, probeTable } from '../../hypaware-core/plugins-workspace/format-iceberg/src/commit.js' +import { + createBlobStoreIO, + tableUrlForBlobPrefix, +} from '../../hypaware-core/plugins-workspace/format-iceberg/src/blob-io.js' +import { createLocalFsBlobStore } from '../../hypaware-core/plugins-workspace/local-fs/src/blob-store.js' + +/** + * @import { BlobStore, ColumnSpec, DatasetRegistration, HypError } from '../../collectivus-plugin-kernel-types.d.ts' + */ + +/** @type {ColumnSpec[]} */ +const AI_GATEWAY_COLUMNS = [ + { name: 'conversation_id', type: 'STRING', nullable: false }, + { name: 'cwd', type: 'STRING', nullable: true }, + { name: 'message_created_at', type: 'TIMESTAMP', nullable: false }, + { name: 'date', type: 'STRING', nullable: false }, + { name: 'message', type: 'STRING', nullable: true }, +] + +/** Minimal registration carrying just what `derivePartitioning` reads. */ +const AI_GATEWAY_REG = /** @type {DatasetRegistration} */ (/** @type {unknown} */ ({ + name: 'ai_gateway_messages', + primaryTimestampColumn: 'message_created_at', + cachePartitioning: { + source: { columns: ['client_name'], fallback: 'unknown' }, + iceberg: { + fields: [ + { column: 'conversation_id', transform: 'identity', required: true }, + { column: 'cwd', transform: 'identity' }, + { column: 'date', transform: 'identity', required: true }, + ], + }, + }, +})) + +// --- derivePartitioning (pure) --- + +test('derivePartitioning returns null without a registration', () => { + assert.equal(derivePartitioning(undefined, AI_GATEWAY_COLUMNS), null) +}) + +test('derivePartitioning returns null when no primaryTimestampColumn', () => { + const reg = /** @type {DatasetRegistration} */ (/** @type {unknown} */ ({ name: 'x' })) + assert.equal(derivePartitioning(reg, AI_GATEWAY_COLUMNS), null) +}) + +test('derivePartitioning returns null when the timestamp column is absent from the schema', () => { + const reg = /** @type {DatasetRegistration} */ (/** @type {unknown} */ ({ + name: 'x', primaryTimestampColumn: 'not_in_schema', + })) + assert.equal(derivePartitioning(reg, AI_GATEWAY_COLUMNS), null) +}) + +test('derivePartitioning builds day(primaryTimestampColumn) + conversation sort for ai-gateway', () => { + const p = derivePartitioning(AI_GATEWAY_REG, AI_GATEWAY_COLUMNS) + assert.ok(p, 'partitioning should be derived') + + // Partition: day(message_created_at) on its schema field id (3), id base 1000. + assert.equal(p.partitionSpec.fields.length, 1) + assert.deepEqual( + { name: p.partitionSpec.fields[0].name, transform: p.partitionSpec.fields[0].transform, src: p.partitionSpec.fields[0]['source-id'] }, + { name: 'message_created_at', transform: 'day', src: 3 } + ) + assert.equal(p.partitionSpec.fields[0]['field-id'], 1000) + assert.equal(p.partitionSpecLabel, 'day(message_created_at)') + + // Sort: the dataset's declared identity columns, in order, asc/nulls-last. + assert.equal(p.sortOrder['order-id'], 1) + assert.deepEqual( + p.sortOrder.fields.map((f) => [f['source-id'], f.transform, f.direction, f['null-order']]), + [[1, 'identity', 'asc', 'nulls-last'], [2, 'identity', 'asc', 'nulls-last'], [4, 'identity', 'asc', 'nulls-last']] + ) + assert.equal(p.sortOrderLabel, 'conversation_id,cwd,date') +}) + +test('derivePartitioning day grain is independent of cachePartitioning (does not inherit conversation_id partitioning)', () => { + const p = derivePartitioning(AI_GATEWAY_REG, AI_GATEWAY_COLUMNS) + assert.ok(p) + // The partition axis must be the day grain only — NOT the cache's + // conversation_id/cwd/date identity partitioning. + assert.deepEqual(p.partitionSpec.fields.map((f) => f.name), ['message_created_at']) +}) + +test('derivePartitioning yields an empty (unsorted) order when no cachePartitioning is declared', () => { + const reg = /** @type {DatasetRegistration} */ (/** @type {unknown} */ ({ + name: 'plain', primaryTimestampColumn: 'message_created_at', + })) + const p = derivePartitioning(reg, AI_GATEWAY_COLUMNS) + assert.ok(p) + assert.equal(p.partitionSpec.fields.length, 1, 'still day-partitioned') + assert.equal(p.sortOrder.fields.length, 0) + assert.equal(p.sortOrder['order-id'], 0) + assert.equal(p.sortOrderLabel, '') +}) + +test('derivePartitioning excludes non-identity cachePartitioning fields from the sort', () => { + const reg = /** @type {DatasetRegistration} */ (/** @type {unknown} */ ({ + name: 'mixed', + primaryTimestampColumn: 'message_created_at', + cachePartitioning: { + source: { columns: [] }, + iceberg: { + fields: [ + { column: 'conversation_id', transform: 'identity', required: true }, + { column: 'message_created_at', transform: 'day' }, // temporal, not a lookup key + ], + }, + }, + })) + const p = derivePartitioning(reg, AI_GATEWAY_COLUMNS) + assert.ok(p) + assert.deepEqual(p.sortOrder.fields.map((f) => f['source-id']), [1]) + assert.equal(p.sortOrderLabel, 'conversation_id') +}) + +// --- commit integration (real local-fs + icebird) --- + +/** @returns {Promise<{ blobStore: BlobStore, baseDir: string, cleanup: () => Promise }>} */ +async function freshLocalFsStore() { + const baseDir = await fs.mkdtemp(path.join(os.tmpdir(), 'hyp-iceberg-partition-')) + return { + blobStore: createLocalFsBlobStore({ baseDir }), + baseDir, + cleanup: () => fs.rm(baseDir, { recursive: true, force: true }), + } +} + +const ROWS = [ + { conversation_id: 'cB', cwd: '/x', message_created_at: '2026-06-04T10:00:00Z', date: '2026-06-04', message: 'b1' }, + { conversation_id: 'cA', cwd: '/x', message_created_at: '2026-06-04T09:00:00Z', date: '2026-06-04', message: 'a1' }, + { conversation_id: 'cB', cwd: '/x', message_created_at: '2026-06-05T10:00:00Z', date: '2026-06-05', message: 'b2' }, + { conversation_id: 'cA', cwd: '/x', message_created_at: '2026-06-05T09:00:00Z', date: '2026-06-05', message: 'a2' }, +] + +test('commitBatch creates a day-partitioned, conversation-sorted table and buckets rows by day', async () => { + const fixture = await freshLocalFsStore() + try { + const { resolver, lister } = await createBlobStoreIO(fixture.blobStore) + const tableUrl = tableUrlForBlobPrefix('iceberg/datasets/ai_gateway_messages') + const partitioning = derivePartitioning(AI_GATEWAY_REG, AI_GATEWAY_COLUMNS) + + const initial = await probeTable(tableUrl, resolver, lister) + await commitBatch( + { tableUrl, columns: AI_GATEWAY_COLUMNS, rows: ROWS, resolver, lister, partitioning }, + { exists: initial.exists, metadata: initial.metadata } + ) + + const after = await probeTable(tableUrl, resolver, lister) + const meta = after.metadata + assert.ok(meta, 'metadata present') + + // Partition spec: day(message_created_at). + const spec = meta['partition-specs'].find((s) => s['spec-id'] === meta['default-spec-id']) + assert.deepEqual((spec?.fields ?? []).map((f) => [f.name, f.transform]), [['message_created_at', 'day']]) + + // Sort order recorded and made default. + const order = (meta['sort-orders'] ?? []).find((o) => o['order-id'] === meta['default-sort-order-id']) + assert.ok(order && order.fields.length === 3, 'default sort order has the three lookup columns') + assert.equal(order.fields[0]['source-id'], 1, 'conversation_id leads the sort') + + // 4 rows over 2 days ⇒ exactly 2 data files (one per day partition). + const dataDir = path.join(fixture.baseDir, 'iceberg', 'datasets', 'ai_gateway_messages', 'data') + const dataFiles = fsSync.readdirSync(dataDir).filter((f) => f.endsWith('.parquet')) + assert.equal(dataFiles.length, 2, `expected 2 day-partition files, got ${dataFiles.length}`) + } finally { + await fixture.cleanup() + } +}) + +test('commitBatch rejects partition-spec drift on a previously-unpartitioned table', async () => { + const fixture = await freshLocalFsStore() + try { + const { resolver, lister } = await createBlobStoreIO(fixture.blobStore) + const tableUrl = tableUrlForBlobPrefix('iceberg/datasets/ai_gateway_messages') + + // First commit creates the table WITHOUT partitioning (simulates a table + // written before this spec, or a dataset that later gains a timestamp). + const initial = await probeTable(tableUrl, resolver, lister) + await commitBatch( + { tableUrl, columns: AI_GATEWAY_COLUMNS, rows: [ROWS[0]], resolver, lister }, + { exists: initial.exists, metadata: initial.metadata } + ) + + // Second commit now carries the day-grain partitioning ⇒ drift. + const partitioning = derivePartitioning(AI_GATEWAY_REG, AI_GATEWAY_COLUMNS) + const existing = await probeTable(tableUrl, resolver, lister) + await assert.rejects( + () => + commitBatch( + { tableUrl, columns: AI_GATEWAY_COLUMNS, rows: [ROWS[1]], resolver, lister, partitioning }, + { exists: existing.exists, metadata: existing.metadata } + ), + (err) => /** @type {HypError} */ (err).hypErrorKind === 'iceberg_partition_spec_drift' + ) + } finally { + await fixture.cleanup() + } +}) + +test('commitBatch accepts a re-commit with the same derived partitioning (no false drift)', async () => { + const fixture = await freshLocalFsStore() + try { + const { resolver, lister } = await createBlobStoreIO(fixture.blobStore) + const tableUrl = tableUrlForBlobPrefix('iceberg/datasets/ai_gateway_messages') + const partitioning = derivePartitioning(AI_GATEWAY_REG, AI_GATEWAY_COLUMNS) + + const initial = await probeTable(tableUrl, resolver, lister) + await commitBatch( + { tableUrl, columns: AI_GATEWAY_COLUMNS, rows: [ROWS[0]], resolver, lister, partitioning }, + { exists: initial.exists, metadata: initial.metadata } + ) + const existing = await probeTable(tableUrl, resolver, lister) + const result = await commitBatch( + { tableUrl, columns: AI_GATEWAY_COLUMNS, rows: [ROWS[2]], resolver, lister, partitioning }, + { exists: existing.exists, metadata: existing.metadata } + ) + assert.ok(result.snapshotId.length > 0, 'second append with stable spec must succeed') + } finally { + await fixture.cleanup() + } +})