From 1ff533b7ea9dd2be792d0e895c0c6c026d8f8955 Mon Sep 17 00:00:00 2001 From: lokiore Date: Mon, 8 Jun 2026 15:13:23 -0700 Subject: [PATCH] PHOENIX-7872 :- HA observability metrics for poller, CRR refresh/age, failover, mutation-block + RegionServer bypass counter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds client-side and server-side observability metrics for the Consistent Failover (CCF) high-availability path: Tier-1 client-side counters (4): - HA_POLLER_TICK_COUNT — total poller ticks across all HA groups - HA_POLLER_TICK_FAILURES — per-tick CRR fetch failures - HA_FAILOVER_COUNT — failover transitions executed by the client - HA_MUTATION_BLOCKED_COUNT — MutationBlockedIOException occurrences detected via the wrap-and-propagate path Tier-2 client-side metrics (4): - HA_FAILOVER_DURATION_MS — failover end-to-end latency histogram - HA_STALE_CRR_DETECTED_COUNT — StaleClusterRoleRecordException occurrences - HA_CRR_CACHE_AGE_MS — gauge of staleness of the in-memory CRR cache - (HA_FAILOVER_COUNT moved to applyClusterRoleRecord with role-transition guard so it only fires on actual ACTIVE -> STANDBY or STANDBY -> ACTIVE transitions) Tier-2 server-side counter (1): - BYPASSED_MUTATION_BLOCK_COUNT — emitted from IndexRegionObserver when a mutation bypasses the mutation-block check because no log group is present. Implemented as 3-file Hadoop-metrics2 source: interface + static factory (DefaultMetricsSystem.instance()) + impl. Tests: - HAGroupMetricsIT — IT covering all 8 client-side metrics - BypassedMutationBlockMetricsIT — IT covering server-side bypass counter - HighAvailabilityUtilTest — UT covering RetriesExhaustedWithDetailsException + IOException cause-chain MBIOE detection - MetricsHaBypassSourceFactoryTest — UT covering factory thread-safety Generated-by: Claude Code (Opus 4.7) --- .../jdbc/FailoverPhoenixConnection.java | 100 ++++--- .../phoenix/jdbc/HighAvailabilityGroup.java | 22 ++ .../phoenix/jdbc/HighAvailabilityUtil.java | 34 +++ .../monitoring/GlobalClientMetrics.java | 19 ++ .../apache/phoenix/monitoring/MetricType.java | 29 ++ .../util/GetClusterRoleRecordUtil.java | 3 + .../hbase/index/IndexRegionObserver.java | 15 + .../index/metrics/MetricsHaBypassSource.java | 53 ++++ .../metrics/MetricsHaBypassSourceFactory.java | 42 +++ .../metrics/MetricsHaBypassSourceImpl.java | 67 +++++ .../index/BypassedMutationBlockMetricsIT.java | 83 ++++++ .../apache/phoenix/jdbc/HAGroupMetricsIT.java | 259 ++++++++++++++++++ .../MetricsHaBypassSourceFactoryTest.java | 44 +++ .../jdbc/HighAvailabilityUtilTest.java | 71 +++++ 14 files changed, 799 insertions(+), 42 deletions(-) create mode 100644 phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java create mode 100644 phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactory.java create mode 100644 phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceImpl.java create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BypassedMutationBlockMetricsIT.java create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactoryTest.java create mode 100644 phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityUtilTest.java diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java index 58c79c49e9b..5916ea86c4d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java @@ -17,7 +17,11 @@ */ package org.apache.phoenix.jdbc; +import static org.apache.phoenix.jdbc.HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable; import static org.apache.phoenix.jdbc.HighAvailabilityUtil.isStaleClusterRoleRecordExceptionExistsInThrowable; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_DURATION_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_MUTATION_BLOCKED_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_STALE_CRR_DETECTED_COUNT; import java.sql.Array; import java.sql.Blob; @@ -171,55 +175,63 @@ void failover(long timeoutMs) throws SQLException { return; } - PhoenixConnection newConn = null; - SQLException cause = null; - final long startTime = EnvironmentEdgeManager.currentTimeMillis(); - while (newConn == null && EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs) { - try { - newConn = - context.getHAGroup().connectActive(context.getProperties(), context.getHAURLInfo()); - } catch (SQLException e) { - cause = e; - LOG.info("Got exception when trying to connect to active cluster.", e); + final long failoverStartMs = EnvironmentEdgeManager.currentTimeMillis(); + try { + PhoenixConnection newConn = null; + SQLException cause = null; + final long startTime = EnvironmentEdgeManager.currentTimeMillis(); + while ( + newConn == null && EnvironmentEdgeManager.currentTimeMillis() < startTime + timeoutMs + ) { try { - Thread.sleep(100); // TODO: be smart than this - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new SQLException("Got interrupted waiting for connection failover", e); + newConn = + context.getHAGroup().connectActive(context.getProperties(), context.getHAURLInfo()); + } catch (SQLException e) { + cause = e; + LOG.info("Got exception when trying to connect to active cluster.", e); + try { + Thread.sleep(100); // TODO: be smart than this + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new SQLException("Got interrupted waiting for connection failover", e); + } } } - } - if (newConn == null) { - throw new FailoverSQLException("Can not failover connection", - context.getHAGroup().getGroupInfo().toString(), cause); - } + if (newConn == null) { + throw new FailoverSQLException("Can not failover connection", + context.getHAGroup().getGroupInfo().toString(), cause); + } - final PhoenixConnection oldConn = connection; - connection = newConn; - if (oldConn != null) { - // aggregate metrics - previousMutationMetrics = oldConn.getMutationMetrics(); - previousReadMetrics = oldConn.getReadMetrics(); - oldConn.clearMetrics(); - - // close old connection - if (!oldConn.isClosed()) { - // TODO: what happens to in-flight edits/mutations? - // Can we copy into the new connection we do not allow this failover? - // MutationState state = oldConn.getMutationState(); - try { - oldConn.close(new SQLExceptionInfo.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER) - .setMessage("Phoenix connection got closed due to failover") - .setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()).build() - .buildException()); - } catch (SQLException e) { - LOG.error("Failed to close old connection after failover: {}", e.getMessage()); - LOG.info("Full stack when closing old connection after failover", e); + final PhoenixConnection oldConn = connection; + connection = newConn; + if (oldConn != null) { + // aggregate metrics + previousMutationMetrics = oldConn.getMutationMetrics(); + previousReadMetrics = oldConn.getReadMetrics(); + oldConn.clearMetrics(); + + // close old connection + if (!oldConn.isClosed()) { + // TODO: what happens to in-flight edits/mutations? + // Can we copy into the new connection we do not allow this failover? + // MutationState state = oldConn.getMutationState(); + try { + oldConn.close(new SQLExceptionInfo.Builder(SQLExceptionCode.HA_CLOSED_AFTER_FAILOVER) + .setMessage("Phoenix connection got closed due to failover") + .setHaGroupInfo(context.getHAGroup().getGroupInfo().toString()).build() + .buildException()); + } catch (SQLException e) { + LOG.error("Failed to close old connection after failover: {}", e.getMessage()); + LOG.info("Full stack when closing old connection after failover", e); + } } } + LOG.info("Connection {} failed over to {}", context.getHAGroup().getGroupInfo(), + connection.getURL()); + } finally { + GLOBAL_HA_FAILOVER_DURATION_MS + .update(EnvironmentEdgeManager.currentTimeMillis() - failoverStartMs); } - LOG.info("Connection {} failed over to {}", context.getHAGroup().getGroupInfo(), - connection.getURL()); } /** @@ -324,6 +336,7 @@ T wrapActionDuringFailover(SupplierWithSQLException s) throws SQLExceptio return s.get(); } catch (Exception e) { if (isStaleClusterRoleRecordExceptionExistsInThrowable(e)) { + GLOBAL_HA_STALE_CRR_DETECTED_COUNT.increment(); // If we receive StaleClusterRoleRecordException, that means Operation was // supposed to be executed on Active Cluster but was in reality was sent to // STANDBY Cluster, that can happen only when Failover is in Progress, So we @@ -348,6 +361,9 @@ T wrapActionDuringFailover(SupplierWithSQLException s) throws SQLExceptio context.getHAGroup(), e)) .build().buildException(); } + if (isMutationBlockedIOExceptionExistsInThrowable(e)) { + GLOBAL_HA_MUTATION_BLOCKED_COUNT.increment(); + } if (policy.shouldFailover(e, ++failoverCount)) { failover(timeoutMs); } else { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java index 6b9728e7970..7181b11c5f5 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.jdbc; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_CACHE_AGE_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_REFRESH_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_COUNT; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; @@ -629,6 +632,7 @@ public void init() throws IOException, SQLException { LOG.info("Initial cluster role for HA group {} is {}", info, roleRecordFromEndpoint); roleRecord = roleRecordFromEndpoint; + lastClusterRoleRecordRefreshTime = System.currentTimeMillis(); state = State.READY; } @@ -644,6 +648,11 @@ public Connection connect(Properties properties, HAURLInfo haurlInfo) throws SQL .setMessage("HA group is not ready!").setHaGroupInfo(info.toString()).build() .buildException(); } + // GAUGE: most-recent-sample of "milliseconds since the last successful CRR refresh". + // Use set(...) to overwrite the prior sample; do NOT increment/update — that would turn + // the gauge into an accumulator and break "current age" semantics. + GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric() + .set(System.currentTimeMillis() - lastClusterRoleRecordRefreshTime); return roleRecord.getPolicy().provide(this, properties, haurlInfo); } @@ -1035,6 +1044,7 @@ private ClusterRoleRecord getClusterRoleRecordFromEndpoint() throws SQLException * @throws SQLException if there is an error getting the ClusterRoleRecord */ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLException { + GLOBAL_HA_CRR_REFRESH_COUNT.increment(); // Allow concurrent reads to return fast in case refresh is not needed readLock.lock(); try { @@ -1057,6 +1067,8 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio ClusterRoleRecord newRoleRecord = getClusterRoleRecordFromEndpoint(); if (roleRecord == null) { + // First-load init path: no prior cache state to compare against, so this is not a + // failover transition and HA_FAILOVER_COUNT is intentionally NOT incremented here. roleRecord = newRoleRecord; lastClusterRoleRecordRefreshTime = System.currentTimeMillis(); state = State.READY; @@ -1124,6 +1136,16 @@ public boolean refreshClusterRoleRecord(boolean forceRefresh) throws SQLExceptio // The goal here is to gain higher availability even though existing resources against // previous ACTIVE cluster may have not been closed cleanly. } + // Count the transition as a failover only when an active cluster is established + // or moves between peers. Operator-driven transitions to a no-active state + // (both clusters STANDBY) are not counted as failovers; recovery from no-active + // back to having an ACTIVE peer is counted. + if ( + !oldRecord.getActiveUrl().equals(newRoleRecord.getActiveUrl()) + && newRoleRecord.getActiveUrl().isPresent() + ) { + GLOBAL_HA_FAILOVER_COUNT.increment(); + } // Update the role record and the last refresh time roleRecord = newRoleRecord; lastClusterRoleRecordRefreshTime = System.currentTimeMillis(); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java index 1893ad3169c..386529a6a75 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityUtil.java @@ -18,6 +18,7 @@ package org.apache.phoenix.jdbc; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.phoenix.exception.MutationBlockedIOException; import org.apache.phoenix.exception.StaleClusterRoleRecordException; /** @@ -45,7 +46,40 @@ public static boolean isStaleClusterRoleRecordExceptionExistsInThrowable(Throwab return false; } + /** + * Walks the throwable chain (including {@link RetriesExhaustedWithDetailsException} causes) to + * detect a {@link MutationBlockedIOException} surface. Mirrors the structure of + * {@link #isStaleClusterRoleRecordExceptionExistsInThrowable(Throwable)} so that batched mutation + * rejections wrapped at varying depth are still attributable to the mutation-block gate. + */ + public static boolean isMutationBlockedIOExceptionExistsInThrowable(Throwable e) { + if (e == null) { + return false; + } + if (isGivenThrowableMutationBlockedException(e)) { + return true; + } + + if (e instanceof RetriesExhaustedWithDetailsException) { + for (Throwable t : ((RetriesExhaustedWithDetailsException) e).getCauses()) { + if (isGivenThrowableMutationBlockedException(t)) { + return true; + } + } + } + + if (e.getCause() != null) { + return isMutationBlockedIOExceptionExistsInThrowable(e.getCause()); + } + + return false; + } + private static boolean isGivenThrowableStaleException(Throwable t) { return t instanceof StaleClusterRoleRecordException; } + + private static boolean isGivenThrowableMutationBlockedException(Throwable t) { + return t instanceof MutationBlockedIOException; + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java index 89f04f8827d..bbd23bcbc52 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java @@ -35,6 +35,11 @@ import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS; import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES; import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS; +import static org.apache.phoenix.monitoring.MetricType.HA_CRR_CACHE_AGE_MS; +import static org.apache.phoenix.monitoring.MetricType.HA_CRR_REFRESH_COUNT; +import static org.apache.phoenix.monitoring.MetricType.HA_FAILOVER_COUNT; +import static org.apache.phoenix.monitoring.MetricType.HA_FAILOVER_DURATION_MS; +import static org.apache.phoenix.monitoring.MetricType.HA_MUTATION_BLOCKED_COUNT; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_CREATED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_ERROR_COUNTER; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_CONNECTION_FALLBACK_COUNTER; @@ -49,6 +54,9 @@ import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_POOL2_TASK_QUEUE_WAIT_TIME; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_POOL2_TASK_REJECTED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.HA_PARALLEL_TASK_TIMEOUT_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.HA_POLLER_TICK_COUNT; +import static org.apache.phoenix.monitoring.MetricType.HA_POLLER_TICK_FAILURES; +import static org.apache.phoenix.monitoring.MetricType.HA_STALE_CRR_DETECTED_COUNT; import static org.apache.phoenix.monitoring.MetricType.HCONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE; import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES; @@ -163,6 +171,17 @@ public enum GlobalClientMetrics { GLOBAL_HA_PARALLEL_CONNECTION_ERROR_COUNTER(HA_PARALLEL_CONNECTION_ERROR_COUNTER), GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER(HA_PARALLEL_CONNECTION_CREATED_COUNTER), + GLOBAL_HA_FAILOVER_COUNT(HA_FAILOVER_COUNT), + GLOBAL_HA_FAILOVER_DURATION_MS(HA_FAILOVER_DURATION_MS), + GLOBAL_HA_MUTATION_BLOCKED_COUNT(HA_MUTATION_BLOCKED_COUNT), + GLOBAL_HA_STALE_CRR_DETECTED_COUNT(HA_STALE_CRR_DETECTED_COUNT), + GLOBAL_HA_CRR_REFRESH_COUNT(HA_CRR_REFRESH_COUNT), + // GAUGE: most-recent-sample. Use getMetric().set(ageMs) at the sampling site; + // do NOT increment or update — that would accumulate and break gauge semantics. + GLOBAL_HA_CRR_CACHE_AGE_MS(HA_CRR_CACHE_AGE_MS), + GLOBAL_HA_POLLER_TICK_COUNT(HA_POLLER_TICK_COUNT), + GLOBAL_HA_POLLER_TICK_FAILURES(HA_POLLER_TICK_FAILURES), + GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER(CLIENT_METADATA_CACHE_MISS_COUNTER), GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER), GLOBAL_CLIENT_METADATA_CACHE_EVICTION_COUNTER(CLIENT_METADATA_CACHE_EVICTION_COUNTER), diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java index ff80705c0d4..22a9ecbdcf3 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -353,6 +353,35 @@ public enum MetricType { LogLevel.DEBUG, PLong.INSTANCE), HA_PARALLEL_CONNECTION_CREATED_COUNTER("hpccc", "Counter for the number of parallel phoenix connections that were created", LogLevel.DEBUG, + PLong.INSTANCE), + HA_FAILOVER_COUNT("hafc", + "Counter for cluster-level failover transitions (active URL flips between peers or recovers " + + "from no-active state) recorded at the CRR write site", + LogLevel.DEBUG, PLong.INSTANCE), + HA_FAILOVER_DURATION_MS("hafd", + "Total time in milliseconds spent in connection-level failover transitions, summed across " + + "all observing connections (per-connection observation, not per-cluster-event)", + LogLevel.DEBUG, PLong.INSTANCE), + HA_MUTATION_BLOCKED_COUNT("hambc", + "Counter for MutationBlockedIOException surfaces caught by wrapActionDuringFailover", + LogLevel.DEBUG, PLong.INSTANCE), + HA_STALE_CRR_DETECTED_COUNT("hascd", + "Counter for StaleClusterRoleRecordException surfaces caught by wrapActionDuringFailover", + LogLevel.DEBUG, PLong.INSTANCE), + HA_CRR_REFRESH_COUNT("hcrc", "Counter for refreshClusterRoleRecord invocations", LogLevel.DEBUG, + PLong.INSTANCE), + // GAUGE SEMANTICS: most-recent-sample, NOT an accumulator. Callers MUST use + // getMetric().set(ageMs) to overwrite the previous sample, never increment/update. + // Misuse will produce a monotonically-growing accumulator, which is the wrong shape for an + // "age since last refresh" gauge. + HA_CRR_CACHE_AGE_MS("hccg", + "Gauge: most-recent-sample of milliseconds since the last successful CRR refresh " + + "(use getMetric().set(ageMs))", + LogLevel.DEBUG, PLong.INSTANCE), + HA_POLLER_TICK_COUNT("hptc", "Counter for non-active CRR poller ticks executed", LogLevel.DEBUG, + PLong.INSTANCE), + HA_POLLER_TICK_FAILURES("hptf", + "Counter for non-active CRR poller tick failures (caught SQLException)", LogLevel.DEBUG, PLong.INSTANCE); private final String description; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java index 5adbedb1692..44d859fedf4 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java @@ -42,6 +42,7 @@ import org.apache.phoenix.jdbc.HighAvailabilityGroup; import org.apache.phoenix.jdbc.HighAvailabilityPolicy; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.monitoring.GlobalClientMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,6 +217,7 @@ private static void schedulePoller(String url1, String url2, String haGroupName, Runnable pollingTask = () -> { // Increment unconditionally so a failed tick still alternates next iteration. long tick = tickCount.getAndIncrement(); + GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_COUNT.increment(); String tickUrl = selectUrlForTick(url1, url2, tick); try { ClusterRoleRecord polledCrr = @@ -242,6 +244,7 @@ private static void schedulePoller(String url1, String url2, String haGroupName, } } } catch (SQLException e) { + GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_FAILURES.increment(); LOGGER.error( "Exception found while polling for ClusterRoleRecord on {} for HA group" + " {}: {}", tickUrl, haGroupName, e.getMessage()); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 25477196950..0ba7044037a 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -115,6 +115,7 @@ import org.apache.phoenix.hbase.index.builder.IndexBuilder; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.metrics.MetricsHaBypassSourceFactory; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; @@ -746,6 +747,20 @@ public void preBatchMutate(ObserverContext c, // Extract HAGroupName from the mutations Optional logGroup = getHAGroupFromBatch(c.getEnvironment(), miniBatchOp); + // Mutation batches that arrive without a resolvable HA group cannot be evaluated against + // the cluster-role-based mutation-block gate. Track the bypass globally (not per-table) + // so operators can spot regressions where a write path forgets to attach the + // _HAGroupName attribute. Scope is intentionally !logGroup.isPresent() regardless of + // dataTableName — system-HA-group writes WITH a haGroup are an *intended* gate exemption + // (state writes must proceed during a block window) and are not counted as bypasses. + if (!logGroup.isPresent()) { + try { + MetricsHaBypassSourceFactory.getInstance().incrementBypassedMutationBlockCount(); + } catch (Throwable t) { + LOG.warn("Failed to increment bypassed mutation block count metric; continuing", t); + } + } + // We don't want to check for mutation blocking for the system ha group table if (!dataTableName.equals(SYSTEM_HA_GROUP_NAME) && logGroup.isPresent()) { // Check if mutation is blocked for the HA Group diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java new file mode 100644 index 00000000000..a44dcb072da --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSource.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +import org.apache.hadoop.hbase.metrics.BaseSource; + +/** + * Server-side JMX metrics source for tracking mutations that bypass the cluster-role-based + * mutation-block gate inside {@code IndexRegionObserver.preBatchMutate}. A bypass occurs when the + * mutation batch reaches the gate without an associated HA group attribute, so the gate has no + * haGroupName to evaluate state against and skips the block check entirely. + *

+ * A non-zero counter post-deploy can indicate that some write paths reach the gate without carrying + * the {@code _HAGroupName} attribute, which in turn means those writes can proceed during a + * mutation-block window and bypass the safety property the gate exists to enforce. Operators should + * treat sustained non-zero values as a regression signal. + */ +public interface MetricsHaBypassSource extends BaseSource { + + String METRICS_NAME = "HaBypass"; + String METRICS_CONTEXT = "phoenix"; + String METRICS_DESCRIPTION = + "Metrics for cluster-role-based mutation-block bypass events on the RegionServer"; + String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; + + String BYPASSED_MUTATION_BLOCK_COUNT = "bypassedMutationBlockCount"; + String BYPASSED_MUTATION_BLOCK_COUNT_DESC = + "Counter for mutation batches that reached preBatchMutate without an associated HA group " + + "(no _HAGroupName attribute), causing the cluster-role-based mutation-block gate to be " + + "skipped"; + + /** + * Increments the bypass counter. Called from {@code IndexRegionObserver.preBatchMutate} when the + * resolved {@code Optional} is empty (i.e., the mutation batch carries no HA + * group attribute and the mutation-block gate cannot be evaluated). + */ + void incrementBypassedMutationBlockCount(); +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactory.java new file mode 100644 index 00000000000..66e6f3aadd9 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +/** + * Factory for the per-RegionServer {@link MetricsHaBypassSource} singleton. Unlike the per-haGroup + * factories elsewhere in the codebase, the bypass counter is a single global (per-JVM, + * per-RegionServer) counter, so this factory holds one eagerly-initialized instance rather than a + * {@code ConcurrentHashMap} keyed on haGroupName. + */ +public final class MetricsHaBypassSourceFactory { + + private static final MetricsHaBypassSource INSTANCE = new MetricsHaBypassSourceImpl(); + + private MetricsHaBypassSourceFactory() { + } + + /** + * Returns the process-wide {@link MetricsHaBypassSource} singleton. The instance is initialized + * eagerly at class-load time, so this method is thread-safe without any additional + * synchronization. + * @return the singleton {@link MetricsHaBypassSource} for this RegionServer JVM + */ + public static MetricsHaBypassSource getInstance() { + return INSTANCE; + } +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceImpl.java new file mode 100644 index 00000000000..5963b11fae7 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceImpl.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.metrics2.lib.MutableFastCounter; + +/** + * Implementation of {@link MetricsHaBypassSource} backed by a single {@link MutableFastCounter} on + * the RegionServer JMX registry. The counter is per-RegionServer (not keyed by HA group, by design + * — a bypass event has no haGroupName to attribute against). + */ +public class MetricsHaBypassSourceImpl extends BaseSourceImpl implements MetricsHaBypassSource { + + private final MutableFastCounter bypassedMutationBlockCount; + + /** + * Default constructor used by {@link MetricsHaBypassSourceFactory}. Registers the source under + * the standard {@code HaBypass} JMX context shared with operator dashboards. + */ + public MetricsHaBypassSourceImpl() { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); + } + + /** + * Test-friendly constructor that lets callers override the metrics-registry naming, so unit tests + * can register an isolated source without colliding with the production singleton. + * @param metricsName short name reported to the metrics registry + * @param metricsDescription human-readable description for the registry + * @param metricsContext hadoop-metrics2 context name + * @param metricsJmxContext JMX context (typically {@code "RegionServer,sub=" + metricsName}) + */ + public MetricsHaBypassSourceImpl(String metricsName, String metricsDescription, + String metricsContext, String metricsJmxContext) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + bypassedMutationBlockCount = getMetricsRegistry().newCounter(BYPASSED_MUTATION_BLOCK_COUNT, + BYPASSED_MUTATION_BLOCK_COUNT_DESC, 0L); + } + + @Override + public void incrementBypassedMutationBlockCount() { + bypassedMutationBlockCount.incr(); + } + + /** + * Test-only accessor returning the current counter value. Public so unit/integration tests + * outside this package can assert increments without going through JMX read scaffolding. + */ + public long getBypassedMutationBlockCountForTesting() { + return bypassedMutationBlockCount.value(); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BypassedMutationBlockMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BypassedMutationBlockMetricsIT.java new file mode 100644 index 00000000000..4db777d2877 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BypassedMutationBlockMetricsIT.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.query.BaseTest.generateUniqueName; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Properties; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.hbase.index.metrics.MetricsHaBypassSourceFactory; +import org.apache.phoenix.hbase.index.metrics.MetricsHaBypassSourceImpl; +import org.apache.phoenix.jdbc.HABaseIT; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Integration test for the server-side {@code bypassedMutationBlockCount} JMX counter. A "bypass" + * is a mutation batch that reaches {@code IndexRegionObserver.preBatchMutate} without an associated + * HA group attribute, causing the cluster-role-based mutation-block gate to be skipped. This IT + * drives a write that does not carry {@code _HAGroupName} and asserts the counter increments. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class BypassedMutationBlockMetricsIT extends HABaseIT { + + @BeforeClass + public static synchronized void doSetup() throws Exception { + CLUSTERS.getHBaseCluster1().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.getHBaseCluster2().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @Test(timeout = 300000) + public void testBypassedMutationBlockCount() throws Exception { + MetricsHaBypassSourceImpl source = + (MetricsHaBypassSourceImpl) MetricsHaBypassSourceFactory.getInstance(); + long before = source.getBypassedMutationBlockCountForTesting(); + + String dataTableName = generateUniqueName(); + String indexName = generateUniqueName(); + + // Connect directly to cluster1 (single-cluster JDBC URL — no _HAGroupName attribute). + // Use the master/RPC address rather than the ZK url so the rpc URL parser doesn't + // choke on the `=` token that ZK urls embed. + Properties props = new Properties(); + try (Connection conn = + DriverManager.getConnection(CLUSTERS.getJdbcUrl(CLUSTERS.getMasterAddress1()), props)) { + conn.createStatement().execute( + "CREATE TABLE " + dataTableName + " (id VARCHAR PRIMARY KEY, name VARCHAR, age INTEGER)"); + conn.createStatement() + .execute("CREATE INDEX " + indexName + " ON " + dataTableName + "(name)"); + conn.createStatement().execute("UPSERT INTO " + dataTableName + " VALUES ('1', 'A', 1)"); + conn.commit(); + } + + long after = source.getBypassedMutationBlockCountForTesting(); + assertTrue( + "bypassedMutationBlockCount should increment for mutations without _HAGroupName attribute", + after > before); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java new file mode 100644 index 00000000000..e1b65d317ba --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupMetricsIT.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_CACHE_AGE_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_CRR_REFRESH_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_FAILOVER_DURATION_MS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_MUTATION_BLOCKED_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_COUNT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_POLLER_TICK_FAILURES; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HA_STALE_CRR_DETECTED_COUNT; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Properties; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.exception.MutationBlockedIOException; +import org.apache.phoenix.execute.CommitException; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for the client-side HA observability metrics. Each test resets the relevant + * {@link org.apache.phoenix.monitoring.GlobalClientMetrics} entries before exercising the code path + * that should emit them, then asserts the post-condition counter/gauge value. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class HAGroupMetricsIT extends HABaseIT { + private static final Logger LOG = LoggerFactory.getLogger(HAGroupMetricsIT.class); + + @Rule + public final TestName testName = new TestName(); + + private Properties clientProperties; + private HighAvailabilityGroup haGroup; + private String tableName; + private String haGroupName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CLUSTERS.getHBaseCluster1().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.getHBaseCluster2().getConfiguration() + .setBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, true); + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + CLUSTERS.close(); + } + + @Before + public void setUp() throws Exception { + haGroupName = testName.getMethodName(); + clientProperties = HighAvailabilityTestingUtility.getHATestProperties(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER); + haGroup = getHighAvailibilityGroup(CLUSTERS.getJdbcHAUrl(), clientProperties); + tableName = testName.getMethodName().toUpperCase(); + CLUSTERS.createTableOnClusterPair(haGroup, tableName); + resetAllHaMetrics(); + } + + @After + public void tearDown() throws Exception { + try { + haGroup.close(); + } catch (Exception e) { + LOG.error("Fail to tear down HA group; ignoring", e); + } + } + + @Test(timeout = 300000) + public void testFailoverCountAndDuration() throws Exception { + long countBefore = GLOBAL_HA_FAILOVER_COUNT.getMetric().getValue(); + long durationBefore = GLOBAL_HA_FAILOVER_DURATION_MS.getMetric().getValue(); + + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (1, 1)"); + conn.commit(); + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + } + + long countAfter = GLOBAL_HA_FAILOVER_COUNT.getMetric().getValue(); + long durationAfter = GLOBAL_HA_FAILOVER_DURATION_MS.getMetric().getValue(); + assertTrue("HA_FAILOVER_COUNT should increment on cluster role transition", + countAfter > countBefore); + assertTrue("HA_FAILOVER_DURATION_MS sum should grow on transition", + durationAfter >= durationBefore); + } + + @Test(timeout = 300000) + public void testStaleCrrDetectedCount() throws Exception { + long before = GLOBAL_HA_STALE_CRR_DETECTED_COUNT.getMetric().getValue(); + // Drive a cluster-role transition that the wrapped connection observes as + // stale-CRR — failover() flags STALE_CLUSTER_ROLE_RECORD and increments the counter. + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (2, 2)"); + conn.commit(); + CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE_TO_STANDBY, ClusterRole.STANDBY); + // doRefreshHAGroup=false: keep haGroup's CRR snapshot stale on purpose so that + // the next mutation drives StaleClusterRoleRecordException through wrapActionDuringFailover, + // exercising the GLOBAL_HA_STALE_CRR_DETECTED_COUNT increment path. + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE, false); + // Issue another mutation while the connection still holds the pre-transition CRR snapshot. + // The server's HAGroupStoreManager will detect the version mismatch and throw + // StaleClusterRoleRecordException, which wrapActionDuringFailover catches to increment the + // counter (and rethrow FAILOVER_IN_PROGRESS). + try { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (3, 3)"); + conn.commit(); + } catch (Exception expected) { + // Stale-CRR surfaces as FAILOVER_IN_PROGRESS after the wrap-and-rethrow path; the + // increment-side-effect is what we are asserting on, not the exception type itself. + } + } + long after = GLOBAL_HA_STALE_CRR_DETECTED_COUNT.getMetric().getValue(); + assertTrue("HA_STALE_CRR_DETECTED_COUNT should strictly increment when CRR is detected stale " + + "(before=" + before + ", after=" + after + ")", after > before); + } + + @Test(timeout = 300000) + public void testMutationBlockedCount() throws Exception { + long before = GLOBAL_HA_MUTATION_BLOCKED_COUNT.getMetric().getValue(); + String peerZkUrl = CLUSTERS.getZkUrl2(); + PhoenixHAAdmin haAdmin = CLUSTERS.getHaAdmin1(); + + try (FailoverPhoenixConnection conn = (FailoverPhoenixConnection) DriverManager + .getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (3, 3)"); + conn.commit(); + + HAGroupStoreRecord blocking = + new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, + HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, 0L, + HighAvailabilityPolicy.FAILOVER.toString(), peerZkUrl, CLUSTERS.getMasterAddress1(), + CLUSTERS.getMasterAddress2(), CLUSTERS.getHdfsUrl1(), CLUSTERS.getHdfsUrl2(), 0L); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, blocking, -1); + Thread.sleep(1000L); + + try { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (4, 4)"); + conn.commit(); + fail("Expected MutationBlockedIOException during ACTIVE_TO_STANDBY"); + } catch (CommitException e) { + Throwable cause = e.getCause(); + assertTrue("Expected MutationBlockedIOException in chain", + cause instanceof RetriesExhaustedWithDetailsException + && ((RetriesExhaustedWithDetailsException) cause) + .getCause(0) instanceof MutationBlockedIOException); + } + } + long after = GLOBAL_HA_MUTATION_BLOCKED_COUNT.getMetric().getValue(); + assertTrue("HA_MUTATION_BLOCKED_COUNT should increment when MBE is observed", after > before); + } + + @Test(timeout = 300000) + public void testCrrRefreshCount() throws Exception { + long before = GLOBAL_HA_CRR_REFRESH_COUNT.getMetric().getValue(); + haGroup.refreshClusterRoleRecord(false); + haGroup.refreshClusterRoleRecord(false); + long after = GLOBAL_HA_CRR_REFRESH_COUNT.getMetric().getValue(); + assertTrue("HA_CRR_REFRESH_COUNT should increment on each refresh", after - before >= 2); + } + + @Test(timeout = 300000) + public void testCrrCacheAgeMs() throws Exception { + // Pre-refresh assertion: a connect() against a freshly init()-ed HA group must sample a + // bounded age; if init() forgets to seed lastClusterRoleRecordRefreshTime the gauge will + // record currentTimeMillis() - 0 (~1.7e12 ms) and this assertion will fail. + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (4, 4)"); + conn.commit(); + } + long ageBeforeRefresh = GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().getValue(); + assertTrue("HA_CRR_CACHE_AGE_MS gauge must be bounded on init path (was " + ageBeforeRefresh + + " ms, expected < 60_000)", ageBeforeRefresh < 60_000L); + + // Force a refresh so lastClusterRoleRecordRefreshTime is recent, then sleep so the + // gauge sample (taken on connect()) records a non-trivial age. + haGroup.refreshClusterRoleRecord(false); + Thread.sleep(50L); + try (Connection conn = DriverManager.getConnection(CLUSTERS.getJdbcHAUrl(), clientProperties)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (5, 5)"); + conn.commit(); + } + long ageMs = GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().getValue(); + // Gauge holds the most-recent set() sample; should be > 0 and within a sane bound. + assertTrue("HA_CRR_CACHE_AGE_MS gauge should be > 0 after a connect()", ageMs > 0L); + assertTrue("HA_CRR_CACHE_AGE_MS gauge should be < 5 minutes for a fresh HA group", + ageMs < 5 * 60 * 1000L); + } + + @Test(timeout = 300000) + public void testPollerTickCount() throws Exception { + // The poller starts only when fetchClusterRoleRecord observes both roles non-active under + // FAILOVER policy. Drive that state, await a couple of ticks, and verify the counter moved. + long beforeTicks = GLOBAL_HA_POLLER_TICK_COUNT.getMetric().getValue(); + long beforeFailures = GLOBAL_HA_POLLER_TICK_FAILURES.getMetric().getValue(); + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE_TO_STANDBY, ClusterRole.STANDBY); + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.STANDBY); + + // Allow at least 2 poller ticks at default interval to land. + Thread.sleep(15_000L); + + long afterTicks = GLOBAL_HA_POLLER_TICK_COUNT.getMetric().getValue(); + long afterFailures = GLOBAL_HA_POLLER_TICK_FAILURES.getMetric().getValue(); + assertTrue("HA_POLLER_TICK_COUNT should advance once poller is scheduled", + afterTicks > beforeTicks); + // Failures may or may not occur in mini-cluster; just assert non-decreasing. + assertTrue("HA_POLLER_TICK_FAILURES should be monotonic", afterFailures >= beforeFailures); + } + + private void resetAllHaMetrics() { + GLOBAL_HA_FAILOVER_COUNT.getMetric().reset(); + GLOBAL_HA_FAILOVER_DURATION_MS.getMetric().reset(); + GLOBAL_HA_MUTATION_BLOCKED_COUNT.getMetric().reset(); + GLOBAL_HA_STALE_CRR_DETECTED_COUNT.getMetric().reset(); + GLOBAL_HA_CRR_REFRESH_COUNT.getMetric().reset(); + GLOBAL_HA_CRR_CACHE_AGE_MS.getMetric().reset(); + GLOBAL_HA_POLLER_TICK_COUNT.getMetric().reset(); + GLOBAL_HA_POLLER_TICK_FAILURES.getMetric().reset(); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactoryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactoryTest.java new file mode 100644 index 00000000000..18110784eab --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/metrics/MetricsHaBypassSourceFactoryTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; + +import org.junit.Test; + +/** + * Unit tests for {@link MetricsHaBypassSourceFactory}. Asserts the eager-init singleton contract so + * a future refactor that swaps the field for a non-singleton (or a non-thread-safe) shape gets + * caught here rather than in production JMX double-registration failures. + */ +public class MetricsHaBypassSourceFactoryTest { + + @Test + public void testGetInstanceReturnsNonNull() { + assertNotNull("getInstance() must return a non-null singleton", + MetricsHaBypassSourceFactory.getInstance()); + } + + @Test + public void testGetInstanceIsIdempotent() { + MetricsHaBypassSource first = MetricsHaBypassSourceFactory.getInstance(); + MetricsHaBypassSource second = MetricsHaBypassSourceFactory.getInstance(); + assertSame("getInstance() must return the same singleton across calls", first, second); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityUtilTest.java new file mode 100644 index 00000000000..0f28104a2c7 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/HighAvailabilityUtilTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.client.Row; +import org.apache.phoenix.exception.MutationBlockedIOException; +import org.junit.Test; + +/** + * Unit tests for {@link HighAvailabilityUtil#isMutationBlockedIOExceptionExistsInThrowable}. Walks + * the four detection branches (null guard, direct match, RetriesExhaustedWithDetailsException + * causes, recursive {@code getCause()}) since the helper is invoked from commit-path error handling + * where regression of any branch would silently mis-attribute mutation-block rejections. + */ +public class HighAvailabilityUtilTest { + + @Test + public void testNullThrowableReturnsFalse() { + assertFalse("null Throwable must short-circuit to false", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(null)); + } + + @Test + public void testDirectMutationBlockedIOExceptionMatches() { + Throwable e = new MutationBlockedIOException("blocked"); + assertTrue("direct MutationBlockedIOException must match", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(e)); + } + + @Test + public void testRetriesExhaustedCausesAreScanned() { + MutationBlockedIOException blocked = new MutationBlockedIOException("blocked-in-batch"); + IOException unrelated = new IOException("unrelated"); + RetriesExhaustedWithDetailsException rewde = + new RetriesExhaustedWithDetailsException(Arrays. asList(unrelated, blocked), + Collections. emptyList(), Collections. emptyList()); + assertTrue("RetriesExhaustedWithDetailsException causes must be scanned for MBIOE", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(rewde)); + } + + @Test + public void testRecursiveGetCauseFindsNestedMutationBlocked() { + MutationBlockedIOException root = new MutationBlockedIOException("root-cause"); + Throwable mid = new RuntimeException("mid", root); + Throwable outer = new RuntimeException("outer", mid); + assertTrue("recursive getCause() walk must surface a deeply-nested MBIOE", + HighAvailabilityUtil.isMutationBlockedIOExceptionExistsInThrowable(outer)); + } +}