Skip to content

fix(streamer): Align streamer checkpoint key with target table version#18896

Open
yihua wants to merge 4 commits into
apache:masterfrom
yihua:fix-checkpoint-by-table-version
Open

fix(streamer): Align streamer checkpoint key with target table version#18896
yihua wants to merge 4 commits into
apache:masterfrom
yihua:fix-checkpoint-by-table-version

Conversation

@yihua
Copy link
Copy Markdown
Contributor

@yihua yihua commented Jun 2, 2026

Describe the issue this Pull Request addresses

For Hudi table version 6, several streamer sources hardcoded new StreamerCheckpointV2(...) and emitted v2 checkpoint keys in commit metadata regardless of the configured write table version (KafkaSource was the original repro).

Summary and Changelog

This PR routes every affected source (KafkaSource, KinesisSource, JdbcSource, SqlFileBasedSource, PulsarSource, DebeziumSource, GcsEventsSource, HiveIncrPullSource) and selector helper (DFSPathSelector, DatePartitionPathSelector, S3EventsMetaSelector) through CheckpointUtils.createCheckpoint(writeTableVersion, key) so the emitted checkpoint class is determined solely by WRITE_TABLE_VERSION (v6 → v1, v8 → v2). Removed now-dead helpers (Source#assertCheckpointVersion, InputBatch(Option, String, ...) ctors).

Incremental sources (HoodieIncrSource, S3EventsHoodieIncrSource, GcsEventsHoodieIncrSource) are not affected — they have their own checkpoint construction logic and already route through CheckpointUtils.buildCheckpointFromGeneralSource / the DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2 allowlist.

Impact

Streamer commit metadata for table version 6 now correctly emits v1 checkpoint keys (deltastreamer.checkpoint.key) instead of v2 keys (streamer.checkpoint.key.v2). No public API change.

Risk Level

medium: touches the checkpoint write path in every streamer source. Added TestStreamerSourceCheckpointVersion, which exercises each source class across {v6, v8} write table versions and {V1, V2} input checkpoints and asserts the returned checkpoint class is determined solely by the write table version. Added a testCreateCheckpoint parameterized test for CheckpointUtils.createCheckpoint in TestCheckpointUtils.

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

yihua added 2 commits June 1, 2026 22:27
Several streamer sources hardcoded `new StreamerCheckpointV2(...)`,
emitting V2 checkpoint keys to commit metadata even on table version 6
where 0.x readers only consume the V1 key. Route every non-incremental
source through `CheckpointUtils.createCheckpoint(writeTableVersion, key)`
so the checkpoint class is determined solely by WRITE_TABLE_VERSION.
@github-actions github-actions Bot added the size:L PR with lines of changes in (300, 1000] label Jun 2, 2026
@hudi-bot
Copy link
Copy Markdown
Collaborator

hudi-bot commented Jun 2, 2026

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! This PR routes streamer sources and selector helpers (plus the DistributedTestDataSource test utility) through CheckpointUtils.createCheckpoint(writeTableVersion, key) so that table version 6 emits V1 checkpoint keys instead of V2, aligning the streamer checkpoint key with the target table version. One pre-existing issue worth addressing while you're in the area around HiveIncrPullSource's use of String.valueOf(commitToPull.get()). Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. One small naming consistency observation below; the rest of the code is clean.

sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch new data");
return new InputBatch<>(Option.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))),
String.valueOf(commitToPull.get()));
createCheckpoint(writeTableVersion, String.valueOf(commitToPull.get())));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 I think String.valueOf(commitToPull.get()) here ends up calling Checkpoint.toString(), which returns something like "StreamerCheckpointV2{checkpointKey='20200101000000'}" (not the bare instant time). That string then becomes the new checkpoint key, so subsequent runs would compare instant times against "StreamerCheckpointV{1,2}{checkpointKey='…'}" and the loop in findCommitToPull would never advance. This is a pre-existing bug, but since you're touching this exact line — would just commitToPull.get() (or createCheckpoint(writeTableVersion, commitToPull.get().getCheckpointKey())) work, given findCommitToPull already returns a correctly-versioned Checkpoint? The new testHiveIncrPullSource only exercises findCommitToPull directly, so it doesn't catch this.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
}

public static Checkpoint createCheckpoint(int writeTableVersion, String checkpointToResume) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: could you rename this to buildCheckpoint (or rename buildCheckpointFromGeneralSource to createCheckpointFromGeneralSource) so the two factory methods in the same class use a consistent verb? As it stands, a caller trying to pick between them has to read both signatures to understand the difference.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@codecov-commenter
Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 35.89744% with 25 lines in your changes missing coverage. Please review.
✅ Project coverage is 66.22%. Comparing base (11f0e7c) to head (7465508).
⚠️ Report is 6 commits behind head on master.

Files with missing lines Patch % Lines
...apache/hudi/utilities/sources/GcsEventsSource.java 0.00% 7 Missing ⚠️
...che/hudi/utilities/sources/HiveIncrPullSource.java 0.00% 4 Missing ⚠️
...ies/sources/helpers/DatePartitionPathSelector.java 0.00% 3 Missing ⚠️
...tilities/sources/helpers/S3EventsMetaSelector.java 0.00% 3 Missing ⚠️
...g/apache/hudi/utilities/sources/KinesisSource.java 0.00% 2 Missing ⚠️
...che/hudi/utilities/sources/SqlFileBasedSource.java 0.00% 2 Missing ⚠️
...udi/utilities/sources/debezium/DebeziumSource.java 0.00% 2 Missing ⚠️
.../org/apache/hudi/utilities/sources/JdbcSource.java 66.66% 1 Missing ⚠️
...rg/apache/hudi/utilities/sources/PulsarSource.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18896      +/-   ##
============================================
- Coverage     68.83%   66.22%   -2.62%     
+ Complexity    29171    23486    -5685     
============================================
  Files          2520     2046     -474     
  Lines        140024   115755   -24269     
  Branches      17192    14896    -2296     
============================================
- Hits          96392    76658   -19734     
+ Misses        35858    32269    -3589     
+ Partials       7774     6828     -946     
Flag Coverage Δ
common-and-other-modules ?
hadoop-mr-java-client 44.87% <0.00%> (-0.05%) ⬇️
spark-client-hadoop-common 48.16% <0.00%> (-0.06%) ⬇️
spark-java-tests 49.41% <7.69%> (+0.06%) ⬆️
spark-scala-tests 45.25% <0.00%> (-0.02%) ⬇️
utilities 37.38% <34.21%> (-0.08%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
.../hudi/common/table/checkpoint/CheckpointUtils.java 87.50% <100.00%> (-1.39%) ⬇️
.../apache/hudi/examples/common/RandomJsonSource.java 100.00% <100.00%> (+100.00%) ⬆️
.../org/apache/hudi/utilities/sources/InputBatch.java 100.00% <ø> (ø)
...org/apache/hudi/utilities/sources/KafkaSource.java 45.94% <100.00%> (-39.77%) ⬇️
...java/org/apache/hudi/utilities/sources/Source.java 56.89% <ø> (-11.22%) ⬇️
...udi/utilities/sources/helpers/DFSPathSelector.java 76.92% <100.00%> (-7.40%) ⬇️
.../org/apache/hudi/utilities/sources/JdbcSource.java 56.36% <66.66%> (-36.37%) ⬇️
...rg/apache/hudi/utilities/sources/PulsarSource.java 0.00% <0.00%> (ø)
...g/apache/hudi/utilities/sources/KinesisSource.java 0.00% <0.00%> (-76.65%) ⬇️
...che/hudi/utilities/sources/SqlFileBasedSource.java 0.00% <0.00%> (-89.48%) ⬇️
... and 5 more

... and 861 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:L PR with lines of changes in (300, 1000]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants