
cmd/storage-consumer: gate apply by checkpoint #4431

Open
3AceShowHand wants to merge 1 commit into pingcap:master from
3AceShowHand:codex/fix-storage-consumer-checkpoint

Conversation

@3AceShowHand
Collaborator

@3AceShowHand 3AceShowHand commented Mar 11, 2026

What problem does this PR solve?

Issue Number: close #4430

What is changed and how it works?

cmd/storage-consumer used to apply whatever a scan round happened to discover.
That is unsafe for cloud storage because storage visibility is only stable up to the
producer's metadata.checkpoint-ts.

In the failing ddl_with_random_move_table case, the consumer applied:

  1. RENAME TABLE test.table_8 TO test.table_108
  2. RENAME TABLE test.table_108 TO test.table_8

before a later scan round discovered an older DML file that still belonged to
table_108. That reordered apply across scan rounds and finally failed with
Table 'test.table_108' doesn't exist.

This PR fixes that by changing the consumer side only:

  • read storage metadata every round and treat checkpoint-ts as the stable apply watermark
  • defer schema/DML keys whose tableVersion is greater than the stable checkpoint
  • track discovered files separately from handled files, so deferred keys stay pending across later rounds
  • mark keys handled only after they are successfully applied or intentionally ignored as stale replay
  • move storage-consumer initialization out of init() so the package can be tested normally
  • add an intest regression test that reproduces the rename -> DML -> rename-back visibility gap and verifies the final apply order

With this change, the consumer waits until all files in the rename chain are
stably visible and then applies them in the correct order.
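The gating rule described above can be sketched in Go. This is a hypothetical illustration, not the PR's actual code: the fileKey type and gateByCheckpoint function are invented names standing in for the real implementation.

```go
package main

// fileKey is an illustrative stand-in for a discovered schema/DML key.
type fileKey struct {
	Schema, Table string
	TableVersion  uint64
}

// gateByCheckpoint splits discovered keys into those safe to apply now
// (tableVersion <= stableCheckpointTs) and those deferred to a later round,
// since storage visibility is only guaranteed up to the producer's
// metadata.checkpoint-ts.
func gateByCheckpoint(keys []fileKey, stableCheckpointTs uint64) (apply, deferred []fileKey) {
	for _, k := range keys {
		if k.TableVersion > stableCheckpointTs {
			// Sibling files for this table version may not be visible yet.
			deferred = append(deferred, k)
			continue
		}
		apply = append(apply, k)
	}
	return apply, deferred
}
```

Deferred keys are not marked handled, so a later round re-evaluates them once the checkpoint advances past their table version.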

Check List

Tests

  • Unit test

Questions

Will it cause performance regression or break compatibility?

No protocol or producer behavior is changed. The consumer may defer applying a
newly discovered file until metadata.checkpoint-ts reaches that table version,
which is the intended correctness fence.

Do you need to update user documentation, design documentation or monitoring documentation?

No.

Release note

Fix storage consumer reordering DDL and DML across scan rounds by gating apply on the storage checkpoint metadata.

Summary by CodeRabbit

Release Notes

  • New Features
    • Introduced checkpoint-based processing for data operations, ensuring proper sequencing when handling complex database changes.
    • Enhanced observability with improved logging for checkpoint states and operation deferral conditions.

@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-triage-completed release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Mar 11, 2026
@ti-chi-bot

ti-chi-bot bot commented Mar 11, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign asddongmen for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. label Mar 11, 2026
@coderabbitai
Contributor

coderabbitai bot commented Mar 11, 2026

📝 Walkthrough

Walkthrough

This PR fixes a reordering issue in the storage consumer where rename DDLs were applied before all intermediate DML became visible. It adds checkpoint-based deferral logic that reads stable checkpoint timestamps from storage metadata, tracks handled DML indexes separately, and defers operations exceeding the checkpoint.

Changes

  • Consumer checkpoint logic (cmd/storage-consumer/consumer.go): adds handledDMLIdxMap to track handled DML paths; introduces getStableCheckpointTs() to read the checkpoint from storage; adds cloneFileIndexKeyMap() and markDMLPathHandled() helpers; integrates checkpoint validation in handleNewFiles() to defer processing when table versions exceed the stable checkpoint; propagates the checkpoint through the main processing loop in handle().
  • Consumer integration test (cmd/storage-consumer/consumer_test.go): introduces recordingSink to record operation tokens and a newTestConsumer() test harness with metadata/schema/DML file writers; adds TestHandleNewFilesWaitsForStableCheckpointBeforeApplyingRenameChain to validate correct operation ordering across a two-step DDL rename chain with intermediate DML, verifying the consumer defers operations until the stable checkpoint permits.
  • Main error handling (cmd/storage-consumer/main.go): refactors initialization into a new initialize() function that returns errors; propagates URI parsing and storage scheme validation errors instead of exiting directly; consolidates error handling to a single exit point in main().
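A minimal sketch of the discovered-vs-handled split summarized for consumer.go. The map names come from the summary above, but the field types and method bodies are assumptions for illustration only:

```go
package main

// consumer tracks files discovered by scan rounds separately from files that
// have been applied (or intentionally ignored), so deferred keys stay pending
// across rounds.
type consumer struct {
	tableDMLIdxMap   map[string]map[string]struct{} // table -> discovered file paths
	handledDMLIdxMap map[string]map[string]struct{} // table -> applied/ignored file paths
}

// pendingPaths diffs discovered against handled: anything seen but not yet
// handled remains a candidate in the next round.
func (c *consumer) pendingPaths(table string) []string {
	var pending []string
	for path := range c.tableDMLIdxMap[table] {
		if _, ok := c.handledDMLIdxMap[table][path]; !ok {
			pending = append(pending, path)
		}
	}
	return pending
}

// markDMLPathHandled records a path only after a successful apply or an
// intentional stale-replay skip.
func (c *consumer) markDMLPathHandled(table, path string) {
	if c.handledDMLIdxMap[table] == nil {
		c.handledDMLIdxMap[table] = make(map[string]struct{})
	}
	c.handledDMLIdxMap[table][path] = struct{}{}
}
```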

Sequence Diagram

sequenceDiagram
    participant Consumer
    participant Storage
    participant Metadata
    participant FileIndex
    participant DDL/DML Handler
    participant HandledDMLMap

    Consumer->>Storage: Scan files (handle round)
    Storage-->>Consumer: File list with table versions
    Consumer->>Metadata: getStableCheckpointTs()
    Metadata-->>Consumer: Stable checkpoint timestamp
    Consumer->>FileIndex: getNewFiles(handledDMLIdxMap)
    FileIndex-->>Consumer: Candidates with table versions
    
    loop For each candidate file
        Consumer->>Consumer: Check tableVersion vs stableCheckpointTs
        alt tableVersion > stableCheckpointTs
            Consumer->>Consumer: Defer (skip processing)
        else tableVersion ≤ stableCheckpointTs
            Consumer->>DDL/DML Handler: Process file
            DDL/DML Handler->>Consumer: Operation completed
            Consumer->>HandledDMLMap: markDMLPathHandled()
            HandledDMLMap-->>Consumer: State updated
        end
    end
    
    Consumer->>Consumer: Next round

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

  • #4328: Adds per-table version state tracking with checkpoint watermarks to consumer for conditional DDL/DML skipping, using similar patterns to defer operations based on stored version state.

Suggested labels

lgtm, approved, size/L

Suggested reviewers

  • wlwilliamx
  • hongyunyan

Poem

🐰 Files out of order? No more despair!
Checkpoints guide us with checkpoint-aware care.
Renames deferred 'til all DML's here,
Stable ordering's restored, the path is clear! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 0.00%, which is insufficient; the required threshold is 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

  • Title check ✅ Passed: The title 'cmd/storage-consumer: gate apply by checkpoint' accurately describes the main change: gating file application by the storage checkpoint, which is the core fix for the reordering bug.
  • Description check ✅ Passed: The PR description follows the template structure with issue number, detailed explanation of the problem and solution, test coverage declaration, release notes, and answers to compatibility questions.
  • Linked Issues check ✅ Passed: All objectives from issue #4430 are met: reading the storage metadata checkpoint-ts each round, deferring keys above the checkpoint, tracking seen vs handled files separately, and adding regression tests to verify the correct apply order.
  • Out of Scope Changes check ✅ Passed: All changes are within scope: consumer.go implements the checkpoint-based deferral logic, consumer_test.go adds the regression test, and main.go refactors initialization for testability, all directly supporting the fix requirements.


@3AceShowHand
Collaborator Author

/test all

@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical data consistency issue in the cloud storage consumer where DDL and DML operations could be reordered across scan rounds, leading to errors. By implementing a checkpoint-based gating mechanism, the consumer now defers applying operations until the storage's checkpoint-ts confirms their stability, thereby guaranteeing the correct sequence of events and preventing data integrity problems, especially during complex schema changes like table renames.

Highlights

  • Checkpoint-gated Apply: The storage consumer now reads the storage metadata every round and uses checkpoint-ts as a stable watermark, ensuring that DDL and DML operations are applied only when their tableVersion is less than or equal to this checkpoint.
  • Deferred Processing: Schema and DML keys with a tableVersion greater than the stable checkpoint are now deferred, preventing premature application of data that might not yet be fully visible or stable in cloud storage.
  • Improved File Tracking: Introduced handledDMLIdxMap to track files that have been successfully applied or intentionally ignored, ensuring that deferred keys remain pending across scan rounds until they can be safely processed.
  • Enhanced Testability: The storage-consumer initialization logic has been moved out of the init() function into a separate initialize() function, allowing the package to be tested more effectively.
  • Regression Test: A new integration test (intest) was added to specifically reproduce and verify the fix for the rename -> DML -> rename-back visibility gap, ensuring correct apply order in complex DDL scenarios.


Changelog
  • cmd/storage-consumer/consumer.go
    • Added os package import.
    • Introduced handledDMLIdxMap to track processed DML file indexes.
    • Initialized handledDMLIdxMap in the newConsumer function.
    • Removed the origDMLIdxMap cloning logic from getNewFiles.
    • Modified diffDMLMaps to compare tableDMLIdxMap with handledDMLIdxMap.
    • Added getStableCheckpointTs function to read the checkpoint-ts from the metadata file.
    • Implemented cloneFileIndexKeyMap for deep copying file index maps.
    • Added markDMLPathHandled function to record DML paths as processed.
    • Updated handleNewFiles to accept stableCheckpointTs as a parameter.
    • Introduced logic in handleNewFiles to defer processing of DML/DDL keys if their tableVersion exceeds stableCheckpointTs.
    • Called markDMLPathHandled when DDLs are ignored as stale replays, successfully executed, or DMLs are flushed.
    • Modified the handle function to fetch stableCheckpointTs and pass it to handleNewFiles.
  • cmd/storage-consumer/consumer_test.go
    • Added a new integration test file for the storage consumer.
    • Implemented recordingSink to capture applied DDL and DML operations for verification.
    • Created newTestConsumer helper function for setting up a test consumer instance.
    • Added helper functions writeMetadataFile, writeSchemaFile, and writeDMLFile for test data preparation.
    • Included TestHandleNewFilesWaitsForStableCheckpointBeforeApplyingRenameChain to validate the checkpoint-based deferral logic during table renames and DML operations.
  • cmd/storage-consumer/main.go
    • Added errors package import.
    • Moved the initialization logic from the init() function to a new initialize() function.
    • Updated the main() function to call flag.Parse() and then initialize(), handling potential errors during setup.


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
cmd/storage-consumer/main.go (1)

62-81: Wrap external errors with errors.Trace for stack traces.

Per coding guidelines, errors from library calls (like logger.InitLogger and url.Parse) should be wrapped with errors.Trace(err) to attach stack traces. The current import uses standard errors package, but the project uses github.com/pingcap/errors for tracing.

Proposed fix
 import (
 	"context"
-	"errors"
 	"flag"
 	"net/http"
 	"net/url"
 	"os"
 	"os/signal"
 	"strings"
 	"syscall"
 	"time"

+	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 func initialize() error {
 	err := logger.InitLogger(&logger.Config{
 		Level: logLevel,
 		File:  logFile,
 	})
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}

 	uri, err := url.Parse(upstreamURIStr)
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	upstreamURI = uri
 	scheme := strings.ToLower(upstreamURI.Scheme)
 	if !config.IsStorageScheme(scheme) {
 		return errors.New("invalid storage scheme, the scheme of upstream-uri must be file/s3/azblob/gcs")
 	}
 	return nil
 }

As per coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with errors.Trace(err) or errors.WrapError(...) to attach a stack trace."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/storage-consumer/main.go` around lines 62 - 81, The initialize function
returns raw library errors; change imports to use github.com/pingcap/errors and
wrap the errors returned from library calls (logger.InitLogger and url.Parse)
with errors.Trace(err) before returning (e.g., return errors.Trace(err)); leave
the invalid-scheme error creation but ensure it uses the same errors package
(errors.New) so you don't mix std errors with pingcap/errors. Ensure you update
imports and replace the two direct "return err" lines in initialize with traced
returns.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 000031d2-bb6b-4246-bf16-30842eac6413

📥 Commits

Reviewing files that changed from the base of the PR and between 85cc827 and b831039.

📒 Files selected for processing (3)
  • cmd/storage-consumer/consumer.go
  • cmd/storage-consumer/consumer_test.go
  • cmd/storage-consumer/main.go

//go:build intest
// +build intest

// Copyright 2026 PingCAP, Inc.
Contributor


⚠️ Potential issue | 🟡 Minor

Fix copyright year.

The pipeline failed because the copyright year is 2026 but should be 2025 to match the project standard.

Proposed fix
-// Copyright 2026 PingCAP, Inc.
+// Copyright 2025 PingCAP, Inc.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/storage-consumer/consumer_test.go` at line 4, Update the file's
top-of-file copyright header to use the correct year: replace "2026" with "2025"
in the copyright comment line at the top of consumer_test.go (the header comment
that currently reads "// Copyright 2026 PingCAP, Inc.").

Comment on lines +238 to +254
func (c *consumer) getStableCheckpointTs(ctx context.Context) (uint64, error) {
	data, err := c.externalStorage.ReadFile(ctx, "metadata")
	if err != nil {
		if os.IsNotExist(errors.Cause(err)) {
			return 0, nil
		}
		return 0, errors.Trace(err)
	}

	var metadata struct {
		CheckpointTs uint64 `json:"checkpoint-ts"`
	}
	if err := json.Unmarshal(data, &metadata); err != nil {
		return 0, errors.Trace(err)
	}
	return metadata.CheckpointTs, nil
}
Contributor


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how other parts of the codebase handle file-not-found from external storage
rg -n "IsNotExist|FileNotExist|ErrNotExist" --type go -C3 | head -80

Repository: pingcap/ticdc

Length of output: 4543


🏁 Script executed:

#!/bin/bash
# Check the ExternalStorage interface and ReadFile implementations
ast-grep --pattern $'func ($_) ReadFile($_, $_) ($_, error) {
  $$$
}'

Repository: pingcap/ticdc

Length of output: 684


🏁 Script executed:

cat -n pkg/util/external_storage.go | sed -n '360,400p'

Repository: pingcap/ticdc

Length of output: 1404


🏁 Script executed:

# Find if there are cloud storage specific error checks in the codebase
rg -n "ErrNotFound|NoSuchKey|NotFound" --type go | head -30

Repository: pingcap/ticdc

Length of output: 3595


Use util.IsNotExistInExtStorage() to handle cloud storage backends correctly.

The os.IsNotExist(errors.Cause(err)) check only detects local filesystem "file not found" errors. Cloud storage backends (S3, GCS, Azure) return their own error types that os.IsNotExist does not catch. This causes the consumer to fail with an error instead of gracefully handling a missing metadata file on initial startup.

Change line 241 to use util.IsNotExistInExtStorage(err), which properly handles AWS, Google Cloud, and Azure error types. This pattern is already used in pkg/redo/reader/reader.go:278 and downstreamadapter/sink/redo/meta.go:206.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@cmd/storage-consumer/consumer.go` around lines 238 - 254, In
getStableCheckpointTs, replace the os.IsNotExist(errors.Cause(err)) check with
util.IsNotExistInExtStorage(err) so missing metadata on cloud backends
(S3/GCS/Azure) is handled correctly; update the error handling branch in the
consumer.getStableCheckpointTs function and ensure the util package is imported
where this function is defined.
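The failure mode this review describes can be demonstrated with a self-contained snippet: os.IsNotExist recognizes the local filesystem's fs.ErrNotExist, but not a cloud SDK's own not-found error type. The s3NoSuchKey type below is a stand-in for illustration, not the real AWS SDK error.

```go
package main

import (
	"io/fs"
	"os"
)

// s3NoSuchKey mimics the shape of a cloud object store's "missing object"
// error: a distinct type that neither is nor wraps fs.ErrNotExist.
type s3NoSuchKey struct{}

func (s3NoSuchKey) Error() string { return "NoSuchKey: the specified key does not exist" }

// treatedAsMissingFile applies the same check the original
// getStableCheckpointTs uses on the unwrapped cause.
func treatedAsMissingFile(err error) bool {
	return os.IsNotExist(err)
}

// The asymmetry: the local error is detected, the cloud-style one is not.
var (
	localDetected = treatedAsMissingFile(fs.ErrNotExist) // true
	cloudDetected = treatedAsMissingFile(s3NoSuchKey{})  // false
)
```

This is why a storage-aware helper such as util.IsNotExistInExtStorage (per the review above) is needed when the metadata file may live on S3, GCS, or Azure.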


@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a crucial correctness fix for the storage consumer by gating the application of DDL/DML files on a stable checkpoint timestamp read from storage metadata. This prevents applying changes that are not yet stably visible, fixing a potential data inconsistency issue with out-of-order processing. The implementation correctly defers processing of files with a tableVersion greater than the checkpoint and introduces a mechanism to track handled files to avoid reprocessing. The change also includes a valuable refactoring of the main package initialization to improve testability, and adds a comprehensive integration test that validates the new logic. The changes are well-implemented. I have one suggestion regarding logging verbosity.

Comment on lines +590 to +598
log.Info("storage consumer defer file key until checkpoint catches up",
zap.Uint64("round", round),
zap.Int("order", order),
zap.String("schema", key.Schema),
zap.String("table", key.Table),
zap.Uint64("tableVersion", key.TableVersion),
zap.Uint64("stableCheckpointTs", stableCheckpointTs),
zap.Int64("partition", key.PartitionNum),
zap.String("date", key.Date))


medium

This log.Info statement will be executed for every deferred file in every round. If a large number of files are deferred for an extended period (e.g., if the checkpoint is stuck), this could lead to excessive log volume. Consider changing this to log.Debug or logging a summary of deferred files per round to reduce log noise in normal operation.
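One way to follow this suggestion is to count deferrals during the round and emit a single summary afterwards, leaving per-file detail to Debug level. The sketch below builds the summary as a string so it stays logger-agnostic; all names are illustrative, not the consumer's real identifiers.

```go
package main

import "fmt"

// fileKey carries the table version checked against the checkpoint; an
// illustrative type, not the consumer's real one.
type fileKey struct {
	Path         string
	TableVersion uint64
}

// summarizeDeferred counts keys still above the stable checkpoint and builds
// one per-round summary line (empty when nothing was deferred), replacing
// the per-file Info logging flagged above.
func summarizeDeferred(round uint64, keys []fileKey, stableCheckpointTs uint64) string {
	deferred := 0
	for _, k := range keys {
		if k.TableVersion > stableCheckpointTs {
			deferred++ // individual paths could be logged at Debug level here
		}
	}
	if deferred == 0 {
		return ""
	}
	return fmt.Sprintf("round=%d deferred=%d stableCheckpointTs=%d",
		round, deferred, stableCheckpointTs)
}
```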

@ti-chi-bot

ti-chi-bot bot commented Mar 11, 2026

@3AceShowHand: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

  • pull-error-log-review (commit b831039, required): rerun with /test pull-error-log-review
  • pull-cdc-storage-integration-light (commit b831039, required): rerun with /test pull-cdc-storage-integration-light

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.


Labels

do-not-merge/needs-triage-completed release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

storage-consumer can reorder rename DDL and DML across scan rounds

1 participant