Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
*/
package org.apache.phoenix.jdbc;

import static org.apache.phoenix.jdbc.HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable;
import static org.apache.phoenix.jdbc.HighAvailabilityUtil.isStaleClusterRoleRecordExceptionExistsInThrowable;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_DURATION_MS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_MUTATION_BLOCKED_COUNT;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_STALE_CRR_DETECTED_COUNT;

import java.sql.Array;
import java.sql.Blob;
Expand Down Expand Up @@ -171,55 +175,63 @@ void failover(long timeoutMs) throws SQLException {
return;
}

PhoenixConnection newConn = null;
SQLException cause = null;
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (newConn == null && EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs) {
try {
newConn =
context.getHAGroup().connectActive(context.getProperties(), context.getHAURLInfo());
} catch (SQLException e) {
cause = e;
LOG.info("Got exception when trying to connect to active cluster.", e);
final long failoverStartMs = EnvironmentEdgeManager.currentTimeMillis();
try {
PhoenixConnection newConn = null;
SQLException cause = null;
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (
newConn == null && EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs
) {
try {
Thread.sleep(100); // TODO: be smart than this
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new SQLException("Got interrupted waiting for connection failover", e);
newConn =
context.getHAGroup().connectActive(context.getProperties(), context.getHAURLInfo());
} catch (SQLException e) {
cause = e;
LOG.info("Got exception when trying to connect to active cluster.", e);
try {
Thread.sleep(100); // TODO: be smart than this
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new SQLException("Got interrupted waiting for connection failover", e);
}
}
}
}
if (newConn == null) {
throw new FailoverSQLException("Can not failover connection",
context.getHAGroup().getGroupInfo().toString(), cause);
}
if (newConn == null) {
throw new FailoverSQLException("Can not failover connection",
context.getHAGroup().getGroupInfo().toString(), cause);
}

final PhoenixConnection oldConn = connection;
connection = newConn;
if (oldConn != null) {
// aggregate metrics
previousMutationMetrics = oldConn.getMutationMetrics();
previousReadMetrics = oldConn.getReadMetrics();
oldConn.clearMetrics();

// close old connection
if (!oldConn.isClosed()) {
// TODO: what happens to in-flight edits/mutations?
// Can we copy into the new connection we do not allow this failover?
// MutationState state = oldConn.getMutationState();
try {
oldConn.close(new SQLExceptionInfo.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
.setMessage("Phoenix connection got closed due to failover")
.setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()).build()
.buildException());
} catch (SQLException e) {
LOG.error("Failed to close old connection after failover: {}", e.getMessage());
LOG.info("Full stack when closing old connection after failover", e);
final PhoenixConnection oldConn = connection;
connection = newConn;
if (oldConn != null) {
// aggregate metrics
previousMutationMetrics = oldConn.getMutationMetrics();
previousReadMetrics = oldConn.getReadMetrics();
oldConn.clearMetrics();

// close old connection
if (!oldConn.isClosed()) {
// TODO: what happens to in-flight edits/mutations?
// Can we copy into the new connection we do not allow this failover?
// MutationState state = oldConn.getMutationState();
try {
oldConn.close(new SQLExceptionInfo.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER)
.setMessage("Phoenix connection got closed due to failover")
.setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()).build()
.buildException());
} catch (SQLException e) {
LOG.error("Failed to close old connection after failover: {}", e.getMessage());
LOG.info("Full stack when closing old connection after failover", e);
}
}
}
LOG.info("Connection {} failed over to {}", context.getHAGroup().getGroupInfo(),
connection.getURL());
} finally {
GLOBAL_HA_FAILOVER_DURATION_MS
.update(EnvironmentEdgeManager.currentTimeMillis() - failoverStartMs);
}
LOG.info("Connection {} failed over to {}", context.getHAGroup().getGroupInfo(),
connection.getURL());
}

/**
Expand Down Expand Up @@ -324,6 +336,7 @@ <T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws SQLExceptio
return s.get();
} catch (Exception e) {
if (isStaleClusterRoleRecordExceptionExistsInThrowable(e)) {
GLOBAL_HA_STALE_CRR_DETECTED_COUNT.increment();
// If we receive StaleClusterRoleRecordException, that means Operation was
// supposed to be executed on Active Cluster but was in reality was sent to
// STANDBY Cluster, that can happen only when Failover is in Progress, So we
Expand All @@ -348,6 +361,9 @@ <T> T wrapActionDuringFailover(SupplierWithSQLException<T> s) throws SQLExceptio
context.getHAGroup(), e))
.build().buildException();
}
if (isMutationBlockedIOExceptionExistsInThrowable(e)) {
GLOBAL_HA_MUTATION_BLOCKED_COUNT.increment();
}
if (policy.shouldFailover(e, ++failoverCount)) {
failover(timeoutMs);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.phoenix.jdbc;

import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_CACHE_AGE_MS;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_REFRESH_COUNT;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_COUNT;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;

Expand Down Expand Up @@ -629,6 +632,7 @@ public void init() throws IOException, SQLException {

LOG.info("Initial cluster role for HA group {} is {}", info, roleRecordFromEndpoint);
roleRecord = roleRecordFromEndpoint;
lastClusterRoleRecordRefreshTime = System.currentTimeMillis();
state = State.READY;
}

Expand All @@ -644,6 +648,11 @@ public Connection connect(Properties properties, HAURLInfo haurlInfo) throws SQL
.setMessage("HA group is not ready!").setHaGroupInfo(info.toString()).build()
.buildException();
}
// GAUGE: most-recent-sample of "milliseconds since the last successful CRR refresh".
// Use set(...) to overwrite the prior sample; do NOT increment/update — that would turn
// the gauge into an accumulator and break "current age" semantics.
GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric()
.set(System.currentTimeMillis() - lastClusterRoleRecordRefreshTime);
return roleRecord.getPolicy().provide(this, properties, haurlInfo);
}

Expand Down Expand Up @@ -1035,6 +1044,7 @@ private ClusterRoleRecord getClusterRoleRecordFromEndpoint() throws SQLException
* @throws SQLException if there is an error getting the ClusterRoleRecord
*/
public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLException {
GLOBAL_HA_CRR_REFRESH_COUNT.increment();
// Allow concurrent reads to return fast in case refresh is not needed
readLock.lock();
try {
Expand All @@ -1057,6 +1067,8 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio

ClusterRoleRecord newRoleRecord = getClusterRoleRecordFromEndpoint();
if (roleRecord == null) {
// First-load init path: no prior cache state to compare against, so this is not a
// failover transition and HA_FAILOVER_COUNT is intentionally NOT incremented here.
roleRecord = newRoleRecord;
lastClusterRoleRecordRefreshTime = System.currentTimeMillis();
state = State.READY;
Expand Down Expand Up @@ -1124,6 +1136,16 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio
// The goal here is to gain higher availability even though existing resources against
// previous ACTIVE cluster may have not been closed cleanly.
}
// Count the transition as a failover only when an active cluster is established
// or moves between peers. Operator-driven transitions to a no-active state
// (both clusters STANDBY) are not counted as failovers; recovery from no-active
// back to having an ACTIVE peer is counted.
if (
!oldRecord.getActiveUrl().equals(newRoleRecord.getActiveUrl())
&& newRoleRecord.getActiveUrl().isPresent()
) {
GLOBAL_HA_FAILOVER_COUNT.increment();
}
// Update the role record and the last refresh time
roleRecord = newRoleRecord;
lastClusterRoleRecordRefreshTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.phoenix.jdbc;

import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.phoenix.exception.MutationBlockedIOException;
import org.apache.phoenix.exception.StaleClusterRoleRecordException;

/**
Expand Down Expand Up @@ -45,7 +46,40 @@ public static boolean isStaleClusterRoleRecordExceptionExistsInThrowable(Throwab
return false;
}

/**
* Walks the throwable chain (including {@link RetriesExhaustedWithDetailsException} causes) to
* detect a {@link MutationBlockedIOException} surface. Mirrors the structure of
* {@link #isStaleClusterRoleRecordExceptionExistsInThrowable(Throwable)} so that batched mutation
* rejections wrapped at varying depth are still attributable to the mutation-block gate.
*/
public static boolean isMutationBlockedIOExceptionExistsInThrowable(Throwable e) {
if (e == null) {
return false;
}
if (isGivenThrowableMutationBlockedException(e)) {
return true;
}

if (e instanceof RetriesExhaustedWithDetailsException) {
for (Throwable t : ((RetriesExhaustedWithDetailsException) e).getCauses()) {
if (isGivenThrowableMutationBlockedException(t)) {
return true;
}
}
}

if (e.getCause() != null) {
return isMutationBlockedIOExceptionExistsInThrowable(e.getCause());
}

return false;
}

private static boolean isGivenThrowableStaleException(Throwable t) {
return t instanceof StaleClusterRoleRecordException;
}

private static boolean isGivenThrowableMutationBlockedException(Throwable t) {
return t instanceof MutationBlockedIOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS;
import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES;
import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS;
import static org.apache.phoenix.monitoring.MetricType.HA_CRR_CACHE_AGE_MS;
import static org.apache.phoenix.monitoring.MetricType.HA_CRR_REFRESH_COUNT;
import static org.apache.phoenix.monitoring.MetricType.HA_FAILOVER_COUNT;
import static org.apache.phoenix.monitoring.MetricType.HA_FAILOVER_DURATION_MS;
import static org.apache.phoenix.monitoring.MetricType.HA_MUTATION_BLOCKED_COUNT;
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_CREATED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_ERROR_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_FALLBACK_COUNTER;
Expand All @@ -49,6 +54,9 @@
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_POOL2_TASK_QUEUE_WAIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_TASK_TIMEOUT_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.HA_POLLER_TICK_COUNT;
import static org.apache.phoenix.monitoring.MetricType.HA_POLLER_TICK_FAILURES;
import static org.apache.phoenix.monitoring.MetricType.HA_STALE_CRR_DETECTED_COUNT;
import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE;
import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
Expand Down Expand Up @@ -163,6 +171,17 @@ public enum GlobalClientMetrics {
GLOBAL_HA_PARALLEL_CONNECTION_ERROR_COUNTER(HA_PARALLEL_CONNECTION_ERROR_COUNTER),
GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER(HA_PARALLEL_CONNECTION_CREATED_COUNTER),

GLOBAL_HA_FAILOVER_COUNT(HA_FAILOVER_COUNT),
GLOBAL_HA_FAILOVER_DURATION_MS(HA_FAILOVER_DURATION_MS),
GLOBAL_HA_MUTATION_BLOCKED_COUNT(HA_MUTATION_BLOCKED_COUNT),
GLOBAL_HA_STALE_CRR_DETECTED_COUNT(HA_STALE_CRR_DETECTED_COUNT),
GLOBAL_HA_CRR_REFRESH_COUNT(HA_CRR_REFRESH_COUNT),
// GAUGE: most-recent-sample. Use getMetric().set(ageMs) at the sampling site;
// do NOT increment or update — that would accumulate and break gauge semantics.
GLOBAL_HA_CRR_CACHE_AGE_MS(HA_CRR_CACHE_AGE_MS),
GLOBAL_HA_POLLER_TICK_COUNT(HA_POLLER_TICK_COUNT),
GLOBAL_HA_POLLER_TICK_FAILURES(HA_POLLER_TICK_FAILURES),

GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER(CLIENT_METADATA_CACHE_MISS_COUNTER),
GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER),
GLOBAL_CLIENT_METADATA_CACHE_EVICTION_COUNTER(CLIENT_METADATA_CACHE_EVICTION_COUNTER),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,35 @@ public enum MetricType {
LogLevel.DEBUG, PLong.INSTANCE),
HA_PARALLEL_CONNECTION_CREATED_COUNTER("hpccc",
"Counter for the number of parallel phoenix connections that were created", LogLevel.DEBUG,
PLong.INSTANCE),
HA_FAILOVER_COUNT("hafc",
"Counter for cluster-level failover transitions (active URL flips between peers or recovers "
+ "from no-active state) recorded at the CRR write site",
LogLevel.DEBUG, PLong.INSTANCE),
HA_FAILOVER_DURATION_MS("hafd",
"Total time in milliseconds spent in connection-level failover transitions, summed across "
+ "all observing connections (per-connection observation, not per-cluster-event)",
LogLevel.DEBUG, PLong.INSTANCE),
HA_MUTATION_BLOCKED_COUNT("hambc",
"Counter for MutationBlockedIOException surfaces caught by wrapActionDuringFailover",
LogLevel.DEBUG, PLong.INSTANCE),
HA_STALE_CRR_DETECTED_COUNT("hascd",
"Counter for StaleClusterRoleRecordException surfaces caught by wrapActionDuringFailover",
LogLevel.DEBUG, PLong.INSTANCE),
HA_CRR_REFRESH_COUNT("hcrc", "Counter for refreshClusterRoleRecord invocations", LogLevel.DEBUG,
PLong.INSTANCE),
// GAUGE SEMANTICS: most-recent-sample, NOT an accumulator. Callers MUST use
// getMetric().set(ageMs) to overwrite the previous sample, never increment/update.
// Misuse will produce a monotonically-growing accumulator, which is the wrong shape for an
// "age since last refresh" gauge.
HA_CRR_CACHE_AGE_MS("hccg",
"Gauge: most-recent-sample of milliseconds since the last successful CRR refresh "
+ "(use getMetric().set(ageMs))",
LogLevel.DEBUG, PLong.INSTANCE),
HA_POLLER_TICK_COUNT("hptc", "Counter for non-active CRR poller ticks executed", LogLevel.DEBUG,
PLong.INSTANCE),
HA_POLLER_TICK_FAILURES("hptf",
"Counter for non-active CRR poller tick failures (caught SQLException)", LogLevel.DEBUG,
PLong.INSTANCE);

private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.phoenix.jdbc.HighAvailabilityGroup;
import org.apache.phoenix.jdbc.HighAvailabilityPolicy;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -216,6 +217,7 @@ private static void schedulePoller(String url1, String url2, String haGroupName,
Runnable pollingTask = () -> {
// Increment unconditionally so a failed tick still alternates next iteration.
long tick = tickCount.getAndIncrement();
GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_COUNT.increment();
String tickUrl = selectUrlForTick(url1, url2, tick);
try {
ClusterRoleRecord polledCrr =
Expand All @@ -242,6 +244,7 @@ private static void schedulePoller(String url1, String url2, String haGroupName,
}
}
} catch (SQLException e) {
GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_FAILURES.increment();
LOGGER.error(
"Exception found while polling for ClusterRoleRecord on {} for HA group" + " {}: {}",
tickUrl, haGroupName, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.phoenix.hbase.index.builder.IndexBuilder;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.metrics.MetricsHaBypassSourceFactory;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
Expand Down Expand Up @@ -746,6 +747,20 @@ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
// Extract HAGroupName from the mutations
Optional<ReplicationLogGroup> logGroup = getHAGroupFromBatch(c.getEnvironment(), miniBatchOp);

// Mutation batches that arrive without a resolvable HA group cannot be evaluated against
// the cluster-role-based mutation-block gate. Track the bypass globally (not per-table)
// so operators can spot regressions where a write path forgets to attach the
// _HAGroupName attribute. Scope is intentionally !logGroup.isPresent() regardless of
// dataTableName — system-HA-group writes WITH a haGroup are an *intended* gate exemption
// (state writes must proceed during a block window) and are not counted as bypasses.
if (!logGroup.isPresent()) {
try {
MetricsHaBypassSourceFactory.getInstance().incrementBypassedMutationBlockCount();
} catch (Throwable t) {
LOG.warn("Failed to increment bypassed mutation block count metric; continuing", t);
}
}

// We don't want to check for mutation blocking for the system ha group table
if (!dataTableName.equals(SYSTEM_HA_GROUP_NAME) && logGroup.isPresent()) {
// Check if mutation is blocked for the HA Group
Expand Down
Loading