cmd/storage-consumer: gate apply by checkpoint#4431
3AceShowHand wants to merge 1 commit into pingcap:master
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. Needs approval from an approver in each of these files.
📝 Walkthrough

This PR fixes a reordering issue in the storage consumer where rename DDLs were applied before all intermediate DML became visible. It adds checkpoint-based deferral logic that reads the stable checkpoint timestamp from storage metadata, tracks handled DML indexes separately, and defers operations that exceed the checkpoint.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Consumer
    participant Storage
    participant Metadata
    participant FileIndex
    participant Handler as DDL/DML Handler
    participant HandledDMLMap
    Consumer->>Storage: Scan files (handle round)
    Storage-->>Consumer: File list with table versions
    Consumer->>Metadata: getStableCheckpointTs()
    Metadata-->>Consumer: Stable checkpoint timestamp
    Consumer->>FileIndex: getNewFiles(handledDMLIdxMap)
    FileIndex-->>Consumer: Candidates with table versions
    loop For each candidate file
        Consumer->>Consumer: Check tableVersion vs stableCheckpointTs
        alt tableVersion > stableCheckpointTs
            Consumer->>Consumer: Defer (skip processing)
        else tableVersion <= stableCheckpointTs
            Consumer->>Handler: Process file
            Handler->>Consumer: Operation completed
            Consumer->>HandledDMLMap: markDMLPathHandled()
            HandledDMLMap-->>Consumer: State updated
        end
    end
    Consumer->>Consumer: Next round
```
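The gate in the loop above is just a comparison of each candidate's table version against the stable checkpoint. A minimal Go sketch of that partition step, assuming a simplified `fileKey` shape and made-up paths (the consumer's real per-file struct carries more fields, such as partition and date):

```go
package main

import "fmt"

// fileKey is a simplified, illustrative stand-in for the consumer's
// per-file metadata; only the fields needed for the gating decision.
type fileKey struct {
	Path         string // made-up path for illustration
	TableVersion uint64
}

// splitByCheckpoint partitions candidate files: those whose tableVersion is
// at or below the stable checkpoint are safe to apply this round; the rest
// are deferred until a later round when the checkpoint has advanced.
func splitByCheckpoint(candidates []fileKey, stableCheckpointTs uint64) (apply, deferred []fileKey) {
	for _, key := range candidates {
		if key.TableVersion > stableCheckpointTs {
			deferred = append(deferred, key)
			continue
		}
		apply = append(apply, key)
	}
	return apply, deferred
}

func main() {
	candidates := []fileKey{
		{Path: "test/table_8/data", TableVersion: 100},
		{Path: "test/table_108/data", TableVersion: 205},
	}
	apply, deferred := splitByCheckpoint(candidates, 200)
	fmt.Println(len(apply), len(deferred)) // 1 1
}
```

Deferred files are re-discovered on the next scan round once `checkpoint-ts` catches up, which is why the consumer also tracks handled DML indexes so already-applied files are not reprocessed.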
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
/test all
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request addresses a critical data consistency issue in the cloud storage consumer, where DDL and DML operations could be reordered across scan rounds, leading to errors. By implementing a checkpoint-based gating mechanism, the consumer now defers applying operations until the storage's stable checkpoint confirms they are visible.

Highlights
Actionable comments posted: 2
🧹 Nitpick comments (1)
cmd/storage-consumer/main.go (1)
62-81: Wrap external errors with `errors.Trace` for stack traces.

Per coding guidelines, errors from library calls (like `logger.InitLogger` and `url.Parse`) should be wrapped with `errors.Trace(err)` to attach stack traces. The current import uses the standard `errors` package, but the project uses `github.com/pingcap/errors` for tracing.

Proposed fix

```diff
 import (
 	"context"
-	"errors"
 	"flag"
 	"net/http"
 	"net/url"
 	"os"
 	"os/signal"
 	"strings"
 	"syscall"
 	"time"

+	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
```

```diff
 func initialize() error {
 	err := logger.InitLogger(&logger.Config{
 		Level: logLevel,
 		File:  logFile,
 	})
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	uri, err := url.Parse(upstreamURIStr)
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	upstreamURI = uri
 	scheme := strings.ToLower(upstreamURI.Scheme)
 	if !config.IsStorageScheme(scheme) {
 		return errors.New("invalid storage scheme, the scheme of upstream-uri must be file/s3/azblob/gcs")
 	}
 	return nil
 }
```

As per coding guidelines: "When an error comes from a third-party or library call in Go, wrap it immediately with `errors.Trace(err)` or `errors.WrapError(...)` to attach a stack trace."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/storage-consumer/main.go` around lines 62 - 81, The initialize function returns raw library errors; change imports to use github.com/pingcap/errors and wrap the errors returned from library calls (logger.InitLogger and url.Parse) with errors.Trace(err) before returning (e.g., return errors.Trace(err)); leave the invalid-scheme error creation but ensure it uses the same errors package (errors.New) so you don't mix std errors with pingcap/errors. Ensure you update imports and replace the two direct "return err" lines in initialize with traced returns.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 000031d2-bb6b-4246-bf16-30842eac6413
📒 Files selected for processing (3)
- cmd/storage-consumer/consumer.go
- cmd/storage-consumer/consumer_test.go
- cmd/storage-consumer/main.go
```go
//go:build intest
// +build intest

// Copyright 2026 PingCAP, Inc.
```
Fix copyright year.
The pipeline failed because the copyright year is 2026 but should be 2025 to match the project standard.
Proposed fix
```diff
-// Copyright 2026 PingCAP, Inc.
+// Copyright 2025 PingCAP, Inc.
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/storage-consumer/consumer_test.go` at line 4, Update the file's
top-of-file copyright header to use the correct year: replace "2026" with "2025"
in the copyright comment line at the top of consumer_test.go (the header comment
that currently reads "// Copyright 2026 PingCAP, Inc.").
```go
func (c *consumer) getStableCheckpointTs(ctx context.Context) (uint64, error) {
	data, err := c.externalStorage.ReadFile(ctx, "metadata")
	if err != nil {
		if os.IsNotExist(errors.Cause(err)) {
			return 0, nil
		}
		return 0, errors.Trace(err)
	}

	var metadata struct {
		CheckpointTs uint64 `json:"checkpoint-ts"`
	}
	if err := json.Unmarshal(data, &metadata); err != nil {
		return 0, errors.Trace(err)
	}
	return metadata.CheckpointTs, nil
}
```
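The metadata decode in the quoted `getStableCheckpointTs` reduces to one JSON field. A standalone sketch of just that step (the helper name `parseCheckpointTs` is made up for illustration; the PR inlines this logic in the function above):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parseCheckpointTs decodes the producer's metadata file: a small JSON
// object whose "checkpoint-ts" field is the stable apply watermark.
// Unknown fields in the metadata are ignored by encoding/json.
func parseCheckpointTs(data []byte) (uint64, error) {
	var metadata struct {
		CheckpointTs uint64 `json:"checkpoint-ts"`
	}
	if err := json.Unmarshal(data, &metadata); err != nil {
		return 0, err
	}
	return metadata.CheckpointTs, nil
}

func main() {
	ts, err := parseCheckpointTs([]byte(`{"checkpoint-ts": 443123456}`))
	fmt.Println(ts, err) // 443123456 <nil>
}
```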
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Check how other parts of the codebase handle file-not-found from external storage
rg -n "IsNotExist|FileNotExist|ErrNotExist" --type go -C3 | head -80
```

Repository: pingcap/ticdc
Length of output: 4543

🏁 Script executed:

```bash
#!/bin/bash
# Check the ExternalStorage interface and ReadFile implementations
ast-grep --pattern $'func ($_) ReadFile($_, $_) ($_, error) {
  $$$
}'
```

Repository: pingcap/ticdc
Length of output: 684

🏁 Script executed:

```bash
cat -n pkg/util/external_storage.go | sed -n '360,400p'
```

Repository: pingcap/ticdc
Length of output: 1404

🏁 Script executed:

```bash
# Find if there are cloud storage specific error checks in the codebase
rg -n "ErrNotFound|NoSuchKey|NotFound" --type go | head -30
```

Repository: pingcap/ticdc
Length of output: 3595
Use util.IsNotExistInExtStorage() to handle cloud storage backends correctly.
The os.IsNotExist(errors.Cause(err)) check only detects local filesystem "file not found" errors. Cloud storage backends (S3, GCS, Azure) return their own error types that os.IsNotExist does not catch. This causes the consumer to fail with an error instead of gracefully handling a missing metadata file on initial startup.
Change line 241 to use util.IsNotExistInExtStorage(err), which properly handles AWS, Google Cloud, and Azure error types. This pattern is already used in pkg/redo/reader/reader.go:278 and downstreamadapter/sink/redo/meta.go:206.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@cmd/storage-consumer/consumer.go` around lines 238 - 254, In
getStableCheckpointTs, replace the os.IsNotExist(errors.Cause(err)) check with
util.IsNotExistInExtStorage(err) so missing metadata on cloud backends
(S3/GCS/Azure) is handled correctly; update the error handling branch in the
consumer.getStableCheckpointTs function and ensure the util package is imported
where this function is defined.
Code Review
This pull request introduces a crucial correctness fix for the storage consumer by gating the application of DDL/DML files on a stable checkpoint timestamp read from storage metadata. This prevents applying changes that are not yet stably visible, fixing a potential data inconsistency issue with out-of-order processing. The implementation correctly defers processing of files with a tableVersion greater than the checkpoint and introduces a mechanism to track handled files to avoid reprocessing. The change also includes a valuable refactoring of the main package initialization to improve testability, and adds a comprehensive integration test that validates the new logic. The changes are well-implemented. I have one suggestion regarding logging verbosity.
```go
log.Info("storage consumer defer file key until checkpoint catches up",
	zap.Uint64("round", round),
	zap.Int("order", order),
	zap.String("schema", key.Schema),
	zap.String("table", key.Table),
	zap.Uint64("tableVersion", key.TableVersion),
	zap.Uint64("stableCheckpointTs", stableCheckpointTs),
	zap.Int64("partition", key.PartitionNum),
	zap.String("date", key.Date))
```
This log.Info statement will be executed for every deferred file in every round. If a large number of files are deferred for an extended period (e.g., if the checkpoint is stuck), this could lead to excessive log volume. Consider changing this to log.Debug or logging a summary of deferred files per round to reduce log noise in normal operation.
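One way to realize the suggested per-round summary, sketched with hypothetical types (none of these names exist in the PR): accumulate per-table counts while scanning, then emit a single line per round instead of one `log.Info` per deferred file.

```go
package main

import "fmt"

// deferredSummary counts deferred files per schema.table within one scan
// round, so the consumer could log one summary line (e.g. at Info) per
// round while demoting the per-file message to Debug.
type deferredSummary map[string]int

func (s deferredSummary) add(schema, table string) {
	s[schema+"."+table]++
}

// lines renders the per-table counts; in the real consumer these would be
// fed to a single log call at the end of the round.
func (s deferredSummary) lines() []string {
	out := make([]string, 0, len(s))
	for key, n := range s {
		out = append(out, fmt.Sprintf("%s deferred=%d", key, n))
	}
	return out
}

func main() {
	s := deferredSummary{}
	s.add("test", "table_108")
	s.add("test", "table_108")
	s.add("test", "table_8")
	fmt.Println(len(s.lines())) // 2
}
```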
@3AceShowHand: The following tests failed.

Full PR test history. Your PR dashboard.

Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
What problem does this PR solve?
Issue Number: close #4430
What is changed and how it works?
`cmd/storage-consumer` used to apply whatever a scan round happened to discover. That is unsafe for cloud storage because storage visibility is only stable up to the producer's `metadata.checkpoint-ts`.

In the failing `ddl_with_random_move_table` case, the consumer applied:

- `RENAME TABLE test.table_8 TO test.table_108`
- `RENAME TABLE test.table_108 TO test.table_8`

before a later scan round discovered an older DML file that still belonged to `table_108`. That reordered the apply across scan rounds and finally failed with `Table 'test.table_108' doesn't exist`.

This PR fixes that by changing the consumer side only:

- Read `metadata` every round and treat `checkpoint-ts` as the stable apply watermark
- Defer any file whose `tableVersion` is greater than the stable checkpoint
- Move `storage-consumer` initialization out of `init()` so the package can be tested normally
- Add an `intest` regression test that reproduces the rename -> DML -> rename-back visibility gap and verifies the final apply order

With this change, the consumer waits until all files in the rename chain are stably visible and then applies them in the correct order.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No protocol or producer behavior is changed. The consumer may defer applying a newly discovered file until `metadata.checkpoint-ts` reaches that table version, which is the intended correctness fence.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note