Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion collectivus-plugin-kernel-types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

import type { AsyncDataSource, ScanOptions, ScanResults } from 'squirreling'
import type { CachePartitioningDeclaration } from './src/core/cache/types.d.ts'
import type { CachePartitioningDeclaration } from './src/core/iceberg/types.d.ts'

export type { AsyncDataSource, ScanOptions, ScanResults }

Expand Down
45 changes: 42 additions & 3 deletions hypaware-core/plugins-workspace/format-iceberg/src/commit.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import {
loadLatestFileCatalogMetadata,
} from 'icebird'

import { validatePartitionSpecStability } from '../../../../src/core/iceberg/partition-spec.js'

import { icebergSchemaForColumns, mergeFieldIdsFromTable, rowsToIcebergRecords } from './schema.js'

/**
* @import { ColumnSpec } from '../../../../collectivus-plugin-kernel-types.d.ts'
* @import { CommitInput, CommitResult, TableState } from './types.d.ts'
* @import { Lister, Resolver, Snapshot, TableMetadata } from 'icebird/src/types.js'
* @import { CommitInput, CommitResult, DatasetPartitioning, TableState } from './types.d.ts'
* @import { Lister, PartitionSpec, Resolver, Snapshot, TableMetadata } from 'icebird/src/types.js'
*/

/**
Expand Down Expand Up @@ -80,15 +82,34 @@ export async function commitBatch(input, priorState) {

if (!priorState.exists) {
try {
// @ref LLP 0022#partition-derivation — create with the writer-owned
// day-grain partitionSpec + conversation sort order. Both default to
// unpartitioned/unsorted in icebird when `partitioning` is absent.
await icebergCreateTable({
catalog,
tableUrl: input.tableUrl,
schema: targetSchema,
formatVersion: 3,
partitionSpec: input.partitioning?.partitionSpec,
sortOrder: input.partitioning?.sortOrder,
})
} catch (err) {
throw wrapCommitError(err, 'iceberg_commit_failed', `create table failed at '${input.tableUrl}'`)
}
} else if (input.partitioning && priorState.metadata) {
// @ref LLP 0022#drift-rejection — an existing table whose partition spec no
// longer matches the dataset's derived day grain is rejected; the export
// cannot retroactively repartition object-store data files. [constrained-by]
const existingSpec = currentPartitionSpec(priorState.metadata) ?? { 'spec-id': 0, fields: [] }
try {
validatePartitionSpecStability(input.partitioning.declaration, existingSpec, targetSchema)
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
throw newError(
'iceberg_partition_spec_drift',
`iceberg-format: partition spec drift at '${input.tableUrl}': ${message}`
)
}
}

/** @type {TableMetadata} */
Expand Down Expand Up @@ -142,6 +163,7 @@ const DEFAULT_STREAM_ROW_LIMIT = 100_000
* rows: AsyncIterable<Record<string, unknown>>,
* resolver: Resolver,
* lister: Lister,
* partitioning?: DatasetPartitioning | null,
* }} input
* @param {{ exists: boolean, metadata: TableMetadata | null }} priorState
* @param {{ batchByteLimit?: number, batchRowLimit?: number }} [opts]
Expand All @@ -167,7 +189,7 @@ export async function commitRowStream(input, priorState, opts = {}) {
async function flushBatch() {
if (batch.length === 0) return
const result = await commitBatch(
{ tableUrl: input.tableUrl, columns: input.columns, rows: batch, resolver: input.resolver, lister: input.lister },
{ tableUrl: input.tableUrl, columns: input.columns, rows: batch, resolver: input.resolver, lister: input.lister, partitioning: input.partitioning },
state
)
state = { exists: true, metadata: result.metadata }
Expand Down Expand Up @@ -225,6 +247,23 @@ function schemaFromExistingMetadata(columns, metadata) {
return mergeFieldIdsFromTable(columns, existing)
}

/**
* Resolve the table's current `PartitionSpec` from metadata (the default spec,
* falling back to the last). Used for the on-append drift check.
*
* @param {TableMetadata} metadata
* @returns {PartitionSpec | undefined}
*/
function currentPartitionSpec(metadata) {
const specId = metadata['default-spec-id']
const specs = metadata['partition-specs']
if (specs?.length) {
const match = specs.find((s) => s['spec-id'] === specId)
return match ?? specs[specs.length - 1]
}
return undefined
}

/**
* @param {unknown} value
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,12 @@ export async function discoverExportDatasets(blobStore, prefix) {
* Run export maintenance on all datasets under a prefix: snapshot
* expiration per dataset, plus a compaction status report.
*
* icebird V1 does not expose `rewrite-data-files` or `delete-data-files`,
* so compaction is not supported. The report signals
* `compactionSupported: false` so the CLI can surface a clear message.
* @ref LLP 0022#compaction — icebird now exposes `icebergRewrite`
* (read-rewrite compaction), but the export deliberately does not run it:
* day-grain partitioning already yields large files, and an in-daemon
* read-rewrite risks the OOM/blocking failure seen with the parquet sink.
* `compactionSupported: false` here means "not run by this sink" (out-of-band
* only), not "impossible".
*
* @param {{
* blobStore: BlobStore
Expand Down
98 changes: 98 additions & 0 deletions hypaware-core/plugins-workspace/format-iceberg/src/partitioning.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// @ts-check

import { partitionSpecForDeclaration } from '../../../../src/core/iceberg/partition-spec.js'

import { icebergSchemaForColumns } from './schema.js'

/**
* @import { ColumnSpec, DatasetRegistration } from '../../../../collectivus-plugin-kernel-types.d.ts'
* @import { CachePartitioningDeclaration } from '../../../../src/core/iceberg/types.d.ts'
* @import { Schema, SortField, SortOrder } from 'icebird/src/types.js'
* @import { DatasetPartitioning } from './types.d.ts'
*/

// @ref LLP 0022#partition-derivation — the export partitions by a writer-owned
// day grain on the dataset's primaryTimestampColumn, derived independently of
// the cache's `cachePartitioning` (which would impose an unbounded
// per-conversation file count on an archive). [implements]
/**
 * Work out how a dataset's export table is laid out: one `day` partition on
 * the dataset's `primaryTimestampColumn`, plus a within-partition sort built
 * from the dataset's lookup columns.
 *
 * `null` is returned — meaning the dataset exports unpartitioned — when there
 * is no registration, when `primaryTimestampColumn` is not a non-empty string,
 * or when that column is not among the exported `columns`.
 *
 * @param {DatasetRegistration | undefined} reg
 * @param {readonly ColumnSpec[]} columns
 * @returns {DatasetPartitioning | null}
 */
export function derivePartitioning(reg, columns) {
  if (!reg) return null

  const { primaryTimestampColumn: anchor } = reg
  if (typeof anchor !== 'string' || anchor.length === 0) return null

  // The day grain needs a real exported column to hang off; a timestamp column
  // missing from the schema means "unpartitioned", not a synthesized bad spec.
  const anchorIsExported = columns.some(({ name }) => name === anchor)
  if (!anchorIsExported) return null

  const tableSchema = icebergSchemaForColumns(columns)

  /** @type {CachePartitioningDeclaration} */
  const declaration = {
    source: { columns: [anchor] },
    iceberg: {
      fields: [{ column: anchor, transform: 'day', required: true }],
    },
  }

  const partitionSpec = partitionSpecForDeclaration(declaration, tableSchema)
  const sortOrder = sortOrderForLookup(reg, tableSchema)
  // Human-readable column names for the sort label, in sort-field order.
  const sortedColumnNames = sortOrder.fields.map((sortField) => nameForSourceId(tableSchema, sortField))

  return {
    declaration,
    partitionSpec,
    sortOrder,
    partitionSpecLabel: `day(${anchor})`,
    sortOrderLabel: sortedColumnNames.join(','),
  }
}

// @ref LLP 0022#within-partition-sort — cluster each day partition by the
// dataset's declared identity (lookup) columns so a conversation lookup prunes
// row groups by min/max, without the file-count cost of partitioning on it.
// This is the one place the export reads `cachePartitioning` — sort axis only.
// [implements]
/**
* Build a sort order from the dataset's declared identity columns
* (`cachePartitioning.iceberg.fields`, transform `identity`), in declared order.
* Returns an empty (unsorted) order when none apply — icebird treats that as a
* no-op, so an undeclared dataset is day-partitioned but unsorted.
*
* @param {DatasetRegistration} reg
* @param {Schema} schema
* @returns {SortOrder}
*/
function sortOrderForLookup(reg, schema) {
/** @type {Map<string, number>} */
const idByName = new Map(schema.fields.map((f) => [f.name, f.id]))
const declared = reg.cachePartitioning?.iceberg?.fields ?? []
/** @type {SortField[]} */
const fields = []
for (const f of declared) {
if (f.transform !== 'identity') continue
const id = idByName.get(f.column)
if (id === undefined) continue
fields.push({
'source-id': id,
transform: 'identity',
direction: 'asc',
'null-order': 'nulls-last',
})
}
// order-id 0 is conventionally "unsorted"; a real order uses 1.
return fields.length > 0 ? { 'order-id': 1, fields } : { 'order-id': 0, fields: [] }
}

/**
* @param {Schema} schema
* @param {SortField} field
* @returns {string}
*/
function nameForSourceId(schema, field) {
const id = field['source-id']
const match = schema.fields.find((f) => f.id === id)
return match ? match.name : String(id)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { getTracer, SpanStatusCode } from '../../../../src/core/observability/in
import { createBlobStoreIO, pathToKey, tableUrlForBlobPrefix } from './blob-io.js'
import { commitBatch, commitRowStream, probeTable } from './commit.js'
import { expireExportSnapshots, normalizeExportRetentionConfig } from './maintenance.js'
import { derivePartitioning } from './partitioning.js'
import { loadMarker, markerKey, markerSubsumedBySnapshot, writeMarker } from './state.js'

/**
Expand Down Expand Up @@ -195,6 +196,11 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log, mai
return { partitionsExported: partitions.length, bytesWritten: 0, status: 'skipped' }
}

// @ref LLP 0022#partition-derivation — derived per dataset at commit time
// because `createSink` runs once for a sink that exports many datasets, so
// the spec cannot be resolved up front. [implements]
const partitioning = derivePartitioning(ctx.query.getDataset(dataset), columns)

const blobPrefix = joinKeys(pathToKey(prefix), sanitizeDataset(dataset))
const tableUrl = tableUrlForBlobPrefix(blobPrefix)
// Track the most recent metadata.json write so the commit span can
Expand Down Expand Up @@ -283,14 +289,18 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log, mai
hyp_dataset: dataset,
hyp_batch_id: batch.batchId,
encoder_format: ctx.encoder.format,
// @ref LLP 0022#observability — surface the resolved layout so a smoke
// can assert what was written, not just that rows landed.
hyp_partition_spec: partitioning?.partitionSpecLabel ?? 'unpartitioned',
hyp_sort_order: partitioning?.sortOrderLabel ?? '',
status: 'ok',
...destinationAttrs,
},
},
async (span) => {
try {
const result = await commitRowStream(
{ tableUrl, columns, rows: rowStream(), resolver, lister },
{ tableUrl, columns, rows: rowStream(), resolver, lister, partitioning },
{ exists: priorState.exists, metadata: priorState.metadata }
)
span.setAttribute('snapshot_id', result.snapshotId)
Expand Down
23 changes: 22 additions & 1 deletion hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { ColumnSpec } from '../../../../collectivus-plugin-kernel-types.d.ts'
import type { TableMetadata, Resolver, Lister } from 'icebird/src/types.js'
import type { TableMetadata, Resolver, Lister, PartitionSpec, SortOrder } from 'icebird/src/types.js'
import type { CachePartitioningDeclaration } from '../../../../src/core/iceberg/types.d.ts'

export interface TableState {
/** True when at least one metadata file is visible. */
Expand All @@ -8,6 +9,24 @@ export interface TableState {
currentSnapshotId: string | undefined
}

/**
* Writer-owned export layout for one dataset (LLP 0022): a day-grain partition
* derived from `primaryTimestampColumn`, plus a within-partition sort on the
* dataset's declared identity (lookup) columns.
*
* Produced by `derivePartitioning` and handed to the commit path through
* `CommitInput.partitioning`. A dataset without a usable timestamp column gets
* `null` instead of a value of this type and is exported unpartitioned.
*/
export interface DatasetPartitioning {
/**
* Synthesized day-grain declaration (a single `day` transform on the
* timestamp column). Kept so an append to an existing table can be checked
* against the table's current partition spec and rejected on drift.
*/
declaration: CachePartitioningDeclaration
/** Iceberg partition spec passed to `icebergCreateTable` when the table is first created. */
partitionSpec: PartitionSpec
/**
* Within-partition sort order, also passed to `icebergCreateTable`. An empty
* order (`order-id` 0, no fields) means unsorted (no-op).
*/
sortOrder: SortOrder
/** Span label for the partition spec, e.g. `day(message_created_at)`. */
partitionSpecLabel: string
/**
* Span label for the sort order: comma-joined column names in sort-field
* order, e.g. `conversation_id,cwd,date` (empty string when unsorted).
*/
sortOrderLabel: string
}

export interface CommitInput {
/** Table URL the resolver understands. */
tableUrl: string
Expand All @@ -17,6 +36,8 @@ export interface CommitInput {
rows: readonly Record<string, unknown>[]
resolver: Resolver
lister: Lister
/** Day-grain partition + sort layout; absent ⇒ unpartitioned table. */
partitioning?: DatasetPartitioning | null
}

export interface CommitResult {
Expand Down
2 changes: 1 addition & 1 deletion hypaware-core/smoke/flows/iceberg_export_local_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ export async function run({ harness, expect }) {
(ds) => Array.isArray(ds) && ds.some((d) => d.dataset === DATASET)
)
expect.that(
'maintain: compactionSupported is false (icebird V1 limitation)',
'maintain: compactionSupported is false (not run by this sink — out-of-band only, LLP 0022)',
maintainReport.compactionSupported,
(v) => v === false
)
Expand Down
Loading