[hotfix] Fix union read can't restore issue #2321
Pull request overview
This PR fixes an issue with union read restoration by introducing a new serialization version (VERSION_1) for SourceEnumeratorState. The key change is that VERSION_1 always serializes and deserializes remainingHybridLakeFlussSplits regardless of the lakeSource flag, fixing a bug where VERSION_0 would only serialize/deserialize these splits when lakeSource was non-null.
Key Changes:
- Introduced VERSION_1 serialization format that always handles remainingHybridLakeFlussSplits
- Refactored serialization logic to extract common bucket/partition serialization into a helper method
- Updated deserialization to handle both VERSION_0 (backward compatible) and VERSION_1 formats
- Added comprehensive tests for V0 compatibility and cross-version scenarios
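The difference between the two formats can be sketched as follows. This is a minimal illustration, not the project's serializer: the class `VersionedStateSketch`, the string-based splits, and the `hasLakeSource` flag (standing in for a non-null lakeSource) are all assumptions made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the enumerator state serializer.
final class VersionedStateSketch {
    static final int VERSION_0 = 0;
    static final int VERSION_1 = 1;

    // Splits are modeled as plain strings; hasLakeSource stands in for "lakeSource != null".
    static byte[] serialize(int version, List<String> remainingSplits, boolean hasLakeSource)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // V0 writes the splits only when a lake source is configured; V1 always writes them.
            if (version == VERSION_1 || hasLakeSource) {
                out.writeInt(remainingSplits.size());
                for (String split : remainingSplits) {
                    out.writeUTF(split);
                }
            }
        }
        return bytes.toByteArray();
    }

    static List<String> deserialize(int version, byte[] serialized, boolean hasLakeSource)
            throws IOException {
        if (version != VERSION_0 && version != VERSION_1) {
            throw new IOException("Unknown version: " + version);
        }
        List<String> splits = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            // Mirror the writer: V0 reads conditionally, V1 reads unconditionally.
            if (version == VERSION_1 || hasLakeSource) {
                int count = in.readInt();
                for (int i = 0; i < count; i++) {
                    splits.add(in.readUTF());
                }
            }
        }
        return splits;
    }
}
```

In this sketch, a V0 round trip without a lake source returns an empty list even when splits were passed in, while a V1 round trip returns them unchanged.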
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| FlussSourceEnumeratorStateSerializer.java | Introduces VERSION_1 serialization format, refactors serialization logic into helper methods, and updates deserialization to handle version-specific behavior |
| SourceEnumeratorStateSerializerTest.java | Adds tests for V0 backward compatibility and cross-version deserialization scenarios |
39ed2ad to c3bf44a
c3bf44a to ccacaaf
loserwang1024
left a comment
I have left some comments.
import java.util.Set;

import static org.apache.fluss.utils.Preconditions.checkNotNull;
  @Override
  public SourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
-     if (version != VERSION_0) {
+     if (version != VERSION_0 && version != CURRENT_VERSION) {
version != VERSION_0 && version != CURRENT_VERSION
When we support version 3 later, state written with version 3 cannot be restored after a downgrade. We have met this problem in many connectors. @leonardBang, CC: should we reject state from a higher version?
I also found another problem: we can never downgrade later to version 1, because for version 1 state the check version != VERSION_0 is always true. Maybe we should add this to the upgrade document later.
1: So, what's the best practice here? Always make the version check pass and make sure version 3 is compatible with v2? From the Kafka connector code, it seems the Kafka connector still has this problem?
2: What do you mean by saying we can never downgrade later to version 1? It seems to be the same problem as 1?
Agree, that's clearer
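For readers following the version-check discussion above, here is a minimal sketch of one alternative: a range check that accepts every version a build knows how to read and rejects anything newer with an explicit message. The class and method names are hypothetical, and this is not necessarily the form the PR settled on.

```java
import java.io.IOException;

// Hypothetical helper; the constant names mirror the snippet under review
// but this is not the project's code.
final class VersionCheckSketch {
    static final int VERSION_0 = 0;
    static final int CURRENT_VERSION = 1;

    // Accept every version this build can read and reject anything newer,
    // so restoring state from a later release after a downgrade fails with a clear error.
    static void checkReadable(int version) throws IOException {
        if (version < VERSION_0 || version > CURRENT_VERSION) {
            throw new IOException(
                    "Unsupported state version " + version
                            + "; this build reads versions " + VERSION_0
                            + " to " + CURRENT_VERSION);
        }
    }
}
```

With a check of this shape, adding a later version only requires bumping `CURRENT_VERSION`, but state written by a newer build is still rejected by an older one, which is the downgrade limitation raised in the thread.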
65dbbdd to d0fc306
@loserwang1024 Comments are addressed. PTAL
loserwang1024
left a comment
LGTM



Purpose
Linked issue: close #xxx
Brief change log
Tests
API and Format
Documentation