From 5533375dda2b6d5ca1c2e58fa4753997498c6f2a Mon Sep 17 00:00:00 2001 From: Palash Chauhan Date: Tue, 9 Jun 2026 14:35:46 -0700 Subject: [PATCH 1/2] PHOENIX-7884 : Refactor tracking of IndexCdcConsumer lag --- .../phoenix/hbase/index/IndexCDCConsumer.java | 90 +++++++---- .../hbase/index/IndexCDCConsumerProgress.java | 79 +++++++++ .../end2end/IndexCDCConsumerLagIT.java | 153 ++++++++++++++++++ .../index/IndexCDCConsumerProgressTest.java | 121 ++++++++++++++ 4 files changed, 415 insertions(+), 28 deletions(-) create mode 100644 phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgressTest.java diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java index 6c4ceab789f..ff4c87478dc 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java @@ -140,6 +140,15 @@ public class IndexCDCConsumer implements Runnable { "phoenix.index.cdc.consumer.parent.progress.pause.ms"; private static final long DEFAULT_PARENT_PROGRESS_PAUSE_MS = 15000; + /** + * Interval between {@code cdcIndexUpdateLag} samples emitted while the consumer is sleeping (idle + * poll, backoff, parent-progress wait, etc.). Clamped to at least 50ms. + */ + public static final String INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS = + "phoenix.index.cdc.consumer.lag.sample.interval.ms"; + private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 1000L; + private static final long MIN_LAG_SAMPLE_INTERVAL_MS = 50L; + private final RegionCoprocessorEnvironment env; private final String dataTableName; private final String encodedRegionName; @@ -154,6 +163,8 @@ public class IndexCDCConsumer implements Runnable { private final Configuration config; private final boolean serializeCDCMutations; private final MetricsIndexCDCConsumerSource metricSource; + private final long lagSampleIntervalMs; + private final IndexCDCConsumerProgress progress; private volatile boolean stopped = false; private Thread consumerThread; private boolean hasParentPartitions = false; @@ -228,7 +239,11 @@ public IndexCDCConsumer(RegionCoprocessorEnvironment env, String dataTableName, DEFAULT_MAX_DATA_VISIBILITY_RETRIES); this.parentProgressPauseMs = config.getLong(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS, DEFAULT_PARENT_PROGRESS_PAUSE_MS); + this.lagSampleIntervalMs = Math.max(MIN_LAG_SAMPLE_INTERVAL_MS, + config.getLong(INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS, DEFAULT_LAG_SAMPLE_INTERVAL_MS)); this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexCDCConsumerSource(); + this.progress = new IndexCDCConsumerProgress(EnvironmentEdgeManager.currentTimeMillis(), + this.timestampBufferMs); DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION); this.indexWriter = @@ -259,13 +274,21 @@ public void stop() { } /** - * Sleeps for the specified duration if the consumer has not been stopped. - * @param millis the duration to sleep in milliseconds. - * @throws InterruptedException if the thread is interrupted while sleeping. + * Sleeps for up to {@code totalMillis}, emitting a {@code cdcIndexUpdateLag} sample at the start + * of each {@code lagSampleIntervalMs} slice. Aborts immediately when stopped. Used for all + * consumer-thread sleeps so the lag metric stays non-silent across idle, failure, startup, and + * parent-replay phases. */ - private void sleepIfNotStopped(long millis) throws InterruptedException { - if (!stopped) { - Thread.sleep(millis); + private void sleepWithLagSampling(long totalMillis) throws InterruptedException { + long deadline = EnvironmentEdgeManager.currentTimeMillis() + totalMillis; + while (!stopped) { + long now = EnvironmentEdgeManager.currentTimeMillis(); + metricSource.updateCdcLag(dataTableName, progress.currentLagMs(now)); + long remaining = deadline - now; + if (remaining <= 0) { + return; + } + Thread.sleep(Math.min(remaining, lagSampleIntervalMs)); } } @@ -374,7 +397,7 @@ private byte[][] lookupPartitionKeys(String partitionId) throws InterruptedExcep "Error while retrieving partition keys from CDC_STREAM for partition {} table {}. " + "Retry #{}, sleeping {} ms before retrying...", partitionId, dataTableName, retryCount, sleepTime, e); - sleepIfNotStopped(sleepTime); + sleepWithLagSampling(sleepTime); } } return null; @@ -405,7 +428,7 @@ private TenantScanInfo getPartitionTenantScanInfo(String partitionId) public void run() { try { if (startupDelayMs > 0 && getCDCStreamNumPartitions() <= 1) { - sleepIfNotStopped(startupDelayMs); + sleepWithLagSampling(startupDelayMs); } if (stopped) { return; @@ -438,13 +461,14 @@ public void run() { dataTableName, encodedRegionName); return; } else if (lastProcessedTimestamp > 0) { + progress.recordProcessed(lastProcessedTimestamp); LOG.info( "Found existing tracker entry for table {} region {} with lastTimestamp {}. " + "Resuming from last position (region movement scenario).", dataTableName, encodedRegionName, lastProcessedTimestamp); } else { if (hasParentPartitions) { - sleepIfNotStopped(timestampBufferMs + 1); + sleepWithLagSampling(timestampBufferMs + 1); replayAndCompleteParentRegions(encodedRegionName); } else { LOG.info("No parent partitions for table {} region {}, skipping parent replay", @@ -463,10 +487,10 @@ public void run() { lastProcessedTimestamp, false); } if (lastProcessedTimestamp == previousTimestamp) { - sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, ++retryCount)); + sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, ++retryCount)); } else { retryCount = 0; - sleepIfNotStopped(pollIntervalMs); + sleepWithLagSampling(pollIntervalMs); } } catch (Exception e) { if (e instanceof InterruptedException) { @@ -478,7 +502,7 @@ public void run() { "Error processing CDC mutations for table {} region {}. " + "Retry #{}, sleeping {} ms before retrying...", dataTableName, encodedRegionName, retryCount, sleepTime, e); - sleepIfNotStopped(sleepTime); + sleepWithLagSampling(sleepTime); } } } catch (InterruptedException e) { @@ -518,7 +542,7 @@ private boolean hasEventuallyConsistentIndexes() throws InterruptedException { "Error checking for eventually consistent indexes for table {}. " + "Retry #{}, sleeping {} ms before retrying...", dataTableName, retryCount, sleepTime, e); - sleepIfNotStopped(sleepTime); + sleepWithLagSampling(sleepTime); } } return false; @@ -559,7 +583,7 @@ private long getCDCStreamNumPartitions() throws InterruptedException { "Error getting CDC_STREAM row count for table {}. " + "Retry #{}, sleeping {} ms before retrying...", dataTableName, retryCount, sleepTime, e); - sleepIfNotStopped(sleepTime); + sleepWithLagSampling(sleepTime); } } return -1; @@ -597,14 +621,14 @@ private boolean waitForCDCStreamEntry() throws InterruptedException { "CDC_STREAM entry not found for table {} partition {}. " + "Attempt #{}, sleeping {} ms before retrying...", dataTableName, encodedRegionName, retryCount, sleepTime); - sleepIfNotStopped(sleepTime); + sleepWithLagSampling(sleepTime); } catch (SQLException e) { long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount); LOG.warn( "Error checking CDC_STREAM for table {} partition {}. " + "Retry #{}, sleeping {} ms before retrying...", dataTableName, encodedRegionName, retryCount, sleepTime, e); - sleepIfNotStopped(sleepTime); + sleepWithLagSampling(sleepTime); } } return false; @@ -662,7 +686,7 @@ private long checkTrackerStatus(String partitionId, String ownerPartitionId) "Error checking IDX_CDC_TRACKER for table {} partition {} owner {}. " + "Retry #{}, sleeping {} ms before retrying...", dataTableName, partitionId, ownerPartitionId, retryCount, sleepTime, e); - sleepIfNotStopped(sleepTime); + sleepWithLagSampling(sleepTime); } } return 0; @@ -695,7 +719,7 @@ private boolean isPartitionCompleted(String partitionId) throws InterruptedExcep "Error checking if partition {} is completed for table {}. " + "Retry #{}, sleeping {} ms before retrying...", partitionId, dataTableName, retryCount, sleepTime, e); - sleepIfNotStopped(sleepTime); + sleepWithLagSampling(sleepTime); } } return false; @@ -739,7 +763,7 @@ private long getParentProgress(String partitionId) throws InterruptedException { "Error getting parent progress for partition {} table {}. " + "Retry #{}, sleeping {} ms before retrying...", partitionId, dataTableName, retryCount, sleepTime, e); - sleepIfNotStopped(sleepTime); + sleepWithLagSampling(sleepTime); } } throw new InterruptedException("IndexCDCConsumer stopped while getting parent progress."); @@ -778,7 +802,7 @@ private List getParentPartitionIds(String partitionId) throws Interrupte "Error querying parent partitions from CDC_STREAM for table {} partition {}. " + "Retry #{}, sleeping {} ms before retrying...", dataTableName, partitionId, retryCount, sleepTime, e); - sleepIfNotStopped(sleepTime); + sleepWithLagSampling(sleepTime); } } return Collections.emptyList(); @@ -812,7 +836,7 @@ private void processPartitionToCompletion(String partitionId, String ownerPartit long previousOtherProgress; do { previousOtherProgress = otherProgress; - sleepIfNotStopped(parentProgressPauseMs); + sleepWithLagSampling(parentProgressPauseMs); if (isPartitionCompleted(partitionId)) { return; } @@ -856,7 +880,7 @@ private void processPartitionToCompletion(String partitionId, String ownerPartit + "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms", partitionId, ownerPartitionId, dataTableName, currentLastProcessedTimestamp, retryCount, sleepTime, e); - sleepIfNotStopped(sleepTime); + sleepWithLagSampling(sleepTime); } } LOG.info("Processing partition {} (owner {}) stopped before completion for table {}", @@ -952,11 +976,15 @@ private long processCDCBatch(String partitionId, String ownerPartitionId, if (hasMoreRows) { newLastTimestamp = result.getFirst(); if (batchMutations.isEmpty()) { - sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, ++retryCount)); + sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, ++retryCount)); } } } } + // Empty own-partition poll proves we are caught up to (now - timestampBufferMs). + if (!hasMoreRows && !isParentReplay) { + progress.recordEmptyPoll(EnvironmentEdgeManager.currentTimeMillis()); + } // With predefined LIMIT, there might be more rows with the same timestamp that were not // included in this batch. if (newLastTimestamp > lastProcessedTimestamp) { @@ -985,8 +1013,9 @@ private long processCDCBatch(String partitionId, String ownerPartitionId, metricSource.updateCdcBatchProcessTime(dataTableName, EnvironmentEdgeManager.currentTimeMillis() - batchStartTime); metricSource.incrementCdcBatchCount(dataTableName); - metricSource.updateCdcLag(dataTableName, - EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp); + if (!isParentReplay) { + progress.recordProcessed(newLastTimestamp); + } updateTrackerProgress(conn, partitionId, ownerPartitionId, newLastTimestamp, PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS); } @@ -1072,11 +1101,15 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI // CDC index entries are written but the data is not yet visible. // Don't advance newLastTimestamp so the same events are re-fetched // once the data becomes visible. - sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, ++retryCount)); + sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, ++retryCount)); } } } } + // Empty own-partition poll proves we are caught up to (now - timestampBufferMs). + if (!hasMoreRows && !isParentReplay) { + progress.recordEmptyPoll(EnvironmentEdgeManager.currentTimeMillis()); + } if (newLastTimestamp > lastProcessedTimestamp) { String sameTimestampQuery = String.format( "SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " @@ -1106,8 +1139,9 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI metricSource.updateCdcBatchProcessTime(dataTableName, EnvironmentEdgeManager.currentTimeMillis() - batchStartTime); metricSource.incrementCdcBatchCount(dataTableName); - metricSource.updateCdcLag(dataTableName, - EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp); + if (!isParentReplay) { + progress.recordProcessed(newLastTimestamp); + } } if (newLastTimestamp > lastProcessedTimestamp) { updateTrackerProgress(conn, partitionId, ownerPartitionId, newLastTimestamp, diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java new file mode 100644 index 00000000000..f90b7c35dcb --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index; + +/** Observable progress of an {@link IndexCDCConsumer}. */ +final class IndexCDCConsumerProgress { + + // Wall-clock time at consumer construction; floor for currentLagMs before any signal. + private final long consumerStartTime; + // Read-back buffer the consumer subtracts from `now` on its own-partition CDC filter. + private final long timestampBufferMs; + + // Latest CDC event timestamp this consumer has acknowledged for its own partition. + private volatile long lastProcessedTimestamp = 0L; + // Wall-clock time at the end of the most recent empty own-partition CDC poll. + private volatile long lastEmptyPollEndTime = 0L; + // Highest own-partition CDC timestamp the consumer is confirmed caught up to (monotonic). + private volatile long effectiveWatermark = 0L; + + IndexCDCConsumerProgress(long consumerStartTime, long timestampBufferMs) { + this.consumerStartTime = consumerStartTime; + this.timestampBufferMs = timestampBufferMs; + } + + /** Record progress from a successful own-partition batch. */ + synchronized void recordProcessed(long ts) { + if (ts > lastProcessedTimestamp) { + lastProcessedTimestamp = ts; + } + advanceWatermark(); + } + + /** Record an own-partition CDC poll that returned zero rows. */ + synchronized void recordEmptyPoll(long pollEndWallClock) { + if (pollEndWallClock > lastEmptyPollEndTime) { + lastEmptyPollEndTime = pollEndWallClock; + } + advanceWatermark(); + } + + /** Current lag in milliseconds. Floors at {@code now - consumerStartTime} before any signal. */ + long currentLagMs(long now) { + long base = effectiveWatermark > 0 ? effectiveWatermark : consumerStartTime; + long lag = now - base; + return lag < 0 ? 0 : lag; + } + + private void advanceWatermark() { + long emptyPollWatermark = + lastEmptyPollEndTime > 0 ? lastEmptyPollEndTime - timestampBufferMs : 0L; + long candidate = Math.max(lastProcessedTimestamp, emptyPollWatermark); + if (candidate > effectiveWatermark) { + effectiveWatermark = candidate; + } + } + + long getEffectiveWatermark() { + return effectiveWatermark; + } + + long getLastEmptyPollEndTime() { + return lastEmptyPollEndTime; + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java new file mode 100644 index 00000000000..9ec03213711 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_POLL_INTERVAL_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSource.CDC_INDEX_UPDATE_LAG; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Map; +import java.util.Properties; +import org.apache.hadoop.metrics2.MetricHistogram; +import org.apache.phoenix.coprocessor.PhoenixMasterObserver; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.hbase.index.metrics.MetricsIndexCDCConsumerSourceImpl; +import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +/** + * Regression coverage for the pre-fix behavior where {@code cdcIndexUpdateLag} went silent whenever + * the consumer had no batch to process. Asserts only that the sampler keeps emitting during an idle + * window — a binary check that's robust to JVM scheduler/GC jitter and to histogram snapshot + * timing. Value-correctness of the watermark math is covered deterministically by + * {@code IndexCDCConsumerStateTest}. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class IndexCDCConsumerLagIT extends ParallelStatsDisabledIT { + + private static final Logger LOG = LoggerFactory.getLogger(IndexCDCConsumerLagIT.class); + + private static final int TIMESTAMP_BUFFER_MS = 2_000; + private static final int POLL_INTERVAL_MS = 500; + private static final int LAG_SAMPLE_INTERVAL_MS = 500; + // Small retry pause so empty-poll backoff doesn't dominate idle behavior. + private static final int RETRY_PAUSE_MS = 100; + // Budget for the consumer to start up and emit its first lag sample. Generous because the + // consumer waits up to INDEX_CDC_CONSUMER_STARTUP_DELAY_MS (default 10s) and then performs + // CDC_STREAM / IDX_CDC_TRACKER lookups before its first poll. Sized for slow CI / cold JVM. + private static final long CONSUMER_STARTUP_BUDGET_MS = 120_000L; + // Idle window for the flow check. Only need to prove ≥ 1 sample fires; kept short to keep + // total test runtime low. + private static final long IDLE_WAIT_MS = 5_000L; + private static final long MAX_LOOKBACK_AGE = 1_000_000L; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(10); + props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); + props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + Long.toString(MAX_LOOKBACK_AGE)); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); + props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + props.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, Boolean.TRUE.toString()); + props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); + props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(TIMESTAMP_BUFFER_MS)); + props.put(INDEX_CDC_CONSUMER_POLL_INTERVAL_MS, Integer.toString(POLL_INTERVAL_MS)); + props.put(INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS, Integer.toString(LAG_SAMPLE_INTERVAL_MS)); + props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(RETRY_PAUSE_MS)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + private Connection getConnection() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + return DriverManager.getConnection(getUrl(), props); + } + + private MetricHistogram lagHistogram() { + MetricsIndexCDCConsumerSourceImpl source = + (MetricsIndexCDCConsumerSourceImpl) MetricsIndexerSourceFactory.getInstance() + .getIndexCDCConsumerSource(); + return source.getMetricsRegistry().getHistogram(CDC_INDEX_UPDATE_LAG); + } + + /** + * Polls until the lag histogram has at least {@code minCount} samples, or fails after timeout. + */ + private void awaitMinCount(long minCount, long timeoutMs) throws InterruptedException { + long deadline = System.currentTimeMillis() + timeoutMs; + long observed = 0L; + while (System.currentTimeMillis() < deadline) { + observed = lagHistogram().getCount(); + if (observed >= minCount) { + return; + } + Thread.sleep(500L); + } + fail("Lag histogram never reached count=" + minCount + " within " + timeoutMs + "ms; observed=" + + observed); + } + + @Test + public void testLagMetricKeepsSamplingWhenIdle() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + + try (Connection conn = getConnection()) { + conn.createStatement().execute("CREATE TABLE " + tableName + + " (PK VARCHAR NOT NULL PRIMARY KEY," + " V1 VARCHAR, V2 VARCHAR) COLUMN_ENCODED_BYTES=0"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + + "(V1) INCLUDE (V2) CONSISTENCY=EVENTUAL"); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " (PK, V1, V2) VALUES ('r1', 'v1', 'd1')"); + conn.commit(); + } + + // Wait for the consumer thread to start and emit its first lag sample. Replaces a fixed + // settle sleep so the test is robust to slow CI / cold JVM startup. + awaitMinCount(1L, CONSUMER_STARTUP_BUDGET_MS); + + long countBeforeIdle = lagHistogram().getCount(); + Thread.sleep(IDLE_WAIT_MS); + long countAfterIdle = lagHistogram().getCount(); + long delta = countAfterIdle - countBeforeIdle; + LOG.info("Idle window {}ms: countBefore={}, countAfter={}, delta={}", IDLE_WAIT_MS, + countBeforeIdle, countAfterIdle, delta); + + // Binary flow check — pre-fix, delta during idle was 0 because the metric was only emitted + // on non-empty batch completion. Post-fix, the sampler fires on a clock so at least one + // sample must land in any non-trivial idle window. Numerical value-correctness of those + // samples is asserted deterministically in IndexCDCConsumerStateTest. + assertTrue("Sampler must continue emitting during idle; delta=" + delta, delta >= 1); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgressTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgressTest.java new file mode 100644 index 00000000000..3c87d8849ba --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgressTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class IndexCDCConsumerProgressTest { + + private static final long BUFFER_MS = 5_000L; + private static final long START_TIME = 1_000_000L; + + private IndexCDCConsumerProgress newProgress() { + return new IndexCDCConsumerProgress(START_TIME, BUFFER_MS); + } + + @Test + public void coldStartReportsLagSinceConsumerStart() { + IndexCDCConsumerProgress p = newProgress(); + assertEquals(100L, p.currentLagMs(START_TIME + 100L)); + assertEquals(0L, p.getEffectiveWatermark()); + } + + @Test + public void processedAdvancesWatermark() { + IndexCDCConsumerProgress p = newProgress(); + long processed = START_TIME + 10_000L; + p.recordProcessed(processed); + assertEquals(processed, p.getEffectiveWatermark()); + assertEquals(2_000L, p.currentLagMs(processed + 2_000L)); + } + + @Test + public void processedIsMonotonic() { + IndexCDCConsumerProgress p = newProgress(); + p.recordProcessed(START_TIME + 1_000L); + p.recordProcessed(START_TIME + 500L); + assertEquals(START_TIME + 1_000L, p.getEffectiveWatermark()); + } + + @Test + public void emptyPollAdvancesWatermarkBuffersBelowPollTime() { + IndexCDCConsumerProgress p = newProgress(); + long pollEnd = START_TIME + 20_000L; + p.recordEmptyPoll(pollEnd); + assertEquals(pollEnd - BUFFER_MS, p.getEffectiveWatermark()); + // Lag at the same instant collapses to the buffer baseline. + assertEquals(BUFFER_MS, p.currentLagMs(pollEnd)); + } + + @Test + public void emptyPollIsMonotonic() { + IndexCDCConsumerProgress p = newProgress(); + long firstPoll = START_TIME + 20_000L; + long earlierPoll = START_TIME + 10_000L; + p.recordEmptyPoll(firstPoll); + p.recordEmptyPoll(earlierPoll); + assertEquals(firstPoll, p.getLastEmptyPollEndTime()); + assertEquals(firstPoll - BUFFER_MS, p.getEffectiveWatermark()); + } + + @Test + public void watermarkIsMaxOfProcessedAndEmptyPollFloor() { + IndexCDCConsumerProgress p = newProgress(); + long processed = START_TIME + 50_000L; + long pollEnd = START_TIME + 30_000L; + p.recordProcessed(processed); + p.recordEmptyPoll(pollEnd); + // processed dominates because (pollEnd - BUFFER_MS) < processed + assertEquals(processed, p.getEffectiveWatermark()); + + // A later empty poll above (processed + BUFFER_MS) advances the watermark again. + long laterPoll = processed + BUFFER_MS + 1_000L; + p.recordEmptyPoll(laterPoll); + assertEquals(laterPoll - BUFFER_MS, p.getEffectiveWatermark()); + } + + @Test + public void idleAfterEmptyPollStaysBoundedByBufferPlusElapsed() { + IndexCDCConsumerProgress p = newProgress(); + long pollEnd = START_TIME + 20_000L; + p.recordEmptyPoll(pollEnd); + long elapsed = 7_500L; + assertEquals(BUFFER_MS + elapsed, p.currentLagMs(pollEnd + elapsed)); + } + + @Test + public void negativeLagClampedToZero() { + IndexCDCConsumerProgress p = newProgress(); + long processed = START_TIME + 50_000L; + p.recordProcessed(processed); + // clock went backwards relative to the watermark + assertEquals(0L, p.currentLagMs(processed - 100L)); + } + + @Test + public void emptyPollOlderThanBufferContributesNothing() { + IndexCDCConsumerProgress p = newProgress(); + long pollEnd = BUFFER_MS - 1L; + p.recordEmptyPoll(pollEnd); + // pollEnd - BUFFER_MS would be negative; do not pollute the watermark. + assertEquals(pollEnd, p.getLastEmptyPollEndTime()); + assertEquals(0L, p.getEffectiveWatermark()); + } +} From dd4f37db20ef601d0c338532f0dd5ebca6460978 Mon Sep 17 00:00:00 2001 From: Palash Chauhan Date: Tue, 9 Jun 2026 16:08:42 -0700 Subject: [PATCH 2/2] changes --- .../MetricsIndexCDCConsumerSource.java | 6 ++- .../phoenix/hbase/index/IndexCDCConsumer.java | 37 +++++++++++++++---- .../hbase/index/IndexCDCConsumerProgress.java | 29 +++++++++++---- .../end2end/IndexCDCConsumerLagIT.java | 13 ++++++- 4 files changed, 66 insertions(+), 19 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java index c278c1fa4ef..cb39b5b38a8 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexCDCConsumerSource.java @@ -52,7 +52,11 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource { String CDC_INDEX_UPDATE_LAG = "cdcIndexUpdateLag"; String CDC_INDEX_UPDATE_LAG_DESC = - "Histogram for the lag in milliseconds between current time and the last processed CDC event"; + "Histogram of current time minus the consumer's effective freshness watermark, in " + + "milliseconds. The watermark advances on successful own-partition batches AND on empty " + + "polls (which prove caught-up to queryStart - timestampBufferMs). Idle steady state is " + + "≈ timestampBufferMs; grows during sustained failure, parent-region replay, or cold " + + "start (where it is floored at now - consumerStartTime)."; /** * Updates the CDC batch processing time histogram. diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java index ff4c87478dc..f10fd304d14 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java @@ -165,6 +165,10 @@ public class IndexCDCConsumer implements Runnable { private final MetricsIndexCDCConsumerSource metricSource; private final long lagSampleIntervalMs; private final IndexCDCConsumerProgress progress; + // Flipped true once hasEventuallyConsistentIndexes() confirms this region actually has an EC + // index. Until then sleepWithLagSampling does not emit, so tables that immediately exit "no EC + // index" produce no cold-start lag samples into the global / per-table histograms. + private volatile boolean lagEmissionEnabled = false; private volatile boolean stopped = false; private Thread consumerThread; private boolean hasParentPartitions = false; @@ -275,15 +279,17 @@ public void stop() { /** * Sleeps for up to {@code totalMillis}, emitting a {@code cdcIndexUpdateLag} sample at the start - * of each {@code lagSampleIntervalMs} slice. Aborts immediately when stopped. Used for all - * consumer-thread sleeps so the lag metric stays non-silent across idle, failure, startup, and - * parent-replay phases. + * of each {@code lagSampleIntervalMs} slice once {@link #lagEmissionEnabled} is set. Aborts + * immediately when stopped. Used for all consumer-thread sleeps so the lag metric stays + * non-silent across idle, failure, post-startup, and parent-replay phases. */ private void sleepWithLagSampling(long totalMillis) throws InterruptedException { long deadline = EnvironmentEdgeManager.currentTimeMillis() + totalMillis; while (!stopped) { long now = EnvironmentEdgeManager.currentTimeMillis(); - metricSource.updateCdcLag(dataTableName, progress.currentLagMs(now)); + if (lagEmissionEnabled) { + metricSource.updateCdcLag(dataTableName, progress.currentLagMs(now)); + } long remaining = deadline - now; if (remaining <= 0) { return; @@ -438,6 +444,9 @@ public void run() { dataTableName); return; } + // Only enable lag sampling once we've confirmed this table actually has an EC index, + // so non-EC-indexed tables don't pollute the lag histograms with cold-start samples. + lagEmissionEnabled = true; LOG.info( "IndexCDCConsumer started for table {} region {}" + " [batchSize: {}, pollIntervalMs: {}, timestampBufferMs: {}, startupDelayMs: {}," @@ -967,7 +976,11 @@ private long processCDCBatch(String partitionId, String ownerPartitionId, long newLastTimestamp = lastProcessedTimestamp; boolean hasMoreRows = true; int retryCount = 0; + // Captured immediately before each query so the empty-poll watermark cannot over-advance + // past what the query's own (now - timestampBufferMs) upper bound actually proved empty. + long lastQueryStartTime = newLastTimestamp; while (hasMoreRows && batchMutations.isEmpty()) { + lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis(); try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) { setStatementParams(scanInfo, partitionId, isParentReplay, newLastTimestamp, ps); Pair result = @@ -981,9 +994,9 @@ private long processCDCBatch(String partitionId, String ownerPartitionId, } } } - // Empty own-partition poll proves we are caught up to (now - timestampBufferMs). + // Empty own-partition poll proves we are caught up to (queryStart - timestampBufferMs). if (!hasMoreRows && !isParentReplay) { - progress.recordEmptyPoll(EnvironmentEdgeManager.currentTimeMillis()); + progress.recordEmptyPoll(lastQueryStartTime); } // With predefined LIMIT, there might be more rows with the same timestamp that were not // included in this batch. @@ -1081,7 +1094,11 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI long[] lastScannedTimestamp = { lastProcessedTimestamp }; boolean hasMoreRows = true; int retryCount = 0; + // Captured immediately before each query so the empty-poll watermark cannot over-advance + // past what the query's own (now - timestampBufferMs) upper bound actually proved empty. + long lastQueryStartTime = newLastTimestamp; while (hasMoreRows && batchStates.isEmpty()) { + lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis(); try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) { setStatementParams(scanInfo, partitionId, isParentReplay, newLastTimestamp, ps); Pair result = @@ -1096,6 +1113,10 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI + " to {} after {} retries — data table mutations may have failed", dataTableName, partitionId, newLastTimestamp, lastScannedTimestamp[0], retryCount); newLastTimestamp = lastScannedTimestamp[0]; + // NOTE: durable tracker advances below (newLastTimestamp > lastProcessedTimestamp) + // but progress.recordProcessed is skipped (batchStates.isEmpty()). The in-memory + // watermark lags durable state until the next empty poll heals it — over-reports + // lag temporarily (safe direction, self-healing). break; } else { // CDC index entries are written but the data is not yet visible. @@ -1106,9 +1127,9 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI } } } - // Empty own-partition poll proves we are caught up to (now - timestampBufferMs). + // Empty own-partition poll proves we are caught up to (queryStart - timestampBufferMs). if (!hasMoreRows && !isParentReplay) { - progress.recordEmptyPoll(EnvironmentEdgeManager.currentTimeMillis()); + progress.recordEmptyPoll(lastQueryStartTime); } if (newLastTimestamp > lastProcessedTimestamp) { String sameTimestampQuery = String.format( diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java index f90b7c35dcb..9c6e452a9d4 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumerProgress.java @@ -17,7 +17,16 @@ */ package org.apache.phoenix.hbase.index; -/** Observable progress of an {@link IndexCDCConsumer}. */ +/** + * Observable progress of an {@link IndexCDCConsumer}. + *

+ * Thread-safety: single-writer. The owning consumer thread is the only writer of + * {@link #recordProcessed} / {@link #recordEmptyPoll}; the same thread also reads via + * {@link #currentLagMs}. {@code volatile} fields provide safe publication for an off-thread + * observer (e.g. JMX scraper) that may read {@link #getEffectiveWatermark} or + * {@link #getLastEmptyPollEndTime}, but no off-thread writer is supported. Watermark is + * monotonic; lag is clamped to zero on clock regressions. + */ final class IndexCDCConsumerProgress { // Wall-clock time at consumer construction; floor for currentLagMs before any signal. @@ -27,7 +36,9 @@ final class IndexCDCConsumerProgress { // Latest CDC event timestamp this consumer has acknowledged for its own partition. private volatile long lastProcessedTimestamp = 0L; - // Wall-clock time at the end of the most recent empty own-partition CDC poll. + // Wall-clock time captured immediately before the most recent own-partition CDC poll that + // returned zero rows. Equals the upper bound of the poll's "no events exist below" proof: + // (lastEmptyPollEndTime - timestampBufferMs) is the latest CDC ts the consumer is caught up to. private volatile long lastEmptyPollEndTime = 0L; // Highest own-partition CDC timestamp the consumer is confirmed caught up to (monotonic). private volatile long effectiveWatermark = 0L; @@ -37,18 +48,20 @@ final class IndexCDCConsumerProgress { this.timestampBufferMs = timestampBufferMs; } - /** Record progress from a successful own-partition batch. */ - synchronized void recordProcessed(long ts) { + /** + * Record progress from a successful own-partition batch. Single-writer (consumer thread only). + */ + void recordProcessed(long ts) { if (ts > lastProcessedTimestamp) { lastProcessedTimestamp = ts; } advanceWatermark(); } - /** Record an own-partition CDC poll that returned zero rows. */ - synchronized void recordEmptyPoll(long pollEndWallClock) { - if (pollEndWallClock > lastEmptyPollEndTime) { - lastEmptyPollEndTime = pollEndWallClock; + /** Record an own-partition CDC poll that returned zero rows. Single-writer. */ + void recordEmptyPoll(long queryStartWallClock) { + if (queryStartWallClock > lastEmptyPollEndTime) { + lastEmptyPollEndTime = queryStartWallClock; } advanceWatermark(); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java index 9ec03213711..71ce9a454d6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexCDCConsumerLagIT.java @@ -51,7 +51,16 @@ * the consumer had no batch to process. Asserts only that the sampler keeps emitting during an idle * window — a binary check that's robust to JVM scheduler/GC jitter and to histogram snapshot * timing. Value-correctness of the watermark math is covered deterministically by - * {@code IndexCDCConsumerStateTest}. + * {@code IndexCDCConsumerProgressTest}. + *

+ * Integration branches NOT directly asserted here: the {@code !isParentReplay} suppression + * of {@code progress.recordProcessed} / {@code recordEmptyPoll}; the + * {@code maxDataVisibilityRetries} skip path; the resume-from-tracker watermark seed; the + * empty-poll query-start timestamp choice. Each is a simple guard at a well-defined call site, but + * a deterministic IT for any of them would need forced region splits, tracker corruption, or + * data-visibility mocks beyond what is justified for this regression IT. Existing + * {@code MultiTenantEventualIndexIT*} exercises split-and-replay end-to-end and would surface + * functional breakage in those branches. */ @Category(NeedsOwnMiniClusterTest.class) public class IndexCDCConsumerLagIT extends ParallelStatsDisabledIT { @@ -147,7 +156,7 @@ public void testLagMetricKeepsSamplingWhenIdle() throws Exception { // Binary flow check — pre-fix, delta during idle was 0 because the metric was only emitted // on non-empty batch completion. Post-fix, the sampler fires on a clock so at least one // sample must land in any non-trivial idle window. Numerical value-correctness of those - // samples is asserted deterministically in IndexCDCConsumerStateTest. + // samples is asserted deterministically in IndexCDCConsumerProgressTest. assertTrue("Sampler must continue emitting during idle; delta=" + delta, delta >= 1); } }