From 2b1341c385fd2e725769b3071d8b095a90b2b053 Mon Sep 17 00:00:00 2001 From: Aman Poonia Date: Wed, 20 May 2026 00:22:17 +0530 Subject: [PATCH 1/2] PHOENIX-7859 Make ParallelPhoenixConnectionFallbackIT deterministic by checking queue state directly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test ParallelPhoenixConnectionFallbackIT.testParallelConnectionBackoff times out intermittently in CI when polling hasCapacity() to detect when executor queues fill up. Root cause: hasCapacity() performs a multi-step calculation (read queue size, read capacity, divide, compare threshold) which creates a race condition. Tasks can enter queues during calculation steps, causing the check to miss state transitions. Solution: Check queue.size() >= 1 directly (single atomic operation), then verify hasCapacity() matches expected state as an assertion. Benefits: - Eliminates race condition (atomic read vs multi-step calculation) - More deterministic (checks actual state, not derived value) - Maintains 5s timeout (no increase needed) - Validates both queue state and hasCapacity() logic - Adds debug logging for troubleshooting Testing: Passed locally with HBase 2.6.5. Queues filled in ~105ms (2 checks), well under 5s timeout. State transition [0,0] → [1,1] detected reliably. hasCapacity() correctly returned [false, false]. Related: PHOENIX-6840 (flaky ParallelPhoenix tests) Co-authored-by: Claude Code --- .../ParallelPhoenixConnectionFallbackIT.java | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java index 3a1ae7d3119..57ba3cd945d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java @@ -23,16 +23,19 @@ import static org.apache.phoenix.jdbc.HighAvailabilityPolicy.PARALLEL; import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair; import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Supplier; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.junit.AfterClass; @@ -108,9 +111,27 @@ public void testParallelConnectionBackoff() throws Exception { Future futureConnB = executor.submit(() -> DriverManager.getConnection(jdbcUrl, PROPERTIES)); - // The previous call of connection creation should fill the queue by half. - waitFor(() -> !PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES).get(0) - && !PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES).get(1), 100, 5000); + // PHOENIX-7859: Poll actual queue state, not the hasCapacity() composite — the multi-step + // calculation (size/capacity < threshold) had a race window. We now check queue.size() + // directly, then verify hasCapacity() matches expectations. + // Note: queueSize >= 1 triggers !hasCapacity() because HA_MAX_QUEUE_SIZE=2 and + // HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD=0.5, so 1/2 = 0.5 which is NOT < 0.5. + waitFor(() -> { + List services = + PhoenixHAExecutorServiceProvider.get(PROPERTIES); + int queueSize1 = ((ThreadPoolExecutor) services.get(0).getExecutorService()).getQueue().size(); + int queueSize2 = ((ThreadPoolExecutor) services.get(1).getExecutorService()).getQueue().size(); + + LOG.debug("Waiting for queues to fill: cluster1 queue={}, cluster2 queue={}", + queueSize1, queueSize2); + + return queueSize1 >= 1 && queueSize2 >= 1; + }, 100, 5000); + + // Verify that hasCapacity() now correctly reports no capacity + List capacity = PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES); + assertFalse("Cluster 1 should have no capacity after queues filled", capacity.get(0).booleanValue()); + assertFalse("Cluster 2 should have no capacity after queues filled", capacity.get(1).booleanValue()); // This should be backed off now, as the capacity is not available. Connection connC = DriverManager.getConnection(jdbcUrl, PROPERTIES); From 1422e69be8f5de95e5d7ef1cf676aa69a05dcef2 Mon Sep 17 00:00:00 2001 From: Aman Poonia Date: Tue, 9 Jun 2026 15:56:24 +0530 Subject: [PATCH 2/2] PHOENIX-7859 Increase waitFor timeouts to tolerate CI host CPU starvation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 5s timeouts were too tight for busy Yetus workers where newly created thread pools may not be scheduled for several seconds. waitFor() exits as soon as the condition is true, so raising the ceiling to 60s costs nothing on fast machines while giving CI adequate headroom. Also reverts the queue.size() polling approach — the hasCapacity() race window is at most one poll tick (~100ms), not the cause of 5s timeouts. --- .../ParallelPhoenixConnectionFallbackIT.java | 39 +++++-------------- 1 file changed, 9 insertions(+), 30 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java index 57ba3cd945d..c56e96f17fd 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/ParallelPhoenixConnectionFallbackIT.java @@ -23,19 +23,16 @@ import static org.apache.phoenix.jdbc.HighAvailabilityPolicy.PARALLEL; import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair; import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Supplier; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.junit.AfterClass; @@ -97,12 +94,12 @@ public void testParallelConnectionBackoff() throws Exception { CountDownLatch cdl1 = new CountDownLatch(1); CountDownLatch cdl2 = new CountDownLatch(1); ParallelPhoenixContext contextA = ((ParallelPhoenixConnection) connA).getContext(); - waitFor(() -> contextA.getChainOnConn1().isDone(), 100, 5000); - waitFor(() -> contextA.getChainOnConn2().isDone(), 100, 5000); + waitFor(() -> contextA.getChainOnConn1().isDone(), 100, 60000); + waitFor(() -> contextA.getChainOnConn2().isDone(), 100, 60000); contextA.chainOnConn1(getSuplierWithLatch(cdl1)); contextA.chainOnConn2(getSuplierWithLatch(cdl2)); waitFor(() -> PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES).get(0) - && PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES).get(1), 100, 5000); + && PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES).get(1), 100, 60000); ExecutorService executor = Executors.newFixedThreadPool(1); // Since both the cluster executors are busy, the new connection will be // put in the executor queue, and a connection won't be returned unless @@ -111,27 +108,9 @@ public void testParallelConnectionBackoff() throws Exception { Future futureConnB = executor.submit(() -> DriverManager.getConnection(jdbcUrl, PROPERTIES)); - // PHOENIX-7859: Poll actual queue state, not the hasCapacity() composite — the multi-step - // calculation (size/capacity < threshold) had a race window. We now check queue.size() - // directly, then verify hasCapacity() matches expectations. - // Note: queueSize >= 1 triggers !hasCapacity() because HA_MAX_QUEUE_SIZE=2 and - // HA_THREADPOOL_QUEUE_BACKOFF_THRESHOLD=0.5, so 1/2 = 0.5 which is NOT < 0.5. - waitFor(() -> { - List services = - PhoenixHAExecutorServiceProvider.get(PROPERTIES); - int queueSize1 = ((ThreadPoolExecutor) services.get(0).getExecutorService()).getQueue().size(); - int queueSize2 = ((ThreadPoolExecutor) services.get(1).getExecutorService()).getQueue().size(); - - LOG.debug("Waiting for queues to fill: cluster1 queue={}, cluster2 queue={}", - queueSize1, queueSize2); - - return queueSize1 >= 1 && queueSize2 >= 1; - }, 100, 5000); - - // Verify that hasCapacity() now correctly reports no capacity - List capacity = PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES); - assertFalse("Cluster 1 should have no capacity after queues filled", capacity.get(0).booleanValue()); - assertFalse("Cluster 2 should have no capacity after queues filled", capacity.get(1).booleanValue()); + // The previous call of connection creation should fill the queue by half. + waitFor(() -> !PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES).get(0) + && !PhoenixHAExecutorServiceProvider.hasCapacity(PROPERTIES).get(1), 100, 60000); // This should be backed off now, as the capacity is not available. Connection connC = DriverManager.getConnection(jdbcUrl, PROPERTIES); @@ -143,14 +122,14 @@ public void testParallelConnectionBackoff() throws Exception { cdl2.countDown(); // Once the previous tasks are done, we expect the futureConnB to be picked and be done. - waitFor(() -> futureConnB.isDone(), 1000, 5000); + waitFor(() -> futureConnB.isDone(), 1000, 60000); Connection connB = futureConnB.get(); assertTrue(connB instanceof ParallelPhoenixConnection); doTestBasicOperationsWithConnection(connB, tableName, haGroupName); ParallelPhoenixContext contextB = ((ParallelPhoenixConnection) connB).getContext(); - waitFor(() -> contextB.getChainOnConn1().isDone(), 100, 5000); - waitFor(() -> contextB.getChainOnConn2().isDone(), 100, 5000); + waitFor(() -> contextB.getChainOnConn1().isDone(), 100, 60000); + waitFor(() -> contextB.getChainOnConn2().isDone(), 100, 60000); // Now that the queue has capacity, this should be ParallelLPhoenixConnection. Connection connD = DriverManager.getConnection(jdbcUrl, PROPERTIES);