From 5533375dda2b6d5ca1c2e58fa4753997498c6f2a Mon Sep 17 00:00:00 2001 From: Palash Chauhan
+ * Thread-safety: single-writer. The owning consumer thread is the only caller of + * {@link #recordProcessed} / {@link #recordEmptyPoll}; the same thread also reads via + * {@link #currentLagMs}. {@code volatile} fields provide safe publication for an off-thread + * observer (e.g. JMX scraper) that may read {@link #getEffectiveWatermark} or + * {@link #getLastEmptyPollEndTime}, but no off-thread writer is supported. Watermark is + * monotonic; lag is clamped to zero on clock regressions. + */ final class IndexCDCConsumerProgress { // Wall-clock time at consumer construction; floor for currentLagMs before any signal. @@ -27,7 +36,9 @@ final class IndexCDCConsumerProgress { // Latest CDC event timestamp this consumer has acknowledged for its own partition. private volatile long lastProcessedTimestamp = 0L; - // Wall-clock time at the end of the most recent empty own-partition CDC poll. + // Wall-clock time captured immediately before the most recent own-partition CDC poll that + // returned zero rows. Anchors the poll's "no events exist below" proof: + // (lastEmptyPollEndTime - timestampBufferMs) is the latest CDC ts the consumer is caught up to. private volatile long lastEmptyPollEndTime = 0L; // Highest own-partition CDC timestamp the consumer is confirmed caught up to (monotonic). private volatile long effectiveWatermark = 0L; @@ -37,18 +48,20 @@ final class IndexCDCConsumerProgress { this.timestampBufferMs = timestampBufferMs; } - /** Record progress from a successful own-partition batch. */ - synchronized void recordProcessed(long ts) { + /** + * Record progress from a successful own-partition batch. Single-writer (consumer thread only). + */ + void recordProcessed(long ts) { if (ts > lastProcessedTimestamp) { lastProcessedTimestamp = ts; } advanceWatermark(); } - /** Record an own-partition CDC poll that returned zero rows.
*/ - synchronized void recordEmptyPoll(long pollEndWallClock) { - if (pollEndWallClock > lastEmptyPollEndTime) { - lastEmptyPollEndTime = pollEndWallClock; + /** Record an own-partition CDC poll that returned zero rows. Single-writer. */ + void recordEmptyPoll(long queryStartWallClock) { + if (queryStartWallClock > lastEmptyPollEndTime) { + lastEmptyPollEndTime = queryStartWallClock; } advanceWatermark(); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java index 9ec03213711..71ce9a454d6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java @@ -51,7 +51,16 @@ * the consumer had no batch to process. Asserts only that the sampler keeps emitting during an idle * window — a binary check that's robust to JVM scheduler/GC jitter and to histogram snapshot * timing. Value-correctness of the watermark math is covered deterministically by - * {@code IndexCDCConsumerStateTest}. + * {@code IndexCDCConsumerProgressTest}. + * <p>
+ * Integration branches NOT directly asserted here: the {@code !isParentReplay} suppression + * of {@code progress.recordProcessed} / {@code recordEmptyPoll}; the + * {@code maxDataVisibilityRetries} skip path; the resume-from-tracker watermark seed; the + * empty-poll query-start timestamp choice. Each is a simple guard at a well-defined call site, but + * a deterministic IT for any of them would need forced region splits, tracker corruption, or + * data-visibility mocks beyond what is justified for this regression IT. Existing + * {@code MultiTenantEventualIndexIT*} exercises split-and-replay end-to-end and would surface + * functional breakage in those branches. */ @Category(NeedsOwnMiniClusterTest.class) public class IndexCDCConsumerLagIT extends ParallelStatsDisabledIT { @@ -147,7 +156,7 @@ public void testLagMetricKeepsSamplingWhenIdle() throws Exception { // Binary flow check — pre-fix, delta during idle was 0 because the metric was only emitted // on non-empty batch completion. Post-fix, the sampler fires on a clock so at least one // sample must land in any non-trivial idle window. Numerical value-correctness of those - // samples is asserted deterministically in IndexCDCConsumerStateTest. + // samples is asserted deterministically in IndexCDCConsumerProgressTest. assertTrue("Sampler must continue emitting during idle; delta=" + delta, delta >= 1); } }