Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ public interface MetricsIndexCDCConsumerSource extends BaseSource {

String CDC_INDEX_UPDATE_LAG = "cdcIndexUpdateLag";
String CDC_INDEX_UPDATE_LAG_DESC =
"Histogram for the lag in milliseconds between current time and the last processed CDC event";
"Histogram of current time minus the consumer's effective freshness watermark, in "
+ "milliseconds. The watermark advances on successful own-partition batches AND on empty "
+ "polls (which prove caught-up to queryStart - timestampBufferMs). Idle steady state is "
+ "≈ timestampBufferMs; grows during sustained failure, parent-region replay, or cold "
+ "start (where it is floored at now - consumerStartTime).";

/**
* Updates the CDC batch processing time histogram.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ public class IndexCDCConsumer implements Runnable {
"phoenix.index.cdc.consumer.parent.progress.pause.ms";
private static final long DEFAULT_PARENT_PROGRESS_PAUSE_MS = 15000;

/**
* Interval between {@code cdcIndexUpdateLag} samples emitted while the consumer is sleeping (idle
* poll, backoff, parent-progress wait, etc.). Clamped to at least 50ms.
*/
public static final String INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS =
"phoenix.index.cdc.consumer.lag.sample.interval.ms";
private static final long DEFAULT_LAG_SAMPLE_INTERVAL_MS = 1000L;
private static final long MIN_LAG_SAMPLE_INTERVAL_MS = 50L;

private final RegionCoprocessorEnvironment env;
private final String dataTableName;
private final String encodedRegionName;
Expand All @@ -154,6 +163,12 @@ public class IndexCDCConsumer implements Runnable {
private final Configuration config;
private final boolean serializeCDCMutations;
private final MetricsIndexCDCConsumerSource metricSource;
private final long lagSampleIntervalMs;
private final IndexCDCConsumerProgress progress;
// Set to true once hasEventuallyConsistentIndexes() confirms this region actually has an EC
// index. Until then sleepWithLagSampling does not emit, so consumers that exit immediately
// because there is no EC index contribute no cold-start lag samples to the global / per-table
// histograms.
private volatile boolean lagEmissionEnabled = false;
private volatile boolean stopped = false;
private Thread consumerThread;
private boolean hasParentPartitions = false;
Expand Down Expand Up @@ -228,7 +243,11 @@ public IndexCDCConsumer(RegionCoprocessorEnvironment env, String dataTableName,
DEFAULT_MAX_DATA_VISIBILITY_RETRIES);
this.parentProgressPauseMs =
config.getLong(INDEX_CDC_CONSUMER_PARENT_PROGRESS_PAUSE_MS, DEFAULT_PARENT_PROGRESS_PAUSE_MS);
this.lagSampleIntervalMs = Math.max(MIN_LAG_SAMPLE_INTERVAL_MS,
config.getLong(INDEX_CDC_CONSUMER_LAG_SAMPLE_INTERVAL_MS, DEFAULT_LAG_SAMPLE_INTERVAL_MS));
this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexCDCConsumerSource();
this.progress = new IndexCDCConsumerProgress(EnvironmentEdgeManager.currentTimeMillis(),
this.timestampBufferMs);
DelegateRegionCoprocessorEnvironment indexWriterEnv =
new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
this.indexWriter =
Expand Down Expand Up @@ -259,13 +278,23 @@ public void stop() {
}

/**
* Sleeps for the specified duration if the consumer has not been stopped.
* @param millis the duration to sleep in milliseconds.
* @throws InterruptedException if the thread is interrupted while sleeping.
* Sleeps for up to {@code totalMillis}, emitting a {@code cdcIndexUpdateLag} sample at the start
* of each {@code lagSampleIntervalMs} slice once {@link #lagEmissionEnabled} is set. Aborts
* immediately when stopped. Used for all consumer-thread sleeps so the lag metric stays
* non-silent across idle, failure, post-startup, and parent-replay phases.
*/
private void sleepIfNotStopped(long millis) throws InterruptedException {
if (!stopped) {
Thread.sleep(millis);
private void sleepWithLagSampling(long totalMillis) throws InterruptedException {
  // Fixed wake-up point, computed once so that per-slice overhead does not stretch the total.
  final long wakeUpAt = EnvironmentEdgeManager.currentTimeMillis() + totalMillis;
  for (;;) {
    // Re-check the stop flag before every slice so a stop request ends the wait promptly.
    if (stopped) {
      return;
    }
    final long currentTime = EnvironmentEdgeManager.currentTimeMillis();
    // One lag sample per slice, taken before sleeping and only once emission is enabled.
    if (lagEmissionEnabled) {
      metricSource.updateCdcLag(dataTableName, progress.currentLagMs(currentTime));
    }
    final long millisLeft = wakeUpAt - currentTime;
    if (millisLeft <= 0) {
      // Deadline reached (or totalMillis was non-positive): nothing left to sleep.
      return;
    }
    // Sleep one sampling interval, or just the remainder if that is shorter.
    Thread.sleep(millisLeft < lagSampleIntervalMs ? millisLeft : lagSampleIntervalMs);
  }
}

Expand Down Expand Up @@ -374,7 +403,7 @@ private byte[][] lookupPartitionKeys(String partitionId) throws InterruptedExcep
"Error while retrieving partition keys from CDC_STREAM for partition {} table {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
partitionId, dataTableName, retryCount, sleepTime, e);
sleepIfNotStopped(sleepTime);
sleepWithLagSampling(sleepTime);
}
}
return null;
Expand Down Expand Up @@ -405,7 +434,7 @@ private TenantScanInfo getPartitionTenantScanInfo(String partitionId)
public void run() {
try {
if (startupDelayMs > 0 && getCDCStreamNumPartitions() <= 1) {
sleepIfNotStopped(startupDelayMs);
sleepWithLagSampling(startupDelayMs);
}
if (stopped) {
return;
Expand All @@ -415,6 +444,9 @@ public void run() {
dataTableName);
return;
}
// Only enable lag sampling once we've confirmed this table actually has an EC index,
// so non-EC-indexed tables don't pollute the lag histograms with cold-start samples.
lagEmissionEnabled = true;
LOG.info(
"IndexCDCConsumer started for table {} region {}"
+ " [batchSize: {}, pollIntervalMs: {}, timestampBufferMs: {}, startupDelayMs: {},"
Expand All @@ -438,13 +470,14 @@ public void run() {
dataTableName, encodedRegionName);
return;
} else if (lastProcessedTimestamp > 0) {
progress.recordProcessed(lastProcessedTimestamp);
LOG.info(
"Found existing tracker entry for table {} region {} with lastTimestamp {}. "
+ "Resuming from last position (region movement scenario).",
dataTableName, encodedRegionName, lastProcessedTimestamp);
} else {
if (hasParentPartitions) {
sleepIfNotStopped(timestampBufferMs + 1);
sleepWithLagSampling(timestampBufferMs + 1);
replayAndCompleteParentRegions(encodedRegionName);
} else {
LOG.info("No parent partitions for table {} region {}, skipping parent replay",
Expand All @@ -463,10 +496,10 @@ public void run() {
lastProcessedTimestamp, false);
}
if (lastProcessedTimestamp == previousTimestamp) {
sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, ++retryCount));
sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, ++retryCount));
} else {
retryCount = 0;
sleepIfNotStopped(pollIntervalMs);
sleepWithLagSampling(pollIntervalMs);
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
Expand All @@ -478,7 +511,7 @@ public void run() {
"Error processing CDC mutations for table {} region {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, encodedRegionName, retryCount, sleepTime, e);
sleepIfNotStopped(sleepTime);
sleepWithLagSampling(sleepTime);
}
}
} catch (InterruptedException e) {
Expand Down Expand Up @@ -518,7 +551,7 @@ private boolean hasEventuallyConsistentIndexes() throws InterruptedException {
"Error checking for eventually consistent indexes for table {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, retryCount, sleepTime, e);
sleepIfNotStopped(sleepTime);
sleepWithLagSampling(sleepTime);
}
}
return false;
Expand Down Expand Up @@ -559,7 +592,7 @@ private long getCDCStreamNumPartitions() throws InterruptedException {
"Error getting CDC_STREAM row count for table {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, retryCount, sleepTime, e);
sleepIfNotStopped(sleepTime);
sleepWithLagSampling(sleepTime);
}
}
return -1;
Expand Down Expand Up @@ -597,14 +630,14 @@ private boolean waitForCDCStreamEntry() throws InterruptedException {
"CDC_STREAM entry not found for table {} partition {}. "
+ "Attempt #{}, sleeping {} ms before retrying...",
dataTableName, encodedRegionName, retryCount, sleepTime);
sleepIfNotStopped(sleepTime);
sleepWithLagSampling(sleepTime);
} catch (SQLException e) {
long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
LOG.warn(
"Error checking CDC_STREAM for table {} partition {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, encodedRegionName, retryCount, sleepTime, e);
sleepIfNotStopped(sleepTime);
sleepWithLagSampling(sleepTime);
}
}
return false;
Expand Down Expand Up @@ -662,7 +695,7 @@ private long checkTrackerStatus(String partitionId, String ownerPartitionId)
"Error checking IDX_CDC_TRACKER for table {} partition {} owner {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, partitionId, ownerPartitionId, retryCount, sleepTime, e);
sleepIfNotStopped(sleepTime);
sleepWithLagSampling(sleepTime);
}
}
return 0;
Expand Down Expand Up @@ -695,7 +728,7 @@ private boolean isPartitionCompleted(String partitionId) throws InterruptedExcep
"Error checking if partition {} is completed for table {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
partitionId, dataTableName, retryCount, sleepTime, e);
sleepIfNotStopped(sleepTime);
sleepWithLagSampling(sleepTime);
}
}
return false;
Expand Down Expand Up @@ -739,7 +772,7 @@ private long getParentProgress(String partitionId) throws InterruptedException {
"Error getting parent progress for partition {} table {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
partitionId, dataTableName, retryCount, sleepTime, e);
sleepIfNotStopped(sleepTime);
sleepWithLagSampling(sleepTime);
}
}
throw new InterruptedException("IndexCDCConsumer stopped while getting parent progress.");
Expand Down Expand Up @@ -778,7 +811,7 @@ private List<String> getParentPartitionIds(String partitionId) throws Interrupte
"Error querying parent partitions from CDC_STREAM for table {} partition {}. "
+ "Retry #{}, sleeping {} ms before retrying...",
dataTableName, partitionId, retryCount, sleepTime, e);
sleepIfNotStopped(sleepTime);
sleepWithLagSampling(sleepTime);
}
}
return Collections.emptyList();
Expand Down Expand Up @@ -812,7 +845,7 @@ private void processPartitionToCompletion(String partitionId, String ownerPartit
long previousOtherProgress;
do {
previousOtherProgress = otherProgress;
sleepIfNotStopped(parentProgressPauseMs);
sleepWithLagSampling(parentProgressPauseMs);
if (isPartitionCompleted(partitionId)) {
return;
}
Expand Down Expand Up @@ -856,7 +889,7 @@ private void processPartitionToCompletion(String partitionId, String ownerPartit
+ "lastProcessedTimestamp {}. Retry #{}, sleeping {} ms",
partitionId, ownerPartitionId, dataTableName, currentLastProcessedTimestamp, retryCount,
sleepTime, e);
sleepIfNotStopped(sleepTime);
sleepWithLagSampling(sleepTime);
}
}
LOG.info("Processing partition {} (owner {}) stopped before completion for table {}",
Expand Down Expand Up @@ -943,7 +976,11 @@ private long processCDCBatch(String partitionId, String ownerPartitionId,
long newLastTimestamp = lastProcessedTimestamp;
boolean hasMoreRows = true;
int retryCount = 0;
// Captured immediately before each query so the empty-poll watermark cannot over-advance
// past what the query's own (now - timestampBufferMs) upper bound actually proved empty.
long lastQueryStartTime = newLastTimestamp;
while (hasMoreRows && batchMutations.isEmpty()) {
lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis();
try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
setStatementParams(scanInfo, partitionId, isParentReplay, newLastTimestamp, ps);
Pair<Long, Boolean> result =
Expand All @@ -952,11 +989,15 @@ private long processCDCBatch(String partitionId, String ownerPartitionId,
if (hasMoreRows) {
newLastTimestamp = result.getFirst();
if (batchMutations.isEmpty()) {
sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, ++retryCount));
sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, ++retryCount));
}
}
}
}
// Empty own-partition poll proves we are caught up to (queryStart - timestampBufferMs).
if (!hasMoreRows && !isParentReplay) {
progress.recordEmptyPoll(lastQueryStartTime);
}
// With predefined LIMIT, there might be more rows with the same timestamp that were not
// included in this batch.
if (newLastTimestamp > lastProcessedTimestamp) {
Expand Down Expand Up @@ -985,8 +1026,9 @@ private long processCDCBatch(String partitionId, String ownerPartitionId,
metricSource.updateCdcBatchProcessTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
metricSource.incrementCdcBatchCount(dataTableName);
metricSource.updateCdcLag(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp);
if (!isParentReplay) {
progress.recordProcessed(newLastTimestamp);
}
updateTrackerProgress(conn, partitionId, ownerPartitionId, newLastTimestamp,
PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
}
Expand Down Expand Up @@ -1052,7 +1094,11 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI
long[] lastScannedTimestamp = { lastProcessedTimestamp };
boolean hasMoreRows = true;
int retryCount = 0;
// Captured immediately before each query so the empty-poll watermark cannot over-advance
// past what the query's own (now - timestampBufferMs) upper bound actually proved empty.
long lastQueryStartTime = newLastTimestamp;
while (hasMoreRows && batchStates.isEmpty()) {
lastQueryStartTime = EnvironmentEdgeManager.currentTimeMillis();
try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
setStatementParams(scanInfo, partitionId, isParentReplay, newLastTimestamp, ps);
Pair<Long, Boolean> result =
Expand All @@ -1067,16 +1113,24 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI
+ " to {} after {} retries — data table mutations may have failed",
dataTableName, partitionId, newLastTimestamp, lastScannedTimestamp[0], retryCount);
newLastTimestamp = lastScannedTimestamp[0];
// NOTE: durable tracker advances below (newLastTimestamp > lastProcessedTimestamp)
// but progress.recordProcessed is skipped (batchStates.isEmpty()). The in-memory
// watermark lags durable state until the next empty poll heals it — over-reports
// lag temporarily (safe direction, self-healing).
break;
} else {
// CDC index entries are written but the data is not yet visible.
// Don't advance newLastTimestamp so the same events are re-fetched
// once the data becomes visible.
sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, ++retryCount));
sleepWithLagSampling(ConnectionUtils.getPauseTime(pause, ++retryCount));
}
}
}
}
// Empty own-partition poll proves we are caught up to (queryStart - timestampBufferMs).
if (!hasMoreRows && !isParentReplay) {
progress.recordEmptyPoll(lastQueryStartTime);
}
if (newLastTimestamp > lastProcessedTimestamp) {
String sameTimestampQuery = String.format(
"SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
Expand Down Expand Up @@ -1106,8 +1160,9 @@ private long processCDCBatchGenerated(String partitionId, String ownerPartitionI
metricSource.updateCdcBatchProcessTime(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - batchStartTime);
metricSource.incrementCdcBatchCount(dataTableName);
metricSource.updateCdcLag(dataTableName,
EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp);
if (!isParentReplay) {
progress.recordProcessed(newLastTimestamp);
}
}
if (newLastTimestamp > lastProcessedTimestamp) {
updateTrackerProgress(conn, partitionId, ownerPartitionId, newLastTimestamp,
Expand Down
Loading