[HUDI-18891] fix(streamer): override all deserialize() overloads in KafkaAvroSchemaDeserializer #18892
…afkaAvroSchemaDeserializer

KafkaAvroSchemaDeserializer previously only overrode deserialize(String, Boolean, byte[], Schema) to inject the configured sourceSchema. The Kafka consumer / Connect framework can invoke other overloads (deserialize(String, byte[]), deserialize(String, byte[], Schema), and deserialize(String, Headers, byte[])), which bypassed the sourceSchema injection. This caused ArrayIndexOutOfBoundsException when consuming records serialized with an older schema (fewer fields in a nested record) while the deserializer was configured with an evolved schema, because Avro resolution used the writer's old schema instead of the configured reader schema.

This change overrides all three additional deserialize methods to consistently inject sourceSchema, ensuring Avro schema resolution handles old -> new field evolution correctly (defaulting new nullable fields to null).

Tests use a Debezium CDC envelope schema with a nested Value record that gains 4 nullable fields (notes, search_engine_id, locale_id, language_id). All 4 deserialize overloads are exercised against old-schema records read with the evolved schema, validating positional index access (index 20-23) on the nested before record to reproduce the AIOOBE.

Fixes: apache#18891
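To make the bypass described above concrete, here is a minimal, self-contained sketch of the overload problem. It is not the Hudi or Confluent code: OverloadSketch, BaseDeserializer, PartialOverride, and FullOverride are hypothetical names, a String stands in for an Avro Schema, and a Map stands in for Kafka Headers. It also assumes the parent's overloads do not route through the overridden 4-argument method, and that the new overrides delegate to the existing 4-argument override; the PR text does not show either detail.

```java
import java.util.Map;

// Hypothetical stand-ins only: a String plays the role of an Avro Schema and a
// Map plays the role of Kafka Headers. None of these are the real classes.
final class OverloadSketch {

  /** Stand-in parent: each overload decodes on its own, without calling the others. */
  static class BaseDeserializer {
    // A null reader schema means "fall back to the schema the record was written with".
    private static String decode(byte[] bytes, String readerSchema) {
      return readerSchema == null ? "writer-schema" : readerSchema;
    }

    Object deserialize(String topic, byte[] bytes) {
      return decode(bytes, null);
    }

    Object deserialize(String topic, byte[] bytes, String readerSchema) {
      return decode(bytes, readerSchema);
    }

    Object deserialize(String topic, Map<String, String> headers, byte[] bytes) {
      return decode(bytes, null);
    }

    Object deserialize(String topic, Boolean isKey, byte[] bytes, String readerSchema) {
      return decode(bytes, readerSchema);
    }
  }

  /** Shape of the behavior before the fix: only the 4-argument overload injects the schema. */
  static class PartialOverride extends BaseDeserializer {
    protected final String sourceSchema;

    PartialOverride(String sourceSchema) {
      this.sourceSchema = sourceSchema;
    }

    @Override
    Object deserialize(String topic, Boolean isKey, byte[] bytes, String readerSchema) {
      return super.deserialize(topic, isKey, bytes, sourceSchema);
    }
  }

  /** Shape of the behavior after the fix: every overload funnels into the injecting one. */
  static class FullOverride extends PartialOverride {
    FullOverride(String sourceSchema) {
      super(sourceSchema);
    }

    @Override
    Object deserialize(String topic, byte[] bytes) {
      // isKey is unused in this sketch, so null is passed for it.
      return deserialize(topic, null, bytes, sourceSchema);
    }

    @Override
    Object deserialize(String topic, byte[] bytes, String readerSchema) {
      // The caller-supplied schema is deliberately replaced by the configured one.
      return deserialize(topic, null, bytes, sourceSchema);
    }

    @Override
    Object deserialize(String topic, Map<String, String> headers, byte[] bytes) {
      return deserialize(topic, null, bytes, sourceSchema);
    }
  }

  public static void main(String[] args) {
    byte[] payload = {1, 2, 3};
    BaseDeserializer before = new PartialOverride("evolved-schema");
    BaseDeserializer after = new FullOverride("evolved-schema");
    System.out.println(before.deserialize("topic", payload)); // writer-schema
    System.out.println(after.deserialize("topic", payload));  // evolved-schema
  }
}
```

The point of the sketch is only that overriding one overload does not protect callers that enter through a sibling overload; funnelling every entry point into a single injecting method keeps the reader schema consistent.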
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This consistently routes the remaining Kafka deserialize overloads ((topic, bytes), (topic, bytes, Schema), and (topic, Headers, bytes)) through the configured sourceSchema, matching the existing 4-arg override and fixing the Avro schema resolution path so old → new field evolution correctly defaults new nullable fields to null. The added CDC envelope test exercises all four overloads end-to-end. No correctness issues found. The inline comments carry a style/readability suggestion: one minor naming inconsistency in the new production overloads. Please take a look; after that, this should be ready for a Hudi committer or PMC member to take it from here.
cc @yihua
      }

      @Override
      public Object deserialize(String s, byte[] bytes) {
🤖 nit: the parameter s in these two overloads is cryptic — the third override on line 72 correctly uses topic, which is also what the parent class uses. Could you rename s to topic in both for consistency?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #18892     +/-   ##
============================================
- Coverage     68.83%    68.83%   -0.01%
- Complexity    29171     29176       +5
============================================
  Files          2520      2520
  Lines        140024    140027       +3
  Branches      17192     17192
============================================
- Hits          96392     96388       -4
- Misses        35858     35870      +12
+ Partials       7774      7769       -5
Change Logs
KafkaAvroSchemaDeserializer previously only overrode deserialize(String, Boolean, byte[], Schema) to inject the configured sourceSchema. The Kafka consumer / Connect framework can invoke other overloads (deserialize(String, byte[]), deserialize(String, byte[], Schema), and deserialize(String, Headers, byte[])), which bypassed the sourceSchema injection. This caused ArrayIndexOutOfBoundsException when consuming records serialized with an older schema (fewer fields in a nested record) while the deserializer was configured with an evolved schema, because Avro resolution used the writer's old schema instead of the configured reader schema.
This change overrides all three additional deserialize methods to consistently inject sourceSchema, ensuring Avro schema resolution handles old → new field evolution correctly (defaulting new nullable fields to null).
Impact
Bug fix for KafkaAvroSchemaDeserializer. No public API change. Behavior for the already-overridden deserialize(String, Boolean, byte[], Schema) is unchanged. The three newly overridden methods now behave consistently with the existing override.
Risk level
low
Documentation Update
No user-facing config or API change.
Contributor's checklist
Test Plan
TestKafkaAvroSchemaDeserializer uses a Debezium CDC envelope schema with a nested Value record that gains 4 nullable fields (notes, search_engine_id, locale_id, language_id). All 4 deserialize overloads are exercised against old-schema records read with the evolved schema, validating positional index access (index 20-23) on the nested before record to reproduce the ArrayIndexOutOfBoundsException (AIOOBE).
mvn -pl hudi-utilities test -Dtest='TestKafkaAvroSchemaDeserializer': 2/2 pass.
mvn -pl hudi-utilities checkstyle:check: 0 violations.
Closes #18891
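
The positional-access failure that the test plan targets can be illustrated without Avro at all. The sketch below is a rough analogy, not the PR's test: EvolutionSketch and its methods are hypothetical, a plain Object[] stands in for the nested record, and the count of 20 original fields is an assumption inferred from the new fields sitting at indices 20-23. Real Avro resolution matches fields by name and applies schema defaults; padding an array with nulls only mimics the end result for appended nullable fields.

```java
import java.util.Arrays;

// Hypothetical stand-in: an Object[] plays the role of a decoded nested record.
final class EvolutionSketch {
  // Assumption: the old schema has 20 fields (indices 0..19).
  static final int OLD_FIELD_COUNT = 20;
  // The evolved schema appends notes, search_engine_id, locale_id, language_id (indices 20..23).
  static final int EVOLVED_FIELD_COUNT = 24;

  /** Decoding with the writer's schema yields exactly the slots that were written. */
  static Object[] readWithWriterSchema(Object[] written) {
    return written.clone();
  }

  /** Decoding against the reader schema yields one slot per reader field; new slots are null. */
  static Object[] readWithReaderSchema(Object[] written, int readerFieldCount) {
    return Arrays.copyOf(written, readerFieldCount);
  }

  public static void main(String[] args) {
    Object[] written = new Object[OLD_FIELD_COUNT];
    Arrays.fill(written, "old-value");

    try {
      Object notes = readWithWriterSchema(written)[20];
      System.out.println("unexpected: " + notes);
    } catch (ArrayIndexOutOfBoundsException e) {
      System.out.println("writer schema: index 20 is out of bounds");
    }

    Object[] resolved = readWithReaderSchema(written, EVOLVED_FIELD_COUNT);
    for (int i = OLD_FIELD_COUNT; i < EVOLVED_FIELD_COUNT; i++) {
      System.out.println("reader schema: index " + i + " -> " + resolved[i]);
    }
  }
}
```

Downstream code that indexes fields by the evolved schema's positions only works when the record was materialized against that schema, which is why every deserialize entry point needs the same reader schema.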