From 6a5e3902fec975379deeb6694ecf1480da69cbd0 Mon Sep 17 00:00:00 2001 From: Will Ezell Date: Thu, 4 Jun 2026 22:53:12 -0400 Subject: [PATCH] perf(cache): migrate H22 cache async commits to virtual threads Replace the fixed 5-thread ThreadPoolExecutor that handles async H22 cache commits (put/remove to the embedded H2 store) with a virtual-thread-per-task executor. Each commit now runs on its own virtual thread that blocks cheaply on JDBC/disk I/O instead of contending for a small pool of platform threads. Java 25's JEP 491 means synchronized blocks in Hikari/H2 no longer pin the carrier thread, so the classic "virtual threads + JDBC pinning" concern does not apply. Behavior preserved: - The real backpressure was always the shouldAsync() pre-check, which falls back to a synchronous commit on the caller; that path is unchanged. The old CallerRunsPolicy was effectively dead code (unbounded queue never rejected). - Since a virtual-thread-per-task executor has no shared queue, the queue-depth backpressure metric now reads an AtomicInteger in-flight counter instead of asyncTaskQueue.size(); isAllocationWithinTolerance()/shouldAsync() semantics are otherwise unchanged (and the @Ignore'd H22CacheTest still compiles). - A Semaphore (sized by the existing cache_h22_async_threads knob, default 5) acquired inside each task bounds how many commits hit the Hikari pool at once, preventing connection-timeout storms and spurious shard rebuilds under load. Part of #35991. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../business/cache/provider/h22/H22Cache.java | 68 +++++++++++++------ 1 file changed, 47 insertions(+), 21 deletions(-) diff --git a/dotCMS/src/main/java/com/dotmarketing/business/cache/provider/h22/H22Cache.java b/dotCMS/src/main/java/com/dotmarketing/business/cache/provider/h22/H22Cache.java index 4392afe8aab7..f5c905ccea98 100644 --- a/dotCMS/src/main/java/com/dotmarketing/business/cache/provider/h22/H22Cache.java +++ b/dotCMS/src/main/java/com/dotmarketing/business/cache/provider/h22/H22Cache.java @@ -11,7 +11,6 @@ import com.dotmarketing.util.UtilMethods; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.zaxxer.hikari.pool.HikariPool.PoolInitializationException; import io.vavr.control.Try; import java.io.BufferedInputStream; @@ -34,12 +33,13 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.comparator.LastModifiedFileComparator; import org.apache.commons.io.filefilter.DirectoryFileFilter; @@ -53,8 +53,13 @@ public class H22Cache extends CacheProvider { final boolean shouldAsync=Config.getBooleanProperty("cache_h22_async", true); - final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("H22-ASYNC-COMMIT-%d").build(); - final private LinkedBlockingQueue asyncTaskQueue = new LinkedBlockingQueue<>(); + // Async cache commits run on virtual threads: they block cheaply on JDBC/disk I/O instead of + // pinning a small pool of platform threads. numberOfAsyncThreads now sizes a Semaphore that caps + // how many commits hit the H2/Hikari pool concurrently (embedded H2 writes don't scale linearly), + // while inFlightTasks tracks the total async backlog that isAllocationWithinTolerance() reads. + final ThreadFactory namedThreadFactory = Thread.ofVirtual().name("H22-ASYNC-COMMIT-", 0).factory(); + private final Semaphore dbWorkPermits = new Semaphore(Math.max(1, numberOfAsyncThreads), true); + private final AtomicInteger inFlightTasks = new AtomicInteger(0); private ExecutorService executorService; @@ -115,16 +120,10 @@ public boolean isDistributed() { private ExecutorService spawnNewThreadPool() { - - if (Config.getBooleanProperty("cache.h22.async.caller.runs.policy", true)) { - return new ThreadPoolExecutor(numberOfAsyncThreads, numberOfAsyncThreads, 10, - TimeUnit.SECONDS, asyncTaskQueue, namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy() - ); - } - - return new ThreadPoolExecutor(numberOfAsyncThreads, numberOfAsyncThreads, 10, - TimeUnit.SECONDS, asyncTaskQueue, namedThreadFactory - ); + // One virtual thread per submitted commit. Backpressure is handled up-front by shouldAsync() + // (which falls back to running the commit synchronously on the caller), and dbWorkPermits caps + // how many of these virtual threads touch the H2/Hikari pool at the same time. + return Executors.newThreadPerTaskExecutor(namedThreadFactory); } @@ -183,15 +182,43 @@ public void put(final String group, final String key, final Object content) { void putAsync(final Fqn fqn, final Object content) { - - executorService.submit(()-> { + submitAsync(() -> { try { // Add the given content to the group and for a given key doUpsert(fqn, (Serializable) content); } catch (Exception e) { handleError(e, fqn); } - }); + }); + } + + /** + * Submits a cache commit to run on a virtual thread. The {@link #inFlightTasks} counter is bumped + * before submission and cleared in a {@code finally} so {@link #isAllocationWithinTolerance()} sees + * the real backlog, and the task acquires a {@link #dbWorkPermits} permit so only a bounded number + * of commits hit the H2/Hikari pool at once. + */ + private void submitAsync(final Runnable dbTask) { + inFlightTasks.incrementAndGet(); + try { + executorService.submit(() -> { + try { + dbWorkPermits.acquire(); + try { + dbTask.run(); + } finally { + dbWorkPermits.release(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + inFlightTasks.decrementAndGet(); + } + }); + } catch (RejectedExecutionException e) { + // Executor is shutting down; keep the counter balanced. + inFlightTasks.decrementAndGet(); + } } @@ -291,7 +318,7 @@ public void remove(final String group, final String key) { * @return */ boolean isAllocationWithinTolerance() { - final int size = asyncTaskQueue.size(); + final int size = inFlightTasks.get(); final float allocation = (float) size / (float) asyncTaskQueueSize; Logger.debug(H22Cache.class, () -> " size is " + size + ", allocation is " + allocation + ", tolerance is :" @@ -309,8 +336,7 @@ boolean shouldAsync() { } void removeAsync(final Fqn fqn) { - - executorService.submit(()-> { + submitAsync(() -> { try { // Invalidates from Cache a key from a given group doDelete(fqn);