Skip to content

[FLINK-39417] Fix GenericRecordData could not be [de]serialized in pipeline#4374

Open
yuxiqian wants to merge 9 commits intoapache:masterfrom
yuxiqian:FLINK-39417
Open

[FLINK-39417] Fix GenericRecordData could not be [de]serialized in pipeline#4374
yuxiqian wants to merge 9 commits intoapache:masterfrom
yuxiqian:FLINK-39417

Conversation

@yuxiqian
Copy link
Copy Markdown
Member

This closes FLINK-39417.

Previously in FLINK-38886, we introduced a GenericRecordData as a replacement of BinaryRecordData so connector and internal operator developers could get rid of the heavy flink-cdc-runtime operator and construct binary-unaware records.

However, lacking corresponding serializers, it could not be used in complete pipelines but only in specific test cases.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses FLINK-39417 by enabling end-to-end (de)serialization of GenericRecordData in Flink CDC pipelines, complementing the earlier introduction of GenericRecordData as a binary-unaware RecordData implementation.

Changes:

  • Extend RecordDataSerializer to support both BinaryRecordData and GenericRecordData.
  • Introduce a new self-describing GenericRecordDataSerializer (type-tagged field encoding).
  • Broaden runtime/operator/test coverage to exercise GenericRecordData through transforms and Values connector pipelines.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java Adds unit tests covering serialization/copy for GenericRecordData and nested structures.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java Converts non-binary RecordData rows to BinaryRecordData when writing ROW fields.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java Adds a type-tagged wrapper format to serialize BinaryRecordData vs GenericRecordData.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java New self-describing serializer for GenericRecordData fields (incl. nested rows/array/map/variant).
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java Accepts RecordData input to allow pre-transform projection for generic rows.
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java Stops casting change-event before/after to BinaryRecordData, enabling generic rows.
flink-cdc-connect/.../ValuesDatabase.java Builds PK strings via FieldGetter to support non-string PK types and non-binary records.
flink-cdc-composer/src/test/java/.../FlinkPipelineComposerITCase.java Adds an IT case that runs GenericRecordData end-to-end across many types incl. array/map/row.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@yuxiqian yuxiqian marked this pull request as draft April 10, 2026 09:54
@yuxiqian yuxiqian marked this pull request as ready for review April 10, 2026 11:46
@yuxiqian yuxiqian requested a review from lvyanquan April 13, 2026 05:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants