Skip to content

CAMEL-23602: Honor maxQueueSize in threads EIP with virtual threads#23480

Open
Croway wants to merge 2 commits into
apache:mainfrom
Croway:CAMEL-23602
Open

CAMEL-23602: Honor maxQueueSize in threads EIP with virtual threads#23480
Croway wants to merge 2 commits into
apache:mainfrom
Croway:CAMEL-23602

Conversation

@Croway
Copy link
Copy Markdown
Contributor

@Croway Croway commented May 22, 2026

Summary

The Problem

When virtual threads are enabled (camel.threads.virtual.enabled=true), DefaultThreadPoolFactory.VIRTUAL.newThreadPool() discards all parameters — including maxQueueSize — and returns an unbounded Executors.newThreadPerTaskExecutor(). This destroys the backpressure mechanism: polling consumers (SQS, JMS, etc.) pull messages without limit.

This PR adds BoundedExecutorService, a semaphore-based ExecutorService wrapper that enforces a flat concurrency cap on delegated tasks. The implementation follows the pattern recommended by JEP 444 for limiting concurrency with virtual threads.

The implementation is based on JEP 444, but I've added support for some Camel internal details, like RejectedPolicy, therefore, a route like:

from(..).threads().maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.*)

will behave similarly with Virtual Threads and Non Virtual Threads. Similarly becuase virtual threads are simply different, even the implementation differs from the non virtual threads, but from the user perspective in most of the use case (I hope!) will be the same.

Changes

  • BoundedExecutorService (camel-util): wraps any ExecutorService with a Semaphore that limits the maximum number of concurrently delegated tasks. Supports three saturation policies via ThreadPoolRejectedPolicy:
    • CallerRuns (default): blocks up to keepAliveTime, then runs on caller's thread
    • Abort: blocks up to keepAliveTime, then throws RejectedExecutionException
    • Block (new): blocks indefinitely until a permit is available
  • DefaultThreadPoolFactory.VIRTUAL: wraps newThreadPerTaskExecutor with BoundedExecutorService when maxQueueSize > 0, using maxPoolSize + maxQueueSize as the concurrency cap and keepAliveTime as the acquisition timeout
  • ThreadPoolRejectedPolicy: adds Block policy — blocks the caller until capacity is available, for message broker and batch workloads
  • Documentation: updates threading-model.adoc and virtual-threads.adoc with rejected policy reference, bounded concurrency semantics, and virtual thread specifics

Behavioral notes

  • Unlike ThreadPoolExecutor where pool threads and queued tasks are distinct, the semaphore enforces a flat concurrency cap — all permitted tasks execute immediately on virtual threads
  • CallerRuns tasks execute outside semaphore accounting, so total system concurrency may temporarily exceed maxConcurrent (same as platform thread CallerRunsPolicy)
  • keepAliveTime is repurposed as the semaphore acquisition timeout (pool sizing parameters are not applicable to virtual threads)
  • Exposes operational metrics: activeCount, availablePermits, waitingCount, callerRunsCount, rejectedCount, delegatedTaskCount

Test plan

  • Unit tests: BoundedExecutorServiceTest
  • JMH benchmarks: throughput (no regression vs platform threads), concurrency bounding (strict cap enforced), memory (no leaks, stable GC allocation), full Camel route end-to-end

@Croway Croway requested review from davsclaus and gnodet May 22, 2026 13:17
When virtual threads are enabled, DefaultThreadPoolFactory.VIRTUAL discards
all parameters and returns an unbounded newThreadPerTaskExecutor, ignoring
maxQueueSize and destroying the backpressure mechanism.

Add BoundedExecutorService that wraps the virtual thread executor with a
Semaphore-based concurrency cap, following the pattern recommended by
JEP 444 for limiting concurrency with virtual threads. The semaphore
enforces a flat cap of maxPoolSize + maxQueueSize on delegated tasks.

Wire the existing rejectedPolicy (CallerRuns, Abort) through to the
wrapper and add a new Block policy that blocks indefinitely until a
permit is available. keepAliveTime is reused as the semaphore acquisition
timeout.

Expose operational metrics: activeCount, availablePermits, waitingCount,
callerRunsCount, rejectedCount, delegatedTaskCount.
@Croway Croway requested a review from oscerd May 22, 2026 13:31
Copy link
Copy Markdown
Contributor

@gnodet gnodet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: CAMEL-23602 — Honor maxQueueSize in threads EIP with virtual threads

Overall, the core idea is sound — wrapping newThreadPerTaskExecutor with a semaphore to enforce bounded concurrency when maxQueueSize > 0 is the right approach and aligns with the JEP 444 guidance. The BoundedExecutorService implementation is well-structured and the documentation is thorough. However, there are several issues that need to be addressed before merging.


1. Wildcard import in DefaultThreadPoolFactory

The PR replaces the five explicit imports with import org.apache.camel.util.concurrent.*;. The project style is to always use explicit imports (no wildcard imports are used anywhere in core/camel-support). This will likely be caught by impsort in CI.

Please replace with explicit imports:

import org.apache.camel.util.concurrent.BoundedExecutorService;
import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
import org.apache.camel.util.concurrent.ThreadFactoryTypeAware;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.apache.camel.util.concurrent.ThreadType;

2. Missing @Metadata(enums=...) updates — Block not usable from XML/YAML DSL

Adding Block to the ThreadPoolRejectedPolicy enum is only half the story. The model classes that expose this to DSLs still enumerate only Abort,CallerRuns:

  • core/camel-core-model/.../ThreadsDefinition.java line 70: enums = "Abort,CallerRuns"
  • core/camel-core-model/.../ThreadPoolProfileDefinition.java line 61: enums = "Abort,CallerRuns"
  • core/camel-core-xml/.../AbstractCamelThreadPoolFactoryBean.java line 61: enums = "Abort,CallerRuns"

Without updating these, Block will not appear in the generated catalog metadata (threads.json, threadPoolProfile.json), the YAML DSL schema, or XML schema. Users configuring via YAML or XML won't be able to use the new policy. These annotations and their generated downstream files need to be updated.


3. Missing upgrade guide entry

Adding a new Block enum value to ThreadPoolRejectedPolicy and changing the behavior of threads() EIP with virtual threads are user-visible changes. An upgrade guide entry in docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc is needed documenting:

  • The new Block rejected policy
  • That maxQueueSize is now honored with virtual threads (behavioral change)
  • That keepAliveTime is repurposed as semaphore timeout with virtual threads

4. resolvePolicy uses toString() for Block detection — fragile

private static ThreadPoolRejectedPolicy resolvePolicy(RejectedExecutionHandler handler) {
    if (handler == null || handler instanceof ThreadPoolExecutor.CallerRunsPolicy) {
        return ThreadPoolRejectedPolicy.CallerRuns;
    }
    if ("Block".equals(handler.toString())) {
        return ThreadPoolRejectedPolicy.Block;
    }
    return ThreadPoolRejectedPolicy.Abort;
}

Using toString() to reverse-map a handler back to a policy is fragile. The CallerRuns case uses instanceof (which works because asRejectedExecutionHandler() creates a CallerRunsPolicy subclass), but Block relies on the anonymous class's toString(). A better approach would be either:

(a) Pass the ThreadPoolRejectedPolicy directly through the internal method (you have access to it from ThreadPoolProfile.getRejectedPolicy() in the newThreadPool(ThreadPoolProfile, ThreadFactory) entry point), or

(b) Create a tagged interface/class for the Block handler so instanceof can be used consistently.

The newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) override already has the profile, and the profile carries the ThreadPoolRejectedPolicy enum. The internal delegation could pass both the handler (for platform) and the policy enum (for virtual), avoiding the reverse-mapping entirely.


5. Potential semaphore permit leak on non-RejectedExecutionException failures

In BoundedExecutorService.execute():

delegate.execute(() -> {
    try {
        command.run();
    } finally {
        delegatedTaskCount.increment();
        semaphore.release();
    }
});

The delegate.execute() call itself could throw something other than RejectedExecutionException — e.g., an OutOfMemoryError from thread creation failure. The catch block only handles RejectedExecutionException:

} catch (RejectedExecutionException e) {
    if (acquired) {
        semaphore.release();
    }
    throw e;
}

Any other throwable would leave the permit acquired but never released. Consider using a broader catch:

} catch (Throwable e) {
    if (acquired) {
        semaphore.release();
    }
    if (e instanceof RejectedExecutionException ree) {
        throw ree;
    }
    throw new RejectedExecutionException("Failed to delegate task", e);
}

Or restructure to use a flag that tracks whether the lambda was submitted:

boolean submitted = false;
try {
    delegate.execute(() -> { ... });
    submitted = true;
} finally {
    if (acquired && !submitted) {
        semaphore.release();
    }
}

6. Tests use Thread.sleep() instead of Awaitility

The project guidelines require using Awaitility instead of Thread.sleep() in tests. The BoundedExecutorServiceTest has three Thread.sleep() calls:

  • Thread.sleep(200) in testBlockForeverPolicy
  • Thread.sleep(50) in testConcurrencyBounded
  • Thread.sleep(50) in testPermitsReleasedAfterCompletion

Please replace these with Awaitility.await() assertions. For example, the sleep in testPermitsReleasedAfterCompletion waiting for permits to be released could use:

await().atMost(5, TimeUnit.SECONDS)
       .untilAsserted(() -> assertEquals(2, sized.getAvailablePermits()));

7. No test coverage for submit() path

ThreadsProcessor uses executorService.submit(call), not execute(). All tests in BoundedExecutorServiceTest use execute() directly. While submit() delegates to execute() via AbstractExecutorService, testing the submit() path would verify that the newTaskFor() override works correctly with Rejectable tasks and that the Future semantics are correct when caller-runs activates.


Minor notes

  • The fix for the stale Discard, DiscardOldest reference in threading-model.adoc (line 32) is a good cleanup — those were removed in CAMEL-19091 but the docs were never updated.
  • The Block policy's handler for platform threads (executor.getQueue().put(r)) has a standard TOCTOU race with shutdown, but this matches well-known blocking handler patterns and is acceptable.
  • The ThreadsProcessor.handleException() method (line 140) checks instanceof ThreadPoolExecutor — with BoundedExecutorService, this falls through to the else branch which sets the exception on the exchange. This is functionally correct for the Abort case but worth a brief comment in the code.

Claude Code on behalf of Guillaume Nodet

@github-actions
Copy link
Copy Markdown
Contributor

🌟 Thank you for your contribution to the Apache Camel project! 🌟
🤖 CI automation will test this PR automatically.

🐫 Apache Camel Committers, please review the following items:

  • First-time contributors require MANUAL approval for the GitHub Actions to run
  • You can use the command /component-test (camel-)component-name1 (camel-)component-name2.. to request a test from the test bot although they are normally detected and executed by CI.
  • You can label PRs using skip-tests and test-dependents to fine-tune the checks executed by this PR.
  • Build and test logs are available in the summary page. Only Apache Camel committers have access to the summary.

⚠️ Be careful when sharing logs. Review their contents before sharing them publicly.

@Croway Croway force-pushed the CAMEL-23602 branch 2 times, most recently from 6b50532 to 33164bb Compare May 22, 2026 15:19
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 22, 2026

🧪 CI tested the following changed modules:

  • catalog/camel-catalog
  • components/camel-spring-parent/camel-spring-xml
  • core/camel-core-model
  • core/camel-core-xml
  • core/camel-support
  • core/camel-util
  • docs
  • dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers
  • dsl/camel-yaml-dsl/camel-yaml-dsl

ℹ️ Dependent modules were not tested because the total number of affected modules exceeded the threshold (50). Use the test-dependents label to force testing all dependents.

POM dependency changes: targeted tests included

Modules affected by dependency changes (9)
  • :camel-catalog
  • :camel-core-model
  • :camel-core-xml
  • :camel-spring-xml
  • :camel-support
  • :camel-util
  • :camel-yaml-dsl
  • :camel-yaml-dsl-deserializers
  • :docs
Build reactor — dependencies compiled but only changed modules were tested (9 modules)
  • Camel :: Catalog :: Camel Catalog
  • Camel :: Core Model
  • Camel :: Core XML
  • Camel :: Docs
  • Camel :: Spring XML
  • Camel :: Support
  • Camel :: Util
  • Camel :: YAML DSL
  • Camel :: YAML DSL :: Deserializers

⚙️ View full build and test results

}
return new BoundedExecutorService(
ThreadPoolFactoryType.newThreadPerTaskExecutor(factory),
profile.getMaxPoolSize() + profile.getMaxQueueSize(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, if I wanted to limit to say 50, then I would have to pass in a profile of maxPoolSize=0 and maxQueueSize=50 since the default of maxPoolSize=20 would be used otherwise?

I would find it easier to only use profile.getMaxQueueSize()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, sounds reasonable, the previous approach was mimicking the normal Thread scenario, but in the context of Virtual Thread is just confusing. I've updated with only profile.getMaxQueueSize()

- Replace wildcard import with explicit imports
- Add Block to @metadata enums in ThreadsDefinition,
  ThreadPoolProfileDefinition, AbstractCamelThreadPoolFactoryBean
- Add upgrade guide entry for maxQueueSize and Block policy
- Pass ThreadPoolRejectedPolicy directly from profile instead of
  reverse-mapping from RejectedExecutionHandler via toString()
- Fix permit leak on non-RejectedExecutionException failures using
  submitted flag pattern
- Replace Thread.sleep() with Awaitility in tests
- Add submit() path test for Future semantics
Copy link
Copy Markdown
Contributor

@gnodet gnodet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All seven issues from the prior review have been addressed:

  1. Wildcard import -- replaced with explicit imports.
  2. Missing @Metadata(enums=...) updates -- Block added to ThreadsDefinition, ThreadPoolProfileDefinition, and AbstractCamelThreadPoolFactoryBean; all generated downstream files (catalog JSON, YAML DSL schemas, ModelDeserializers) updated.
  3. Missing upgrade guide entry -- added to camel-4x-upgrade-guide-4_21.adoc covering both the virtual threads maxQueueSize behavioral change and the new Block policy.
  4. Fragile resolvePolicy using toString() -- eliminated entirely by passing ThreadPoolRejectedPolicy directly from the profile in the virtual threads path, avoiding the reverse-mapping.
  5. Semaphore permit leak -- fixed with the submitted flag pattern, so permits are released in the finally block if delegate.execute() fails for any reason.
  6. Thread.sleep() in tests -- replaced with Awaitility throughout.
  7. No submit() test coverage -- added testSubmitReturnsFuture().

The simon-ras feedback (simplifying the concurrency cap from maxPoolSize + maxQueueSize to just maxQueueSize) was also incorporated, which makes the virtual threads semantics clearer and avoids confusion with pool sizing parameters that don't apply to virtual threads.

The documentation updates in threading-model.adoc and virtual-threads.adoc are thorough and accurately describe the new behavior.

Claude Code on behalf of Guillaume Nodet

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants