Skip to content

[HUDI-18891] fix(streamer): override all deserialize() overloads in KafkaAvroSchemaDeserializer#18892

Open
nsivabalan wants to merge 1 commit into
apache:masterfrom
nsivabalan:fixKafkaAvroSchemaDeserOverloads
Open

[HUDI-18891] fix(streamer): override all deserialize() overloads in KafkaAvroSchemaDeserializer#18892
nsivabalan wants to merge 1 commit into
apache:masterfrom
nsivabalan:fixKafkaAvroSchemaDeserOverloads

Conversation

@nsivabalan
Copy link
Copy Markdown
Contributor

Change Logs

KafkaAvroSchemaDeserializer previously only overrode deserialize(String, Boolean, byte[], Schema) to inject the configured sourceSchema. The Kafka consumer / Connect framework can invoke other overloads — deserialize(String, byte[]), deserialize(String, byte[], Schema), and deserialize(String, Headers, byte[]) — which bypassed the sourceSchema injection. This caused ArrayIndexOutOfBoundsException when consuming records serialized with an older schema (fewer fields in a nested record) while the deserializer was configured with an evolved schema, because Avro resolution used the writer's old schema instead of the configured reader schema.

This change overrides all three additional deserialize methods to consistently inject sourceSchema, ensuring Avro schema resolution handles old → new field evolution correctly (defaulting new nullable fields to null).

Impact

Bug fix for KafkaAvroSchemaDeserializer. No public API change. Behavior for the already-overridden deserialize(String, Boolean, byte[], Schema) is unchanged. The three newly overridden methods now behave consistently with the existing override.

Risk level

low

Documentation Update

No user-facing config or API change.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added

Test Plan

  • New tests in TestKafkaAvroSchemaDeserializer use a Debezium CDC envelope schema with a nested Value record that gains 4 nullable fields (notes, search_engine_id, locale_id, language_id). All 4 deserialize overloads are exercised against old-schema records read with the evolved schema, validating positional index access (index 20-23) on the nested before record to reproduce the AIOOBE.
  • mvn -pl hudi-utilities test -Dtest='TestKafkaAvroSchemaDeserializer' — 2/2 pass.
  • mvn -pl hudi-utilities checkstyle:check — 0 violations.

Closes #18891

…afkaAvroSchemaDeserializer

KafkaAvroSchemaDeserializer previously only overrode
deserialize(String, Boolean, byte[], Schema) to inject the configured
sourceSchema. The Kafka consumer / Connect framework can invoke other
overloads — deserialize(String, byte[]), deserialize(String, byte[], Schema),
and deserialize(String, Headers, byte[]) — which bypassed the
sourceSchema injection. This caused ArrayIndexOutOfBoundsException
when consuming records serialized with an older schema (fewer fields
in a nested record) while the deserializer was configured with an
evolved schema, because Avro resolution used the writer's old schema
instead of the configured reader schema.

This change overrides all three additional deserialize methods to
consistently inject sourceSchema, ensuring Avro schema resolution
handles old -> new field evolution correctly (defaulting new nullable
fields to null).

Tests use a Debezium CDC envelope schema with a nested Value record
that gains 4 nullable fields (notes, search_engine_id, locale_id,
language_id). All 4 deserialize overloads are exercised against
old-schema records read with the evolved schema, validating
positional index access (index 20-23) on the nested before record
to reproduce the AIOOBE.

Fixes: apache#18891
Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This consistently routes the remaining Kafka deserialize overloads ((topic, bytes), (topic, bytes, Schema), and (topic, Headers, bytes)) through the configured sourceSchema, matching the existing 4-arg override and fixing the Avro schema resolution path so old → new field evolution correctly defaults new nullable fields to null. The added CDC envelope test exercises all four overloads end-to-end. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One minor naming inconsistency in the new production overloads.

cc @yihua

}

@Override
public Object deserialize(String s, byte[] bytes) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the parameter s in these two overloads is cryptic — the third override on line 72 correctly uses topic, which is also what the parent class uses. Could you rename s to topic in both for consistency?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@codecov-commenter
Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 68.83%. Comparing base (11f0e7c) to head (2e163ff).

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18892      +/-   ##
============================================
- Coverage     68.83%   68.83%   -0.01%     
- Complexity    29171    29176       +5     
============================================
  Files          2520     2520              
  Lines        140024   140027       +3     
  Branches      17192    17192              
============================================
- Hits          96392    96388       -4     
- Misses        35858    35870      +12     
+ Partials       7774     7769       -5     
Flag Coverage Δ
common-and-other-modules 44.35% <100.00%> (+<0.01%) ⬆️
hadoop-mr-java-client 44.90% <ø> (-0.02%) ⬇️
spark-client-hadoop-common 48.22% <ø> (-0.01%) ⬇️
spark-java-tests 49.37% <0.00%> (+0.01%) ⬆️
spark-scala-tests 45.26% <0.00%> (-0.02%) ⬇️
utilities 37.44% <0.00%> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...i/utilities/deser/KafkaAvroSchemaDeserializer.java 80.95% <100.00%> (+3.17%) ⬆️

... and 16 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@hudi-bot
Copy link
Copy Markdown
Collaborator

hudi-bot commented Jun 1, 2026

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

KafkaAvroSchemaDeserializer bypasses sourceSchema injection for 3 of 4 deserialize overloads, causing AIOOBE on schema evolution

4 participants