fix(streamer): Align streamer checkpoint key with target table version#18896
yihua wants to merge 4 commits into
Several streamer sources hardcoded `new StreamerCheckpointV2(...)`, emitting V2 checkpoint keys to commit metadata even on table version 6 where 0.x readers only consume the V1 key. Route every non-incremental source through `CheckpointUtils.createCheckpoint(writeTableVersion, key)` so the checkpoint class is determined solely by WRITE_TABLE_VERSION.
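The dispatch described above can be pictured with a minimal sketch. The classes below are illustrative stand-ins, not the real Hudi `Checkpoint` hierarchy or `CheckpointUtils`, and the cutoff at table version 8 is an assumption inferred from the v6 → v1, v8 → v2 mapping stated in this PR.

```java
// Illustrative stand-ins only; these are NOT the real Hudi classes.
abstract class SketchCheckpoint {
    private final String checkpointKey;

    SketchCheckpoint(String checkpointKey) {
        this.checkpointKey = checkpointKey;
    }

    String getCheckpointKey() {
        return checkpointKey;
    }
}

final class SketchCheckpointV1 extends SketchCheckpoint {
    SketchCheckpointV1(String key) {
        super(key);
    }
}

final class SketchCheckpointV2 extends SketchCheckpoint {
    SketchCheckpointV2(String key) {
        super(key);
    }
}

final class SketchCheckpointFactory {
    // Assumption: version 8 is the first table version whose readers use V2 keys.
    static final int FIRST_V2_TABLE_VERSION = 8;

    private SketchCheckpointFactory() {
    }

    // The write table version alone selects the checkpoint class;
    // the source never constructs a V1 or V2 checkpoint directly.
    static SketchCheckpoint createCheckpoint(int writeTableVersion, String key) {
        return writeTableVersion >= FIRST_V2_TABLE_VERSION
            ? new SketchCheckpointV2(key)
            : new SketchCheckpointV1(key);
    }
}
```

With a single factory like this, a source that previously hardcoded the V2 class would pick up the V1 class automatically when the write table version is 6.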
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! This PR routes streamer sources and selector helpers (plus the DistributedTestDataSource test utility) through CheckpointUtils.createCheckpoint(writeTableVersion, key) so that table version 6 emits V1 checkpoint keys instead of V2, aligning the streamer checkpoint key with the target table version. One pre-existing issue is worth addressing while you're in the area: HiveIncrPullSource's use of String.valueOf(commitToPull.get()). There is also one small naming consistency observation; the rest of the code is clean. Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here.
  sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch new data");
  return new InputBatch<>(Option.of(avroRDD.keys().map(r -> ((GenericRecord) r.datum()))),
-     String.valueOf(commitToPull.get()));
+     createCheckpoint(writeTableVersion, String.valueOf(commitToPull.get())));
🤖 I think String.valueOf(commitToPull.get()) here ends up calling Checkpoint.toString(), which returns something like "StreamerCheckpointV2{checkpointKey='20200101000000'}" (not the bare instant time). That string then becomes the new checkpoint key, so subsequent runs would compare instant times against "StreamerCheckpointV{1,2}{checkpointKey='…'}" and the loop in findCommitToPull would never advance. This is a pre-existing bug, but since you're touching this exact line — would just commitToPull.get() (or createCheckpoint(writeTableVersion, commitToPull.get().getCheckpointKey())) work, given findCommitToPull already returns a correctly-versioned Checkpoint? The new testHiveIncrPullSource only exercises findCommitToPull directly, so it doesn't catch this.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
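The suspected pitfall in the comment above can be reproduced in isolation. This is a minimal sketch under stated assumptions: `ToyCheckpoint` is a hypothetical stand-in whose `toString()` imitates the format quoted in the comment, and `java.util.Optional` stands in for Hudi's `Option`. It does not confirm what the real `findCommitToPull` returns; it only shows what `String.valueOf` does when handed an object rather than a string.

```java
import java.util.Optional;

// Hypothetical stand-in for a checkpoint object; not the real Hudi class.
final class ToyCheckpoint {
    private final String checkpointKey;

    ToyCheckpoint(String checkpointKey) {
        this.checkpointKey = checkpointKey;
    }

    String getCheckpointKey() {
        return checkpointKey;
    }

    @Override
    public String toString() {
        return "ToyCheckpoint{checkpointKey='" + checkpointKey + "'}";
    }
}

final class ToStringPitfall {
    private ToStringPitfall() {
    }

    // String.valueOf(Object) delegates to toString(), so the result is the
    // debug representation of the object, not the bare instant time.
    static String keyViaValueOf(Optional<ToyCheckpoint> commitToPull) {
        return String.valueOf(commitToPull.get());
    }

    // Reading the key through the accessor yields the bare instant time.
    static String keyViaAccessor(Optional<ToyCheckpoint> commitToPull) {
        return commitToPull.get().getCheckpointKey();
    }
}
```

If the wrapped string were stored as the next checkpoint key, later comparisons against plain instant times would be comparing against the debug representation, which is the failure mode the comment describes.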
      throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
    }

  public static Checkpoint createCheckpoint(int writeTableVersion, String checkpointToResume) {
🤖 nit: could you rename this to buildCheckpoint (or rename buildCheckpointFromGeneralSource to createCheckpointFromGeneralSource) so the two factory methods in the same class use a consistent verb? As it stands, a caller trying to pick between them has to read both signatures to understand the difference.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18896      +/-   ##
============================================
- Coverage     68.83%   66.22%   -2.62%
+ Complexity    29171    23486    -5685
============================================
  Files          2520     2046     -474
  Lines        140024   115755   -24269
  Branches      17192    14896    -2296
============================================
- Hits          96392    76658   -19734
+ Misses        35858    32269    -3589
+ Partials       7774     6828     -946
Describe the issue this Pull Request addresses
For Hudi table version 6, several streamer sources hardcoded `new StreamerCheckpointV2(...)` and emitted v2 checkpoint keys in commit metadata regardless of the configured write table version (`KafkaSource` was the original repro).
Summary and Changelog
This PR routes every affected source (`KafkaSource`, `KinesisSource`, `JdbcSource`, `SqlFileBasedSource`, `PulsarSource`, `DebeziumSource`, `GcsEventsSource`, `HiveIncrPullSource`) and selector helper (`DFSPathSelector`, `DatePartitionPathSelector`, `S3EventsMetaSelector`) through `CheckpointUtils.createCheckpoint(writeTableVersion, key)` so the emitted checkpoint class is determined solely by `WRITE_TABLE_VERSION` (v6 → v1, v8 → v2). Removed now-dead helpers (`Source#assertCheckpointVersion`, `InputBatch(Option, String, ...)` ctors).
Incremental sources (`HoodieIncrSource`, `S3EventsHoodieIncrSource`, `GcsEventsHoodieIncrSource`) are not affected: they have their own checkpoint construction logic and already route through `CheckpointUtils.buildCheckpointFromGeneralSource` / the `DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2` allowlist.
Impact
Streamer commit metadata for table version 6 now correctly emits v1 checkpoint keys (`deltastreamer.checkpoint.key`) instead of v2 keys (`streamer.checkpoint.key.v2`). No public API change.
Risk Level
medium: touches the checkpoint write path in every streamer source. Added `TestStreamerSourceCheckpointVersion`, which exercises each source class across {v6, v8} write table versions and {V1, V2} input checkpoints and asserts the returned checkpoint class is determined solely by the write table version. Added a `testCreateCheckpoint` parameterized test for `CheckpointUtils.createCheckpoint` in `TestCheckpointUtils`.
Documentation Update
none
Contributor's checklist
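To make the Impact and Risk Level sections above concrete, here is a rough sketch of why the emitted key name matters to a reader that only consumes the V1 key. The two key strings are the ones quoted in this description; everything else (`CheckpointKeySketch`, its methods, and the version-8 cutoff) is hypothetical and only models the behavior described here, not Hudi's actual commit metadata code.

```java
import java.util.HashMap;
import java.util.Map;

final class CheckpointKeySketch {
    // Key names as quoted in the PR description.
    static final String V1_KEY = "deltastreamer.checkpoint.key";
    static final String V2_KEY = "streamer.checkpoint.key.v2";

    private CheckpointKeySketch() {
    }

    // Assumption: table versions below 8 get the V1 key, 8 and above the V2 key.
    static String metadataKeyFor(int writeTableVersion) {
        return writeTableVersion >= 8 ? V2_KEY : V1_KEY;
    }

    // Builds the extra-metadata entry a commit would carry. The version of the
    // checkpoint being resumed is accepted but deliberately ignored, mirroring
    // the claim that only the write table version decides the output.
    static Map<String, String> commitExtraMetadata(
            int writeTableVersion, int resumedCheckpointVersion, String checkpointValue) {
        Map<String, String> extraMetadata = new HashMap<>();
        extraMetadata.put(metadataKeyFor(writeTableVersion), checkpointValue);
        return extraMetadata;
    }

    // Models a reader that looks only at the V1 key, as 0.x readers are said to do.
    static String readAsLegacyReader(Map<String, String> extraMetadata) {
        String value = extraMetadata.get(V1_KEY);
        if (value == null) {
            throw new IllegalStateException(
                "Checkpoint is not found in the commit metadata: " + extraMetadata);
        }
        return value;
    }
}
```

Under these assumptions, a version 6 commit stays readable by the legacy reader whether the run resumed from a V1 or a V2 checkpoint, which is the same {v6, v8} × {V1, V2} matrix the new tests are described as covering.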