cmd: isolate DDL stale drop keys in consumers #4455

Open
wlwilliamx wants to merge 8 commits into pingcap:master from wlwilliamx:codex/fix-consumer-ddl-order-key

Conversation

@wlwilliamx
Collaborator

@wlwilliamx wlwilliamx commented Mar 12, 2026

What problem does this PR solve?

Issue Number: close #4454

What is changed and how it works?

This PR separates DDL flush coordination keys from DDL stale-drop keys in the Kafka and Pulsar consumers.

getBlockTableIDs() is kept for flush and watermark coordination, but appendDDL() now uses a dedicated ddlOrderKey to decide whether a DDL is truly stale.

For DDLSpan-only CREATE SCHEMA / independent CREATE TABLE, the new logic no longer reuses the shared DDLSpanTableID as the identity key. Instead, it falls back to schema-scoped or table-scoped logical object keys, so cross-object DDLs stay queued and are later sorted by commitTs.
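
To make the keying idea concrete, here is a minimal sketch of stale-drop tracking keyed by logical object rather than by a shared barrier ID. The names orderKey, ddlEvent, and ddlBuffer are hypothetical simplifications, not the PR's actual types, and the sketch assumes that only a strictly smaller commitTs counts as stale.

```go
package main

import "fmt"

// orderKey is a hypothetical, simplified stand-in for the PR's ddlOrderKey.
type orderKey struct {
	kind   string // "schema" or "table"
	schema string
	table  string
}

// ddlEvent is a reduced, illustrative DDL event.
type ddlEvent struct {
	query    string
	key      orderKey
	commitTs uint64
}

// ddlBuffer tracks the largest commitTs seen per logical object.
type ddlBuffer struct {
	maxCommitTs map[orderKey]uint64
	queued      []ddlEvent
}

// appendDDL queues d unless the same object already saw a larger commitTs.
// Assumption: only a strictly smaller commitTs counts as stale.
func (b *ddlBuffer) appendDDL(d ddlEvent) bool {
	if seen, ok := b.maxCommitTs[d.key]; ok && d.commitTs < seen {
		return false
	}
	b.maxCommitTs[d.key] = d.commitTs
	b.queued = append(b.queued, d)
	return true
}

func main() {
	b := &ddlBuffer{maxCommitTs: map[orderKey]uint64{}}
	schemaA := orderKey{kind: "schema", schema: "a"}
	schemaB := orderKey{kind: "schema", schema: "b"}

	fmt.Println(b.appendDDL(ddlEvent{"CREATE SCHEMA b", schemaB, 20})) // kept
	fmt.Println(b.appendDDL(ddlEvent{"CREATE SCHEMA a", schemaA, 10})) // kept: different object
	fmt.Println(b.appendDDL(ddlEvent{"CREATE SCHEMA b", schemaB, 15})) // dropped: stale for schema b
}
```

With a single shared key for both schemas, the second call would have been dropped, which is the behavior this PR sets out to avoid.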

The PR also adds regression tests for:

  • DDLSpan-only cross-object create DDLs
  • same-object stale CREATE SCHEMA
  • same-table stale ALTER TABLE
  • codec-style empty BlockedTables.TableIDs on create DDLs

Check List

Tests

  • Unit test
  • Manual test

Manual test:

go test ./cmd/kafka-consumer ./cmd/pulsar-consumer -count=1

Questions

Will it cause performance regression or break compatibility?

No material performance regression is expected. The change is limited to in-memory DDL stale-drop keying inside the MQ consumers and does not change the existing flush or watermark behavior.

Do you need to update user documentation, design documentation or monitoring documentation?

No.

Release note

Fix an MQ consumer bug that could drop out-of-order CREATE SCHEMA or independent CREATE TABLE DDLs before commitTs sorting.

Summary by CodeRabbit

  • Bug Fixes

    • Improved DDL execution ordering to correctly handle cross-object and per-object DDLs, preventing out-of-order or stale DDLs from applying and preserving commit-ts ordering.
  • Improvements

    • More nuanced watermark bypass so certain CREATE SCHEMA/CREATE TABLE cases can execute promptly without delaying others.
  • Tests

    • Expanded tests for cross-object DDL ordering, stale-pruning, empty-blocked-table cases, and watermark/commit-ts interactions.

@ti-chi-bot ti-chi-bot bot added the release-note label ("Denotes a PR that will be considered when it comes time to generate release notes.") Mar 12, 2026
@ti-chi-bot

ti-chi-bot bot commented Mar 12, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign sdojjy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XXL label ("Denotes a PR that changes 1000+ lines, ignoring generated files.") Mar 12, 2026
@coderabbitai
Contributor

coderabbitai bot commented Mar 12, 2026

📝 Walkthrough

Introduces per-DDL-object ordering keys and replaces per-table commit-ts tracking with a keyed map; DDLs are pre-sorted by commit-ts and may bypass watermark for certain independent DDLs; append/write logic updated to drop only truly stale DDLs using the new ordering keys.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| DDL ordering & writer logic: cmd/kafka-consumer/writer.go, cmd/pulsar-consumer/writer.go | Add ddlOrderKeyKind and ddlOrderKey types; replace ddlWithMaxCommitTs with ddlOrderKeyWithMaxCommitTs; add newDDLOrderKeyForSchema, newDDLOrderKeyForTable, and getDDLOrderKeys; update appendDDL and Write to use order keys, stable-sort DDLs by commitTs, and apply watermark-bypass rules for certain DDLs. |
| Writer tests: cmd/kafka-consumer/writer_test.go, cmd/pulsar-consumer/writer_test.go | Rename import alias; replace and extend tests with cases validating cross-object DDL ordering, empty blocked IDs, stale DDL pruning, and interactions with watermark/commitTs; keep recordingSink usage for verification. |

Sequence Diagram(s)

sequenceDiagram
    participant Producer
    participant Writer
    participant Buffer as "DDL Buffer"
    participant Sorter
    participant Watermark as "Watermark Checker"
    participant Sink

    Producer->>Writer: appendDDL(ddl)
    Writer->>Writer: compute orderKeys (getDDLOrderKeys)
    alt commitTs >= maxCommitTs[any orderKey]
        Writer->>Buffer: enqueue DDL
    else
        Writer-->>Producer: drop stale DDL
    end

    Writer->>Sorter: Write() -> fetch buffered DDLs
    Sorter->>Sorter: sort.SliceStable by commitTs

    loop for each DDL in sorted order
        Sorter->>Watermark: isBypass(ddl)? / watermark >= commitTs?
        alt bypassWatermark
            Watermark->>Sink: flush DDL immediately
        else if watermark >= commitTs
            Watermark->>Sink: flush DDL
        else
            Watermark->>Writer: defer remaining DDLs (break)
        end
    end
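
The flush loop in the diagram can be approximated with the sketch below. It is a reduced model, not the consumers' actual Write method: pendingDDL and its bypass flag are hypothetical, and the sketch assumes the first DDL that is not ready stops the loop so later DDLs keep their commitTs order.

```go
package main

import (
	"fmt"
	"sort"
)

// pendingDDL is a hypothetical, reduced view of a buffered DDL.
type pendingDDL struct {
	query    string
	commitTs uint64
	bypass   bool // assumed flag: true for DDLs allowed to skip the watermark check
}

// flushReady sorts pending in place by commitTs (stable), then returns the
// queries that may run now and the DDLs that must stay queued.
func flushReady(pending []pendingDDL, watermark uint64) ([]string, []pendingDDL) {
	sort.SliceStable(pending, func(i, j int) bool {
		return pending[i].commitTs < pending[j].commitTs
	})
	var flushed []string
	for i, d := range pending {
		if !d.bypass && watermark < d.commitTs {
			// Not ready: defer this DDL and everything after it.
			return flushed, pending[i:]
		}
		flushed = append(flushed, d.query)
	}
	return flushed, nil
}

func main() {
	pending := []pendingDDL{
		{"ALTER TABLE a.t ADD COLUMN c INT", 30, false},
		{"CREATE TABLE a.t (id INT)", 20, true},
		{"CREATE SCHEMA a", 10, true},
	}
	flushed, remaining := flushReady(pending, 0)
	fmt.Println(flushed)
	fmt.Println(len(remaining))
}
```

Stopping at the first deferred DDL, rather than skipping it, is what keeps a bypass-eligible DDL from running ahead of an earlier DDL that is still waiting on the watermark.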

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Suggested labels

lgtm, approved, size/XL

Suggested reviewers

  • hongyunyan
  • 3AceShowHand

Poem

🐰 With tiny paws I hop and say,

I sorted DDLs along the way,
Keys for schema, table, all,
No stale drop will make them fall,
Waters mark the time to play ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The PR title clearly and specifically describes the main change: isolating DDL stale-drop keys, which is the core objective of the PR addressing issue #4454. |
| Description check | ✅ Passed | The PR description includes all required template sections: problem statement with linked issue (#4454), detailed explanation of changes, test coverage (unit and manual tests), answers to questions about compatibility and documentation, and a properly formatted release note. |
| Linked Issues check | ✅ Passed | The PR directly addresses issue #4454 by introducing dedicated ddlOrderKey logic to replace the problematic shared DDLSpanTableID stale-drop mechanism, ensuring cross-object DDLs remain queued for commitTs sorting while protecting only truly stale objects. |
| Out of Scope Changes check | ✅ Passed | All changes are tightly scoped to DDL stale-drop key isolation in Kafka and Pulsar consumers. No extraneous modifications to unrelated functionality; test updates properly focus on regression coverage for the bug being fixed. |


@gemini-code-assist

Summary of Changes

This pull request refines the DDL (Data Definition Language) processing within the Kafka and Pulsar consumers to prevent the incorrect dropping of out-of-order CREATE SCHEMA or independent CREATE TABLE DDLs. By introducing a dedicated ddlOrderKey mechanism, the system can now accurately differentiate between DDLs that require flush coordination and those that need stale-drop protection, ensuring DDLs are applied in the correct commit timestamp order while avoiding unnecessary blocking. This change improves the robustness of DDL replication, particularly in scenarios involving concurrent or out-of-sequence DDL events.

Highlights

  • Introduced ddlOrderKey: A new type ddlOrderKey was added to precisely identify logical objects for DDL ordering guarantees, distinguishing it from flush barrier keys.
  • Refined Stale DDL Detection: The appendDDL function now uses ddlOrderKey to drop only truly stale DDLs, so out-of-order CREATE SCHEMA and independent CREATE TABLE operations on different objects are no longer discarded.
  • Conditional Watermark Bypass: The Write method was updated to sort DDLs by commit timestamp and to conditionally bypass watermark checks for CREATE SCHEMA and independent CREATE TABLE DDLs, improving DDL execution flow.
  • Enhanced Test Coverage: New regression tests were added to cover various DDL ordering scenarios, including cross-object DDLs, stale DDLs with empty blocked table IDs, and CREATE TABLE ... LIKE ... behavior.

Changelog
  • cmd/kafka-consumer/writer.go
    • Imported the sort package.
    • Defined the ddlOrderKeyKind and ddlOrderKey types for DDL ordering.
    • Replaced ddlWithMaxCommitTs with ddlOrderKeyWithMaxCommitTs in the writer struct.
    • Implemented newDDLOrderKeyForSchema, newDDLOrderKeyForTable, and getDDLOrderKeys functions.
    • Modified appendDDL to use ddlOrderKey for tracking maximum commit timestamps.
    • Updated Write to sort ddlList by commit timestamp and added logic to bypass watermark for CREATE SCHEMA and independent CREATE TABLE DDLs.
  • cmd/kafka-consumer/writer_test.go
    • Added a recordingSink mock for testing DDL execution.
    • Introduced TestWriterWrite_executesIndependentCreateTableWithoutWatermark to verify watermark bypass for independent CREATE TABLE.
    • Added TestWriterWrite_preservesOrderWhenBlockedDDLNotReady to ensure correct ordering with blocking DDLs.
    • Included TestWriterWrite_doesNotBypassWatermarkForCreateTableLike to confirm no watermark bypass for CREATE TABLE ... LIKE ....
    • Added TestWriterWrite_handlesOutOfOrderDDLsByCommitTs to test DDL sorting by commit timestamp.
    • Implemented TestWriterAppendDDL_keepsCrossObjectDDLSpanOnlyDDLs and TestWriterAppendDDL_keepsCrossObjectCreateDDLsWithEmptyBlockedTableIDs for cross-object DDL handling.
    • Added TestWriterAppendDDL_dropsStaleCreateSchemaWithEmptyBlockedTableIDs, TestWriterAppendDDL_dropsStaleDDLForSameDDLSpanOnlyObject, and TestWriterAppendDDL_dropsStaleDDLForSameBlockedTable for stale DDL dropping.
  • cmd/pulsar-consumer/writer.go
    • Imported the sort package.
    • Defined the ddlOrderKeyKind and ddlOrderKey types for DDL ordering.
    • Replaced ddlWithMaxCommitTs with ddlOrderKeyWithMaxCommitTs in the writer struct.
    • Implemented newDDLOrderKeyForSchema, newDDLOrderKeyForTable, and getDDLOrderKeys functions.
    • Modified appendDDL to use ddlOrderKey for tracking maximum commit timestamps.
    • Updated Write to sort ddlList by commit timestamp and added logic to bypass watermark for CREATE SCHEMA and independent CREATE TABLE DDLs.
  • cmd/pulsar-consumer/writer_test.go
    • Added a recordingSink mock for testing DDL execution.
    • Introduced TestWriterWrite_executesIndependentCreateTableWithoutWatermark to verify watermark bypass for independent CREATE TABLE.
    • Added TestWriterWrite_preservesOrderWhenBlockedDDLNotReady to ensure correct ordering with blocking DDLs.
    • Included TestWriterWrite_doesNotBypassWatermarkForCreateTableLike to confirm no watermark bypass for CREATE TABLE ... LIKE ....
    • Added TestWriterWrite_handlesOutOfOrderDDLsByCommitTs to test DDL sorting by commit timestamp.
    • Implemented TestWriterAppendDDL_keepsCrossObjectDDLSpanOnlyDDLs and TestWriterAppendDDL_keepsCrossObjectCreateDDLsWithEmptyBlockedTableIDs for cross-object DDL handling.
    • Added TestWriterAppendDDL_dropsStaleCreateSchemaWithEmptyBlockedTableIDs, TestWriterAppendDDL_dropsStaleDDLForSameDDLSpanOnlyObject, and TestWriterAppendDDL_dropsStaleDDLForSameBlockedTable for stale DDL dropping.
Activity
  • The pull request addresses and closes issue #4454, "MQ consumer can drop valid DDLSpan-only DDLs before sorting".
  • Unit tests were executed to validate the changes.
  • Manual tests were performed using go test ./cmd/kafka-consumer ./cmd/pulsar-consumer -count=1.
  • No performance regression or compatibility issues are anticipated.
  • No updates to user, design, or monitoring documentation are required.
  • A release note was provided, indicating a fix for an MQ consumer bug that could drop out-of-order CREATE SCHEMA or independent CREATE TABLE DDLs.

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request effectively addresses a bug in MQ consumers where out-of-order DDLs could be incorrectly dropped. The introduction of ddlOrderKey to distinguish between flush coordination and stale-drop logic is a solid approach. The changes are well-commented, and the addition of comprehensive regression tests is commendable. My review includes suggestions to address significant code duplication between the Kafka and Pulsar consumers, and a potential issue in the new watermark bypass logic.

Comment on lines +554 to +558
bypassWatermark = blockedTables != nil &&
blockedTables.InfluenceType == commonEvent.InfluenceTypeNormal &&
len(blockedTables.TableIDs) == 1 &&
blockedTables.TableIDs[0] == commonType.DDLSpanTableID &&
len(todoDDL.GetBlockedTableNames()) == 0

high

The logic to bypass the watermark for an independent CREATE TABLE DDL seems to handle only the case where TableIDs contains just DDLSpanTableID. However, as shown in TestWriterAppendDDL_keepsCrossObjectCreateDDLsWithEmptyBlockedTableIDs, an independent CREATE TABLE can also have an empty TableIDs. In that scenario, this logic will not bypass the watermark, which could lead to the same deadlock issue in integration tests that this change aims to solve.

Consider adjusting the condition to also handle an empty TableIDs slice for independent CREATE TABLE DDLs.

Suggested change
-bypassWatermark = blockedTables != nil &&
-	blockedTables.InfluenceType == commonEvent.InfluenceTypeNormal &&
-	len(blockedTables.TableIDs) == 1 &&
-	blockedTables.TableIDs[0] == commonType.DDLSpanTableID &&
-	len(todoDDL.GetBlockedTableNames()) == 0
+bypassWatermark = blockedTables != nil &&
+	blockedTables.InfluenceType == commonEvent.InfluenceTypeNormal &&
+	(len(blockedTables.TableIDs) == 0 ||
+		(len(blockedTables.TableIDs) == 1 && blockedTables.TableIDs[0] == commonType.DDLSpanTableID)) &&
+	len(todoDDL.GetBlockedTableNames()) == 0
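
One way to read the proposed condition is as a small predicate that can be unit-tested on its own. The sketch below uses hypothetical stand-ins: blockedInfo replaces the real blocked-tables struct, the normal field stands for the InfluenceTypeNormal check, and ddlSpanTableID is a placeholder constant whose value is not taken from this thread.

```go
package main

import "fmt"

// ddlSpanTableID is a placeholder for commonType.DDLSpanTableID; the value is assumed.
const ddlSpanTableID int64 = 0

// blockedInfo is a hypothetical, reduced form of the blocked-tables metadata.
type blockedInfo struct {
	normal   bool // stands in for InfluenceType == InfluenceTypeNormal
	tableIDs []int64
}

// canBypassWatermark mirrors the suggested condition: a normal-influence DDL with
// no blocked table names may bypass the watermark when TableIDs is empty or
// contains only the shared DDL span ID.
func canBypassWatermark(b *blockedInfo, blockedTableNames int) bool {
	if b == nil || !b.normal || blockedTableNames != 0 {
		return false
	}
	switch len(b.tableIDs) {
	case 0:
		return true
	case 1:
		return b.tableIDs[0] == ddlSpanTableID
	default:
		return false
	}
}

func main() {
	fmt.Println(canBypassWatermark(&blockedInfo{normal: true}, 0))                                    // empty TableIDs
	fmt.Println(canBypassWatermark(&blockedInfo{normal: true, tableIDs: []int64{ddlSpanTableID}}, 0)) // DDL span only
	fmt.Println(canBypassWatermark(&blockedInfo{normal: true, tableIDs: []int64{42}}, 0))             // real table ID
}
```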

Comment on lines +493 to +497
bypassWatermark = blockedTables != nil &&
blockedTables.InfluenceType == commonEvent.InfluenceTypeNormal &&
len(blockedTables.TableIDs) == 1 &&
blockedTables.TableIDs[0] == commonType.DDLSpanTableID &&
len(todoDDL.GetBlockedTableNames()) == 0

high

The logic to bypass the watermark for an independent CREATE TABLE DDL seems to handle only the case where TableIDs contains just DDLSpanTableID. However, as shown in TestWriterAppendDDL_keepsCrossObjectCreateDDLsWithEmptyBlockedTableIDs, an independent CREATE TABLE can also have an empty TableIDs. In that scenario, this logic will not bypass the watermark, which could lead to the same deadlock issue in integration tests that this change aims to solve.

Consider adjusting the condition to also handle an empty TableIDs slice for independent CREATE TABLE DDLs.

Suggested change
-bypassWatermark = blockedTables != nil &&
-	blockedTables.InfluenceType == commonEvent.InfluenceTypeNormal &&
-	len(blockedTables.TableIDs) == 1 &&
-	blockedTables.TableIDs[0] == commonType.DDLSpanTableID &&
-	len(todoDDL.GetBlockedTableNames()) == 0
+bypassWatermark = blockedTables != nil &&
+	blockedTables.InfluenceType == commonEvent.InfluenceTypeNormal &&
+	(len(blockedTables.TableIDs) == 0 ||
+		(len(blockedTables.TableIDs) == 1 && blockedTables.TableIDs[0] == commonType.DDLSpanTableID)) &&
+	len(todoDDL.GetBlockedTableNames()) == 0

Comment on lines +42 to +61
type ddlOrderKeyKind uint8

const (
	ddlOrderKeyKindGlobal ddlOrderKeyKind = iota
	ddlOrderKeyKindTable
	ddlOrderKeyKindSchema
	ddlOrderKeyKindSchemaTable
)

// ddlOrderKey identifies which logical object a DDL ordering guarantee applies to.
//
// It is intentionally different from the flush barrier set returned by getBlockTableIDs():
// DDLSpanTableID is a shared coordination barrier, not a real object identity. Reusing it as the
// stale-drop key causes unrelated CREATE SCHEMA / independent CREATE TABLE DDLs to suppress each other.
type ddlOrderKey struct {
	kind   ddlOrderKeyKind
	id     int64
	schema string
	table  string
}

medium

This new DDL ordering logic, including ddlOrderKeyKind, ddlOrderKey, and the helper functions newDDLOrderKeyForSchema, newDDLOrderKeyForTable, and getDDLOrderKeys, is also present in cmd/pulsar-consumer/writer.go. To improve maintainability and avoid code duplication, consider extracting this shared logic into a common package, for example, in pkg/cmd/util or a new dedicated package. This would ensure that any future changes to this logic only need to be made in one place.

Comment on lines +28 to +57
// recordingSink is a minimal sink.Sink implementation that records which DDLs are executed.
//
// It lets unit tests validate consumer-side DDL flushing behavior without requiring a real downstream.
type recordingSink struct {
	ddls []string
}

var _ sink.Sink = (*recordingSink)(nil)

func (s *recordingSink) SinkType() common.SinkType { return common.MysqlSinkType }
func (s *recordingSink) IsNormal() bool { return true }
func (s *recordingSink) AddDMLEvent(_ *commonEvent.DMLEvent) {
}

func (s *recordingSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
	if ddl, ok := event.(*commonEvent.DDLEvent); ok {
		s.ddls = append(s.ddls, ddl.Query)
	}
	return nil
}

func (s *recordingSink) AddCheckpointTs(_ uint64) {
}

func (s *recordingSink) SetTableSchemaStore(_ *commonEvent.TableSchemaStore) {
}

func (s *recordingSink) Close(_ bool) {
}
func (s *recordingSink) Run(_ context.Context) error { return nil }

medium

This test file is nearly identical to cmd/pulsar-consumer/writer_test.go. The recordingSink and all the test cases are duplicated. This indicates that the tested DDL handling logic is generic. To reduce code duplication and make tests easier to maintain, consider refactoring these tests into a shared test suite. You could create test helpers in a common package that take a writer interface or a factory function to set up the consumer-specific writer, allowing the same test logic to be executed for both Kafka and Pulsar consumers.
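
As a rough illustration of the factory-based suite this comment describes, the sketch below runs the same cases against any writer produced by a constructor function. Everything here is hypothetical: ddlAppender, runSharedSuite, and fakeWriter are illustrative names, and the suite returns failure messages so it can run as a plain program. A real _test.go version would more likely take a *testing.T and use subtests.

```go
package main

import "fmt"

// ddlAppender is a hypothetical interface both consumer writers could satisfy.
type ddlAppender interface {
	AppendDDL(object string, commitTs uint64) bool
}

// step is one appended DDL and whether the suite expects it to be kept.
type step struct {
	object   string
	commitTs uint64
	wantKept bool
}

// runSharedSuite runs the same cases against a fresh writer from newWriter
// and returns a description of every mismatch.
func runSharedSuite(newWriter func() ddlAppender) []string {
	cases := []struct {
		name  string
		steps []step
	}{
		{"cross-object DDLs are both kept", []step{{"schema:b", 20, true}, {"schema:a", 10, true}}},
		{"same-object older DDL is dropped", []step{{"schema:a", 20, true}, {"schema:a", 10, false}}},
	}
	var failures []string
	for _, c := range cases {
		w := newWriter()
		for i, s := range c.steps {
			if got := w.AppendDDL(s.object, s.commitTs); got != s.wantKept {
				failures = append(failures, fmt.Sprintf("%s: step %d: kept=%v, want %v", c.name, i, got, s.wantKept))
			}
		}
	}
	return failures
}

// fakeWriter is a toy writer standing in for a Kafka or Pulsar consumer writer.
type fakeWriter struct{ maxCommitTs map[string]uint64 }

func (w *fakeWriter) AppendDDL(object string, commitTs uint64) bool {
	if seen, ok := w.maxCommitTs[object]; ok && commitTs < seen {
		return false
	}
	w.maxCommitTs[object] = commitTs
	return true
}

func main() {
	failures := runSharedSuite(func() ddlAppender {
		return &fakeWriter{maxCommitTs: map[string]uint64{}}
	})
	fmt.Println(len(failures))
}
```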

…r-ddl-order-key

# Conflicts:
#	cmd/kafka-consumer/writer.go
#	cmd/kafka-consumer/writer_test.go
#	cmd/pulsar-consumer/writer.go
#	cmd/pulsar-consumer/writer_test.go
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@cmd/kafka-consumer/writer.go`:
- Around line 315-330: The current early-return for
commonEvent.InfluenceTypeNormal collects blockedTables.TableIDs and returns them
before handling timodel.ActionCreateTable, causing CREATE TABLE ... LIKE src to
be keyed by src; change the logic so ActionCreateTable is handled before the
early return (or special-case it) by invoking the switch
timodel.ActionType(ddl.Type) (and adding newDDLOrderKeyForTable(ddl)) prior to
returning keys, or ensure when ddl is ActionCreateTable you add the
created-table key (newDDLOrderKeyForTable) and avoid returning only
blockedTables.TableIDs; update the code paths around
commonEvent.InfluenceTypeNormal, the blockedTables.TableIDs loop, the early
return, and the switch handling (timodel.ActionCreateTable /
newDDLOrderKeyForTable) accordingly so stale-drop uses the created table
identity.

In `@cmd/pulsar-consumer/writer.go`:
- Around line 306-324: The current commonEvent.InfluenceTypeNormal block returns
blockedTables.TableIDs as soon as any real table ID is present, which causes
ActionCreateTable to be keyed by the referenced/source table instead of the
created table; modify the code in that case block (around
blockedTables.TableIDs, ddlOrderKey, ddlOrderKeyKindTable,
commonType.DDLSpanTableID, timodel.ActionType(ddl.Type),
timodel.ActionCreateTable, newDDLOrderKeyForTable, newDDLOrderKeyForSchema,
addGlobalKey) so that ActionCreateTable always uses the created-table key:
specifically, do not short-circuit and return keys for the general blocked table
case when timodel.ActionCreateTable applies—instead either move the len(keys) !=
0 check after the switch or special-case ActionCreateTable to replace/override
keys with newDDLOrderKeyForTable(ddl) (and return that), leaving other action
types to use blocked tables or the schema/global keys as before.

📥 Commits

Reviewing files that changed from the base of the PR and between 8a61ab3 and 585691a.

📒 Files selected for processing (4)
  • cmd/kafka-consumer/writer.go
  • cmd/kafka-consumer/writer_test.go
  • cmd/pulsar-consumer/writer.go
  • cmd/pulsar-consumer/writer_test.go

Comment on lines +315 to +330
case commonEvent.InfluenceTypeNormal:
	for _, tableID := range blockedTables.TableIDs {
		if tableID == commonType.DDLSpanTableID {
			continue
		}
		keys[ddlOrderKey{kind: ddlOrderKeyKindTable, id: tableID}] = struct{}{}
	}
	if len(keys) != 0 {
		return keys
	}

	switch timodel.ActionType(ddl.Type) {
	case timodel.ActionCreateSchema:
		keys[newDDLOrderKeyForSchema(ddl)] = struct{}{}
	case timodel.ActionCreateTable:
		keys[newDDLOrderKeyForTable(ddl)] = struct{}{}

⚠️ Potential issue | 🟠 Major

Don't use referenced source-table IDs as the stale key for ActionCreateTable.

Lines 315-323 return the blocked table IDs for every normal DDL, so CREATE TABLE ... LIKE src is still keyed by src instead of by the created table. If CREATE TABLE dst1 LIKE src and CREATE TABLE dst2 LIKE src arrive out of order, the older one will be dropped as "stale" even though the two DDLs target different objects. The source-table dependency is already handled by watermark gating; stale-drop should use the created table identity here.

💡 Suggested fix
 case commonEvent.InfluenceTypeNormal:
+	if timodel.ActionType(ddl.Type) == timodel.ActionCreateTable {
+		keys[newDDLOrderKeyForTable(ddl)] = struct{}{}
+		return keys
+	}
 	for _, tableID := range blockedTables.TableIDs {
 		if tableID == commonType.DDLSpanTableID {
 			continue
 		}
 		keys[ddlOrderKey{kind: ddlOrderKeyKindTable, id: tableID}] = struct{}{}
 	}
 	if len(keys) != 0 {
 		return keys
 	}

 	switch timodel.ActionType(ddl.Type) {
 	case timodel.ActionCreateSchema:
 		keys[newDDLOrderKeyForSchema(ddl)] = struct{}{}
-	case timodel.ActionCreateTable:
-		keys[newDDLOrderKeyForTable(ddl)] = struct{}{}
 	default:
 		addGlobalKey()
 	}
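
The out-of-order scenario described in this comment can be reproduced with a reduced model. The sketch below is illustrative only: createLike and keep are hypothetical names, and the stale rule is assumed to drop a DDL whose commitTs is strictly smaller than the largest one already seen for the same key.

```go
package main

import "fmt"

// createLike is a hypothetical, reduced CREATE TABLE ... LIKE event.
type createLike struct {
	created  string // table being created
	source   string // table referenced by LIKE
	commitTs uint64
}

// keep applies the stale-drop rule, using keyOf to choose each DDL's identity,
// and returns the created-table names that survive, in arrival order.
func keep(ddls []createLike, keyOf func(createLike) string) []string {
	maxCommitTs := map[string]uint64{}
	var kept []string
	for _, d := range ddls {
		k := keyOf(d)
		if seen, ok := maxCommitTs[k]; ok && d.commitTs < seen {
			continue // treated as stale for this key
		}
		maxCommitTs[k] = d.commitTs
		kept = append(kept, d.created)
	}
	return kept
}

func main() {
	// dst2 arrives before dst1 even though dst1 has the smaller commitTs.
	arrivals := []createLike{
		{created: "dst2", source: "src", commitTs: 20},
		{created: "dst1", source: "src", commitTs: 10},
	}
	fmt.Println(keep(arrivals, func(d createLike) string { return d.source }))  // [dst2]
	fmt.Println(keep(arrivals, func(d createLike) string { return d.created })) // [dst2 dst1]
}
```

Keying by the source table loses dst1, while keying by the created table keeps both DDLs so they can be sorted by commitTs later.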

Comment on lines +306 to +324
case commonEvent.InfluenceTypeNormal:
	for _, tableID := range blockedTables.TableIDs {
		if tableID == commonType.DDLSpanTableID {
			continue
		}
		keys[ddlOrderKey{kind: ddlOrderKeyKindTable, id: tableID}] = struct{}{}
	}
	if len(keys) != 0 {
		return keys
	}

	switch timodel.ActionType(ddl.Type) {
	case timodel.ActionCreateSchema:
		keys[newDDLOrderKeyForSchema(ddl)] = struct{}{}
	case timodel.ActionCreateTable:
		keys[newDDLOrderKeyForTable(ddl)] = struct{}{}
	default:
		addGlobalKey()
	}

⚠️ Potential issue | 🟠 Major

Key ActionCreateTable by the created table, not the referenced blocked table.

Lines 306-315 return BlockedTables.TableIDs as soon as any real table ID is present, so CREATE TABLE ... LIKE src is still keyed by the source table. That means two valid DDLs like CREATE TABLE dst1 LIKE src and CREATE TABLE dst2 LIKE src can suppress each other if they arrive out of order, even though they target different objects. Watermark gating already protects the source-table dependency; the stale-drop key for ActionCreateTable should be the created table only.

💡 Suggested fix
 case commonEvent.InfluenceTypeNormal:
+	if timodel.ActionType(ddl.Type) == timodel.ActionCreateTable {
+		keys[newDDLOrderKeyForTable(ddl)] = struct{}{}
+		return keys
+	}
 	for _, tableID := range blockedTables.TableIDs {
 		if tableID == commonType.DDLSpanTableID {
 			continue
 		}
 		keys[ddlOrderKey{kind: ddlOrderKeyKindTable, id: tableID}] = struct{}{}
 	}
 	if len(keys) != 0 {
 		return keys
 	}

 	switch timodel.ActionType(ddl.Type) {
 	case timodel.ActionCreateSchema:
 		keys[newDDLOrderKeyForSchema(ddl)] = struct{}{}
-	case timodel.ActionCreateTable:
-		keys[newDDLOrderKeyForTable(ddl)] = struct{}{}
 	default:
 		addGlobalKey()
 	}

@wlwilliamx
Collaborator Author

/test all

@ti-chi-bot

ti-chi-bot bot commented Mar 13, 2026

@wlwilliamx: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

| Test name | Commit | Details | Required | Rerun command |
| --- | --- | --- | --- | --- |
| pull-error-log-review | 585691a | link | true | /test pull-error-log-review |
| pull-cdc-pulsar-integration-heavy | 585691a | link | false | /test pull-cdc-pulsar-integration-heavy |
| pull-cdc-kafka-integration-light | 585691a | link | true | /test pull-cdc-kafka-integration-light |
| pull-cdc-storage-integration-heavy | 585691a | link | true | /test pull-cdc-storage-integration-heavy |
