Skip to content

[SS-78] Iceberg Sink Append Mode#35781

Open
ublubu wants to merge 9 commits intoMaterializeInc:mainfrom
ublubu:append
Open

[SS-78] Iceberg Sink Append Mode#35781
ublubu wants to merge 9 commits intoMaterializeInc:mainfrom
ublubu:append

Conversation

@ublubu
Copy link
Copy Markdown
Contributor

@ublubu ublubu commented Mar 30, 2026

Motivation

We want a copy of the update stream, so dump it into an Iceberg sink.

Description

This PR adds MODE APPEND to Iceberg sinks.

The "append" sink outputs data rows with additional _mz_diff and _mz_timestamp columns.
Inserted row gets +1 diff. Removed row gets -1 diff.
Updated row becomes two output rows, with -1 (before) and +1 (after) diffs, respectively.

Pending Work

User-facing error (will be added to pure.rs or ddl.rs?) when the "FROM" table already contains columns named _mz_diff or _mz_timestamp.

@github-actions
Copy link
Copy Markdown

Thanks for opening this PR! Here are a few tips to help make the review process smooth for everyone.

PR title guidelines

  • Use imperative mood: "Fix X" not "Fixed X" or "Fixes X"
  • Be specific: "Fix panic in catalog sync when controller restarts" not "Fix bug" or "Update catalog code"
  • Prefix with area if helpful: compute: , storage: , adapter: , sql:

Pre-merge checklist

  • The PR title is descriptive and will make sense in the git log.
  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).

@ublubu ublubu changed the title DRAFT: Iceberg Sink Append Mode [SS-78] Iceberg Sink Append Mode Mar 31, 2026
@ublubu ublubu marked this pull request as ready for review March 31, 2026 19:26
@ublubu ublubu requested review from a team as code owners March 31, 2026 19:26
@ublubu ublubu requested review from DAlperin and SangJunBak March 31, 2026 19:26
Copy link
Copy Markdown
Member

@DAlperin DAlperin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great. Approved mod some nits and SQL+TEST team approval

let (arrow_schema_with_ids, iceberg_schema) =
relation_desc_to_iceberg_schema(&sink.from_desc)?;

Ok(if sink.envelope == SinkEnvelope::Append {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets just use a match here

Comment on lines +380 to +382
let n = ctx.arrow_schema.fields().len().saturating_sub(2);
let user_schema_for_append =
Arc::new(ArrowSchema::new(ctx.arrow_schema.fields()[..n].to_vec()));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe some kind of assert!(n >= 2 && fields[n-2].name() == "_mz_diff" && fields[n-1].name() == "_mz_timestamp",
"expected _mz_diff and _mz_timestamp as last two fields")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants