consumer: dedupe replayed DML by applied watermark for storage consumer#4328
ti-chi-bot[bot] merged 3 commits into master
Summary of Changes (Gemini Code Assist)

This pull request resolves an issue in the storage consumer where the order of processing Data Definition Language (DDL) and Data Manipulation Language (DML) events was incorrect. The changes introduce a revised sorting mechanism to ensure that, for any given table version, all DML operations are processed and applied before any corresponding DDL operations, thereby maintaining data integrity and consistency during replication.
📝 Walkthrough

Added table-level DDL watermark tracking and DDL replay guards to the storage consumer:
- record the max executed DDL version per table
- skip stale DDL/DML based on that watermark
- update the watermark after successful DDL execution
- add related logging and control-flow guards
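The watermark bookkeeping described in the walkthrough can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `tableKey`, `watermarkTracker`, and all method names are hypothetical. The DDL comparison (`version <= watermark`) mirrors the check visible in the review diff. The DML comparison (strictly older than the watermark) is an assumption made for this sketch, as is the premise that real table versions are greater than zero.

```go
package main

import "fmt"

// tableKey identifies a table by schema and name (hypothetical type for this sketch).
type tableKey struct {
	Schema string
	Table  string
}

// watermarkTracker records the highest DDL table version executed per table.
// A missing entry reads as 0, so table versions are assumed to be positive.
type watermarkTracker struct {
	applied map[tableKey]uint64
}

func newWatermarkTracker() *watermarkTracker {
	return &watermarkTracker{applied: make(map[tableKey]uint64)}
}

// shouldSkipDDL reports whether a DDL at this version was already executed,
// i.e. its version is at or below the table's watermark.
func (w *watermarkTracker) shouldSkipDDL(k tableKey, version uint64) bool {
	return version <= w.applied[k]
}

// shouldSkipDML reports whether DML belongs to a table version older than
// the watermark. ASSUMPTION: the strict comparison is chosen for this sketch;
// the PR's exact DML condition is not shown in the conversation.
func (w *watermarkTracker) shouldSkipDML(k tableKey, version uint64) bool {
	return version < w.applied[k]
}

// advance raises the watermark after a DDL executed successfully.
// It never lowers the watermark, so replays cannot move it backwards.
func (w *watermarkTracker) advance(k tableKey, version uint64) {
	if version > w.applied[k] {
		w.applied[k] = version
	}
}

func main() {
	w := newWatermarkTracker()
	k := tableKey{Schema: "test", Table: "t1"}

	// First sighting of the DDL at version 10: execute it, then advance.
	if !w.shouldSkipDDL(k, 10) {
		w.advance(k, 10)
	}

	// A replay of the same DDL is now recognized as stale.
	fmt.Println("replayed DDL skipped:", w.shouldSkipDDL(k, 10))
	fmt.Println("newer DDL skipped:", w.shouldSkipDDL(k, 11))
}
```

Advancing the watermark only after the DDL has executed successfully means a failed DDL is retried rather than silently skipped on the next pass.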
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (2 warnings)
Code Review
This pull request corrects the processing order of DDL and DML events in the storage consumer, ensuring that for the same table version, DML events are processed before DDL events. This is achieved by refactoring the sorting logic into a new dmlPathKeyLess function with the corrected comparison logic. The change is clear and addresses the issue. My main feedback is to add unit tests for the new critical sorting function to ensure its correctness and prevent future regressions.
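As a rough illustration of the ordering rule and the kind of unit test requested above, the sketch below sorts keys by table version and places DML before DDL within the same version. It is not the PR's `dmlPathKeyLess`: the `pathKey` fields, `sketchKeyLess`, and `sortKeys` are hypothetical names, and the tie-break on file index is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// pathKey is a simplified, hypothetical stand-in for the consumer's path key.
type pathKey struct {
	TableVersion uint64
	IsSchemaFile bool   // true for a DDL (schema) entry, false for DML
	FileIndex    uint64 // assumed tie-breaker among DML files
}

// sketchKeyLess orders keys by table version first. Within one version,
// DML entries sort before the DDL entry, then by file index.
func sketchKeyLess(a, b pathKey) bool {
	if a.TableVersion != b.TableVersion {
		return a.TableVersion < b.TableVersion
	}
	if a.IsSchemaFile != b.IsSchemaFile {
		return !a.IsSchemaFile // the DML side is "less"
	}
	return a.FileIndex < b.FileIndex
}

// sortKeys sorts in place using sketchKeyLess.
func sortKeys(keys []pathKey) {
	sort.Slice(keys, func(i, j int) bool {
		return sketchKeyLess(keys[i], keys[j])
	})
}

func main() {
	keys := []pathKey{
		{TableVersion: 2, IsSchemaFile: true},
		{TableVersion: 1, IsSchemaFile: true},
		{TableVersion: 1, IsSchemaFile: false, FileIndex: 2},
		{TableVersion: 1, IsSchemaFile: false, FileIndex: 1},
	}
	sortKeys(keys)
	for _, k := range keys {
		fmt.Printf("version=%d ddl=%v index=%d\n", k.TableVersion, k.IsSchemaFile, k.FileIndex)
	}
}
```

A table-driven test over pairs such as (DML, DDL) at the same version, plus an irreflexivity check, would guard the comparator against regressions.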
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
cmd/storage-consumer/consumer.go (1)
547-547: ⚠️ Potential issue | 🔴 Critical
Handle DML flush errors before continuing to DDL.

Line 547 drops the `flushDMLEvents` error. If the flush fails, processing can continue and execute later DDL, breaking ordering guarantees under failure paths.

Suggested fix

    - c.flushDMLEvents(ctx, tableID)
    + if err := c.flushDMLEvents(ctx, tableID); err != nil {
    +     return errors.Trace(err)
    + }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/storage-consumer/consumer.go` at line 547, The call to c.flushDMLEvents(ctx, tableID) currently ignores any returned error which can allow subsequent DDL to run and break ordering; update the call site to capture the error (err := c.flushDMLEvents(ctx, tableID)), check it, and if non-nil log/return/propagate the error (e.g., return fmt.Errorf or propagate the existing function's error) so processing stops and DDL is not executed when DML flush fails, preserving ordering guarantees; ensure you reference c.flushDMLEvents and the enclosing function's error flow when adding the early-return or error propagation.
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: 3AceShowHand, wlwilliamx
/retest
/retest
/test all
/test storage
🧹 Nitpick comments (1)
cmd/storage-consumer/consumer.go (1)
541-564: Explicitly skip schema sentinel keys with empty query.

When `PartitionNum == -1` and `Date == ""` but `tableDef.Query` is empty, this currently falls through to DML handling (a no-op today). Add an early `continue` to avoid unnecessary fake table-id allocation and processing.

♻️ Proposed refactor

    - if key.PartitionNum == fakePartitionNumForSchemaFile && len(key.Date) == 0 && len(tableDef.Query) > 0 {
    + if key.PartitionNum == fakePartitionNumForSchemaFile && len(key.Date) == 0 {
    +     if len(tableDef.Query) == 0 {
    +         continue
    +     }
          if key.TableVersion <= ddlWatermark {
              log.Warn("DDL event replayed with stale table version, ignore it",
                  zap.String("schema", key.Schema),
                  zap.String("table", key.Table),
                  zap.Uint64("tableVersion", key.TableVersion),
                  zap.Uint64("ddlWatermark", ddlWatermark),
                  zap.String("query", tableDef.Query))
              continue
          }
    @@
          log.Info("execute ddl event successfully",
              zap.String("query", tableDef.Query),
              zap.String("schema", key.Schema),
              zap.String("table", key.Table),
              zap.Uint64("ddlWatermark", c.tableDDLWatermark[tableKey]))
          continue
      }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/storage-consumer/consumer.go` around lines 541 - 564, The code path handling schema sentinel keys doesn't skip cases where key.PartitionNum == fakePartitionNumForSchemaFile and key.Date == "" but tableDef.Query is empty, causing unnecessary DML handling and fake table-id allocation; update the conditional in the loop that checks key.PartitionNum, key.Date and tableDef.Query (the block that creates ddlEvent via tableDef.ToDDLEvent, calls c.sink.WriteBlockEvent, and sets c.tableDDLWatermark[tableKey]) to add an early continue when tableDef.Query is empty (i.e., return immediately to the next iteration if tableDef.Query == ""), so sentinel keys with empty queries are skipped before any fake table-id or DML processing occurs.
/retest
/retest
What problem does this PR solve?
Issue Number: close #4327 ref #4051
What is changed and how it works?
Storage consumer also needs to be fixed
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Bug Fixes
Chores