diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json index 152d7a9935c14..fa8a0a18b9b49 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threadPoolProfile.json @@ -22,6 +22,6 @@ "timeUnit": { "index": 7, "kind": "attribute", "displayName": "Time Unit", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the time unit to use for keep alive time By default SECONDS is used." }, "maxQueueSize": { "index": 8, "kind": "attribute", "displayName": "Max Queue Size", "group": "common", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum number of tasks in the work queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" }, "allowCoreThreadTimeOut": { "index": 9, "kind": "attribute", "displayName": "Allow Core Thread Time Out", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether idle core threads is allowed to timeout and therefore can shrink the pool size below the core pool size Is by default true" }, - "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." } + "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." } } } diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json index aae4d04db0d89..cb1c9325c8ee6 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/threads.json @@ -24,7 +24,7 @@ "maxQueueSize": { "index": 9, "kind": "attribute", "displayName": "Max Queue Size", "group": "common", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum number of tasks in the work queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" }, "allowCoreThreadTimeOut": { "index": 10, "kind": "attribute", "displayName": "Allow Core Thread Time Out", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether idle core threads are allowed to timeout and therefore can shrink the pool size below the core pool size Is by default false" }, "threadName": { "index": 11, "kind": "attribute", "displayName": "Thread Name", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "Threads", "description": "Sets the thread name to use." }, - "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." }, + "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." }, "callerRunsWhenRejected": { "index": 13, "kind": "attribute", "displayName": "Caller Runs When Rejected", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether or not to use as caller runs as fallback when a task is rejected being added to the thread pool (when its full). This is only used as fallback if no rejectedPolicy has been configured, or the thread pool has no configured rejection handler. Is by default true" } } } diff --git a/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json b/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json index ac99539187c92..46284132427cb 100644 --- a/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json +++ b/components/camel-spring-parent/camel-spring-xml/src/generated/resources/META-INF/org/apache/camel/spring/xml/threadPool.json @@ -19,7 +19,7 @@ "timeUnit": { "index": 4, "kind": "attribute", "displayName": "Time Unit", "group": "common", "required": false, "type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "SECONDS", "description": "Sets the time unit used for keep alive time" }, "maxQueueSize": { "index": 5, "kind": "attribute", "displayName": "Max Queue Size", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum number of tasks in the work queue. Use -1 for an unbounded queue" }, "allowCoreThreadTimeOut": { "index": 6, "kind": "attribute", "displayName": "Allow Core Thread Time Out", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether to allow core threads to timeout" }, - "rejectedPolicy": { "index": 7, "kind": "attribute", "displayName": "Rejected Policy", "group": "common", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "CallerRuns", "description": "Sets the handler for tasks which cannot be executed by the thread pool." }, + "rejectedPolicy": { "index": 7, "kind": "attribute", "displayName": "Rejected Policy", "group": "common", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "CallerRuns", "description": "Sets the handler for tasks which cannot be executed by the thread pool." }, "threadName": { "index": 8, "kind": "attribute", "displayName": "Thread Name", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom thread name \/ pattern" }, "scheduled": { "index": 9, "kind": "attribute", "displayName": "Scheduled", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to use a scheduled thread pool" }, "camelContextId": { "index": 10, "kind": "attribute", "displayName": "Camel Context Id", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Id of CamelContext to use if there are multiple CamelContexts in the same JVM" } diff --git a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json index 152d7a9935c14..fa8a0a18b9b49 100644 --- a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json +++ b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threadPoolProfile.json @@ -22,6 +22,6 @@ "timeUnit": { "index": 7, "kind": "attribute", "displayName": "Time Unit", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the time unit to use for keep alive time By default SECONDS is used." }, "maxQueueSize": { "index": 8, "kind": "attribute", "displayName": "Max Queue Size", "group": "common", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum number of tasks in the work queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" }, "allowCoreThreadTimeOut": { "index": 9, "kind": "attribute", "displayName": "Allow Core Thread Time Out", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether idle core threads is allowed to timeout and therefore can shrink the pool size below the core pool size Is by default true" }, - "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." } + "rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." } } } diff --git a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json index aae4d04db0d89..cb1c9325c8ee6 100644 --- a/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json +++ b/core/camel-core-model/src/generated/resources/META-INF/org/apache/camel/model/threads.json @@ -24,7 +24,7 @@ "maxQueueSize": { "index": 9, "kind": "attribute", "displayName": "Max Queue Size", "group": "common", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum number of tasks in the work queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" }, "allowCoreThreadTimeOut": { "index": 10, "kind": "attribute", "displayName": "Allow Core Thread Time Out", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether idle core threads are allowed to timeout and therefore can shrink the pool size below the core pool size Is by default false" }, "threadName": { "index": 11, "kind": "attribute", "displayName": "Thread Name", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "Threads", "description": "Sets the thread name to use." }, - "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." }, + "rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." }, "callerRunsWhenRejected": { "index": 13, "kind": "attribute", "displayName": "Caller Runs When Rejected", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether or not to use as caller runs as fallback when a task is rejected being added to the thread pool (when its full). This is only used as fallback if no rejectedPolicy has been configured, or the thread pool has no configured rejection handler. Is by default true" } } } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java index 50533e929eefe..1c26ee020cdf7 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ThreadPoolProfileDefinition.java @@ -58,7 +58,7 @@ public class ThreadPoolProfileDefinition extends OptionalIdentifiedDefinition private String threadName; @XmlAttribute @Metadata(label = "advanced", javaType = "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", - enums = "Abort,CallerRuns") + enums = "Abort,CallerRuns,Block") private String rejectedPolicy; @XmlAttribute @Metadata(label = "advanced", javaType = "java.lang.Boolean", defaultValue = "true") diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java index 4b73caaa1abff..a33721b1b4d6e 100644 --- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java +++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelThreadPoolFactoryBean.java @@ -58,7 +58,7 @@ public abstract class AbstractCamelThreadPoolFactoryBean extends AbstractCamelFa @XmlAttribute @Metadata(description = "Sets the handler for tasks which cannot be executed by the thread pool.", defaultValue = "CallerRuns", javaType = "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", - enums = "Abort,CallerRuns") + enums = "Abort,CallerRuns,Block") private String rejectedPolicy = ThreadPoolRejectedPolicy.CallerRuns.name(); @XmlAttribute(required = true) @Metadata(description = "To use a custom thread name / pattern") diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java index 70e6f5fb5c509..33110ef996908 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultThreadPoolFactory.java @@ -34,10 +34,12 @@ import org.apache.camel.spi.ThreadPoolFactory; import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.util.concurrent.BoundedExecutorService; import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor; import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor; import org.apache.camel.util.concurrent.SizedScheduledExecutorService; import org.apache.camel.util.concurrent.ThreadFactoryTypeAware; +import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy; import org.apache.camel.util.concurrent.ThreadType; /** @@ -64,6 +66,19 @@ public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { @Override public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) { + // Virtual threads: use the policy enum directly from the profile to avoid reverse-mapping + if (profile.getMaxQueueSize() > 0 + && ThreadPoolFactoryType.from(factory, profile) == ThreadPoolFactoryType.VIRTUAL) { + ThreadPoolRejectedPolicy policy = profile.getRejectedPolicy(); + if (policy == null) { + policy = ThreadPoolRejectedPolicy.CallerRuns; + } + return new BoundedExecutorService( + ThreadPoolFactoryType.newThreadPerTaskExecutor(factory), + profile.getMaxQueueSize(), + profile.getKeepAliveTime(), profile.getTimeUnit(), + false, policy); + } // allow core thread timeout is default true if not configured boolean allow = profile.getAllowCoreThreadTimeOut() != null ? profile.getAllowCoreThreadTimeOut() : true; return newThreadPool(profile.getPoolSize(), @@ -199,7 +214,7 @@ static ThreadPoolFactoryType from(ThreadFactory threadFactory, int maxPoolSize) } @SuppressWarnings("unchecked") - private static ExecutorService newThreadPerTaskExecutor(ThreadFactory threadFactory) { + static ExecutorService newThreadPerTaskExecutor(ThreadFactory threadFactory) { try { return (ExecutorService) Executors.class .getMethod("newThreadPerTaskExecutor", ThreadFactory.class) diff --git a/core/camel-util/pom.xml b/core/camel-util/pom.xml index 41a1bed2bcc93..7da60eb85dfb6 100644 --- a/core/camel-util/pom.xml +++ b/core/camel-util/pom.xml @@ -51,6 +51,12 @@ junit-jupiter test + + org.awaitility + awaitility + ${awaitility-version} + test + org.assertj assertj-core diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/BoundedExecutorService.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/BoundedExecutorService.java new file mode 100644 index 0000000000000..6111377dcce50 --- /dev/null +++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/BoundedExecutorService.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +/** + * An {@link ExecutorService} wrapper that enforces bounded concurrency via a {@link Semaphore}. + *

+ * When virtual threads are enabled, Camel replaces the traditional {@link java.util.concurrent.ThreadPoolExecutor} with + * {@code Executors.newThreadPerTaskExecutor()}, which accepts every task immediately (unbounded). This wrapper limits + * the maximum number of tasks delegated to the underlying executor. Unlike {@code ThreadPoolExecutor} there is no + * distinction between pool threads and queued tasks — the semaphore enforces a flat concurrency cap on delegated tasks. + *

+ * When the semaphore has no available permits, behavior depends on the configured {@link ThreadPoolRejectedPolicy}: + *

+ *

+ * Caller thread blocking: while waiting for a permit, the calling thread is blocked. When callers are virtual + * threads this is inexpensive (the carrier thread is released). When callers are platform threads (e.g., HTTP server + * threads) the blocked thread is unavailable for other work — this is standard backpressure behavior but worth noting + * for capacity planning. + * + */ +public class BoundedExecutorService extends AbstractExecutorService { + + private final ExecutorService delegate; + private final Semaphore semaphore; + private final int maxConcurrent; + private final long timeoutNanos; + private final ThreadPoolRejectedPolicy rejectedPolicy; + private final LongAdder callerRunsCount = new LongAdder(); + private final LongAdder rejectedCount = new LongAdder(); + private final LongAdder delegatedTaskCount = new LongAdder(); + + /** + * @param delegate the underlying executor (typically {@code newThreadPerTaskExecutor}) + * @param maxConcurrent the maximum number of tasks delegated to the underlying executor concurrently + * @param acquireTimeout the maximum time to wait for a permit (ignored when policy is {@code Block}) + * @param timeUnit the time unit for {@code acquireTimeout} + * @param fair {@code true} for FIFO permit ordering (predictable latency), {@code false} for barging + * (higher throughput) + * @param rejectedPolicy the policy to apply when no permit is available + */ + public BoundedExecutorService(ExecutorService delegate, int maxConcurrent, + long acquireTimeout, TimeUnit timeUnit, + boolean fair, ThreadPoolRejectedPolicy rejectedPolicy) { + this.delegate = delegate; + this.maxConcurrent = maxConcurrent; + this.semaphore = new Semaphore(maxConcurrent, fair); + this.timeoutNanos = timeUnit.toNanos(acquireTimeout); + this.rejectedPolicy = rejectedPolicy; + } + + @Override + public void execute(Runnable command) { + if (delegate.isShutdown()) { + throw new RejectedExecutionException("Executor has been shut down"); + } + + boolean acquired = false; + try { + if (rejectedPolicy == ThreadPoolRejectedPolicy.Block) { + semaphore.acquire(); + acquired = true; + } else { + acquired = semaphore.tryAcquire(timeoutNanos, TimeUnit.NANOSECONDS); + } + + if (!acquired) { + if (rejectedPolicy == ThreadPoolRejectedPolicy.CallerRuns) { + callerRunsCount.increment(); + command.run(); + return; + } + rejectedCount.increment(); + throw new RejectedExecutionException("Executor saturated: timed out waiting for a permit"); + } + + boolean submitted = false; + try { + delegate.execute(() -> { + try { + command.run(); + } finally { + delegatedTaskCount.increment(); + semaphore.release(); + } + }); + submitted = true; + } finally { + if (!submitted) { + semaphore.release(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException("Interrupted while waiting for permit", e); + } + } + + // -- Metrics -- + + /** + * The maximum number of tasks that can be delegated to the underlying executor concurrently. CallerRuns tasks + * execute outside this limit. + */ + public int getMaxConcurrent() { + return maxConcurrent; + } + + /** + * The number of permits currently available. + */ + public int getAvailablePermits() { + return semaphore.availablePermits(); + } + + /** + * The number of tasks currently delegated to the underlying executor. + */ + public int getActiveCount() { + return maxConcurrent - semaphore.availablePermits(); + } + + /** + * The number of threads currently blocked waiting for a permit. + */ + public int getWaitingCount() { + return semaphore.getQueueLength(); + } + + /** + * The number of times the timeout expired and a task fell back to running on the caller's thread. + */ + public long getCallerRunsCount() { + return callerRunsCount.sum(); + } + + /** + * The number of tasks rejected because no permit was available within the timeout. + */ + public long getRejectedCount() { + return rejectedCount.sum(); + } + + /** + * The total number of tasks that completed via the underlying executor (excludes caller-runs). + */ + public long getDelegatedTaskCount() { + return delegatedTaskCount.sum(); + } + + @Override + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + if (runnable instanceof Rejectable) { + return new RejectableFutureTask<>(runnable, value); + } + return super.newTaskFor(runnable, value); + } + + @Override + protected RunnableFuture newTaskFor(Callable callable) { + if (callable instanceof Rejectable) { + return new RejectableFutureTask<>(callable); + } + return super.newTaskFor(callable); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public String toString() { + return "BoundedExecutorService[active=" + getActiveCount() + + ", max=" + maxConcurrent + + ", waiting=" + getWaitingCount() + + ", callerRuns=" + callerRunsCount.sum() + + ", rejected=" + rejectedCount.sum() + + ", delegated=" + delegatedTaskCount.sum() + "]"; + } +} diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java index 1d0f47a5bb090..113ac74f69def 100644 --- a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java +++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/ThreadPoolRejectedPolicy.java @@ -31,7 +31,8 @@ public enum ThreadPoolRejectedPolicy { Abort, - CallerRuns; + CallerRuns, + Block; public RejectedExecutionHandler asRejectedExecutionHandler() { if (this == Abort) { @@ -57,6 +58,25 @@ public String toString() { return "CallerRuns"; } }; + } else if (this == Block) { + return new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (!executor.isShutdown()) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException("Interrupted while waiting for queue space", e); + } + } + } + + @Override + public String toString() { + return "Block"; + } + }; } throw new IllegalArgumentException("Unknown ThreadPoolRejectedPolicy: " + this); } diff --git a/core/camel-util/src/test/java/org/apache/camel/util/concurrent/BoundedExecutorServiceTest.java b/core/camel-util/src/test/java/org/apache/camel/util/concurrent/BoundedExecutorServiceTest.java new file mode 100644 index 0000000000000..1b58012f35d9b --- /dev/null +++ b/core/camel-util/src/test/java/org/apache/camel/util/concurrent/BoundedExecutorServiceTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BoundedExecutorServiceTest { + + @Test + public void testCallerRunsOnTimeout() throws Exception { + var delegate = Executors.newCachedThreadPool(); + var sized = new BoundedExecutorService( + delegate, 1, 200, TimeUnit.MILLISECONDS, false, ThreadPoolRejectedPolicy.CallerRuns); + try { + CountDownLatch blockTask = new CountDownLatch(1); + sized.execute(() -> { + try { + blockTask.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + AtomicBoolean ranOnCallerThread = new AtomicBoolean(); + String callerName = Thread.currentThread().getName(); + + sized.execute(() -> ranOnCallerThread + .set(Thread.currentThread().getName().equals(callerName))); + + assertTrue(ranOnCallerThread.get(), + "After timeout, task should run on the caller's thread"); + assertEquals(1, sized.getCallerRunsCount()); + assertEquals(0, sized.getRejectedCount()); + + blockTask.countDown(); + } finally { + sized.shutdown(); + delegate.shutdown(); + } + } + + @Test + public void testAbortOnTimeout() throws Exception { + var delegate = Executors.newCachedThreadPool(); + var sized = new BoundedExecutorService( + delegate, 1, 200, TimeUnit.MILLISECONDS, false, ThreadPoolRejectedPolicy.Abort); + try { + CountDownLatch blockTask = new CountDownLatch(1); + sized.execute(() -> { + try { + blockTask.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + assertThrows(RejectedExecutionException.class, + () -> sized.execute(() -> { + }), + "Should reject after timeout with Abort policy"); + assertEquals(0, sized.getCallerRunsCount()); + assertEquals(1, sized.getRejectedCount()); + + blockTask.countDown(); + } finally { + sized.shutdown(); + delegate.shutdown(); + } + } + + @Test + public void testBlockForeverPolicy() throws Exception { + var delegate = Executors.newCachedThreadPool(); + var sized = new BoundedExecutorService( + delegate, 1, 60, TimeUnit.SECONDS, false, ThreadPoolRejectedPolicy.Block); + try { + CountDownLatch blockTask = new CountDownLatch(1); + CountDownLatch secondStarted = new CountDownLatch(1); + CountDownLatch submitterBlocked = new CountDownLatch(1); + + sized.execute(() -> { + try { + blockTask.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + Thread submitter = new Thread(() -> { + submitterBlocked.countDown(); + sized.execute(secondStarted::countDown); + }); + submitter.start(); + + assertTrue(submitterBlocked.await(5, TimeUnit.SECONDS)); + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertTrue(sized.getWaitingCount() > 0, + "Submitter should be blocked waiting for a permit")); + + blockTask.countDown(); + + assertTrue(secondStarted.await(5, TimeUnit.SECONDS), + "Second task should run after first completes"); + submitter.join(5000); + } finally { + sized.shutdown(); + delegate.shutdown(); + } + } + + @Test + public void testConcurrencyBounded() throws Exception { + var delegate = Executors.newCachedThreadPool(); + int maxConcurrent = 3; + var sized = new BoundedExecutorService( + delegate, maxConcurrent, 60, TimeUnit.SECONDS, false, ThreadPoolRejectedPolicy.Block); + try { + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger peak = new AtomicInteger(); + int totalTasks = 20; + CountDownLatch holdTasks = new CountDownLatch(1); + CountDownLatch allDone = new CountDownLatch(totalTasks); + + ExecutorService senders = Executors.newFixedThreadPool(totalTasks); + for (int i = 0; i < totalTasks; i++) { + senders.submit(() -> sized.execute(() -> { + int current = inFlight.incrementAndGet(); + peak.accumulateAndGet(current, Math::max); + try { + holdTasks.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + inFlight.decrementAndGet(); + allDone.countDown(); + })); + } + + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(maxConcurrent, sized.getActiveCount())); + + holdTasks.countDown(); + assertTrue(allDone.await(30, TimeUnit.SECONDS), "All tasks should complete"); + assertTrue(peak.get() <= maxConcurrent, + "Peak concurrency (" + peak.get() + ") should be <= " + maxConcurrent); + senders.shutdown(); + } finally { + sized.shutdown(); + delegate.shutdown(); + } + } + + @Test + public void testPermitsReleasedAfterCompletion() throws Exception { + var delegate = Executors.newCachedThreadPool(); + var sized = new BoundedExecutorService( + delegate, 2, 60, TimeUnit.SECONDS, false, ThreadPoolRejectedPolicy.Block); + try { + CountDownLatch firstBatch = new CountDownLatch(2); + for (int i = 0; i < 2; i++) { + sized.execute(firstBatch::countDown); + } + assertTrue(firstBatch.await(5, TimeUnit.SECONDS), "First batch should complete"); + + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(2, sized.getAvailablePermits())); + + CountDownLatch secondBatch = new CountDownLatch(2); + for (int i = 0; i < 2; i++) { + sized.execute(secondBatch::countDown); + } + assertTrue(secondBatch.await(5, TimeUnit.SECONDS), + "Second batch should succeed after permits are released"); + } finally { + sized.shutdown(); + delegate.shutdown(); + } + } + + @Test + public void testSubmitAfterShutdown() { + var delegate = Executors.newCachedThreadPool(); + var sized = new BoundedExecutorService( + delegate, 5, 60, TimeUnit.SECONDS, false, ThreadPoolRejectedPolicy.CallerRuns); + + sized.shutdown(); + assertTrue(sized.isShutdown()); + assertTrue(delegate.isShutdown()); + + assertThrows(RejectedExecutionException.class, + () -> sized.execute(() -> { + }), + "Should reject after shutdown"); + } + + @Test + public void testSubmitReturnsFuture() throws Exception { + var delegate = Executors.newCachedThreadPool(); + var sized = new BoundedExecutorService( + delegate, 5, 60, TimeUnit.SECONDS, false, ThreadPoolRejectedPolicy.CallerRuns); + try { + AtomicBoolean executed = new AtomicBoolean(); + Future future = sized.submit(() -> executed.set(true)); + + future.get(5, TimeUnit.SECONDS); + assertTrue(executed.get(), "Task submitted via submit() should execute"); + assertTrue(future.isDone()); + } finally { + sized.shutdown(); + delegate.shutdown(); + } + } +} diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc index 132102d0006c8..186b9892c5d5c 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc @@ -28,6 +28,21 @@ your own code or tooling, add `org.jspecify:jspecify` explicitly to your project The `org.apache.camel.support.DefaultHeaderFilterStrategy` changed default setting for lowercase from `false` to `true`. +==== Virtual threads: `maxQueueSize` now honored in `threads()` EIP + +When virtual threads are enabled (`camel.threads.virtual.enabled=true`), the `threads()` EIP now honors `maxQueueSize` +for backpressure. Previously, `maxQueueSize` was silently ignored and tasks were accepted unboundedly. + +The virtual thread executor is wrapped with a semaphore-based concurrency limit (`BoundedExecutorService`) that enforces +a flat cap of `maxQueueSize` on delegated tasks. The `keepAliveTime` parameter is repurposed as the +semaphore acquisition timeout (pool sizing parameters are not applicable to virtual threads). + +==== New `Block` rejected policy + +A new `Block` value has been added to `ThreadPoolRejectedPolicy`. With `Block`, the caller blocks indefinitely until +capacity becomes available. This applies to both platform and virtual threads. The existing `CallerRuns` (default) and +`Abort` policies are unchanged. + The type converters for Java serialized objects with types `java.io.ObjectInput` and `java.io.ObjectOutput` has been removed. Java object serialization is a recurring source of security issues and therefore these converters has been removed. These converters are not used at all by Camel itself. To restore compatibility then end users can add these type converters back as custom converters in their own Camel applications. @@ -67,6 +82,26 @@ auto-disables `contentCache` on resource-based components (such as `xslt`) whose the route. Set `camel.component..contentCache=true` (or pass `?contentCache=true` on the URI) to opt back in to caching during dev mode. +==== Unified `--packaging` option for `camel export` + +A new `--packaging` option has been added to `camel export` that works across all three runtimes +(Camel Main, Spring Boot, and Quarkus). It replaces the Quarkus-specific `--quarkus-package-type` +option, which is now deprecated. + +Accepted values: + +- `layered` or `fast-jar` — container-optimized packaging with separate dependency layers (default) +- `fat-jar` or `uber-jar` — single executable JAR + +The default is `layered`, which produces Dockerfiles optimized for container image layer caching. +Each runtime implements layered packaging using its native mechanism: + +- **Camel Main**: thin JAR with dependencies in a `lib/` folder +- **Spring Boot**: multi-stage Dockerfile using Spring Boot's built-in layer extraction +- **Quarkus**: fast-jar packaging (unchanged from the previous `--quarkus-package-type=fast-jar` default) + +The deprecated `--quarkus-package-type` option continues to work for backward compatibility. + ==== Improved default `--quarkus-version` The default behavior of commands requiring Quarkus Platform version has improved. diff --git a/docs/user-manual/modules/ROOT/pages/threading-model.adoc b/docs/user-manual/modules/ROOT/pages/threading-model.adoc index fb2afc1ed5b50..b419a3a1410a3 100644 --- a/docs/user-manual/modules/ROOT/pages/threading-model.adoc +++ b/docs/user-manual/modules/ROOT/pages/threading-model.adoc @@ -28,8 +28,8 @@ The default profile is pre-configured out of the box with the following settings | *maxPoolSize* | `20` | Sets the default maximum pool size | *maxQueueSize* | `1000` | Sets the default maximum number of tasks in the work queue. Use -1 for an unbounded queue. | *allowCoreThreadTimeOut* | `true` | Sets default whether to allow core threads to timeout -| *rejectedPolicy* | `CallerRuns` | Sets the default handler for tasks which cannot be executed by the thread pool. Has four options: -`Abort, CallerRuns, Discard, DiscardOldest` which corresponds to the same four options provided out of the box in the JDK. +| *rejectedPolicy* | `CallerRuns` | Sets the default handler for tasks which cannot be executed by the thread pool. Has three options: +`Abort`, `CallerRuns`, `Block`. See <> for details. |=== What that means is that for example when you use @@ -310,15 +310,63 @@ To hook in custom thread pool providers a `ThreadPoolFactory` interface can be implemented. The implementation can be set in the `ExecutorServiceManager`. +[[rejected-policy]] +== Rejected Policy + +The `rejectedPolicy` option controls what happens when a thread pool cannot accept a new task +(i.e., the pool and its work queue are full). The available policies are: + +[width="100%",cols="20%,80%",options="header"] +|=== +| Policy | Description +| *CallerRuns* | The task runs on the caller's thread. This provides natural backpressure — the caller is blocked +doing useful work and cannot submit more tasks until it finishes. Tasks are never lost. This is the default. +| *Abort* | The task is rejected with a `RejectedExecutionException`. Use this for HTTP APIs or latency-sensitive +systems where failing fast is preferred over blocking. +| *Block* | The caller blocks indefinitely until capacity becomes available. No timeout, no rejection. Use this for +message broker consumers and batch workloads where losing a task is unacceptable and latency is less critical. +|=== + +With platform threads, these policies apply when the `ThreadPoolExecutor` work queue is full. With virtual threads, +the same policies apply when the concurrency semaphore has no available permits (see <>). + +[[virtual-threads]] == Virtual Threads Starting from Java 21, the default `ThreadPoolFactory` can build `ExecutorService` and `ScheduledExecutorService` that -use https://openjdk.org/jeps/425[virtual threads] instead of platform threads. +use https://openjdk.org/jeps/444[virtual threads] instead of platform threads. -But as it is an experimental feature, it is not enabled by default, you need to set the System property `camel.threads.virtual.enabled` -to `true` and run Camel using Java 21 or above to enable it. +To enable virtual threads, set the System property `camel.threads.virtual.enabled` to `true` and run Camel using +Java 21 or above. Be aware that even if it is enabled, there are some use cases where platform threads are still used, for example, if the thread factory is configured to create non-daemon threads since virtual threads can only be daemons, or when the `ExecutorService` or `ScheduledExecutorService` to build cannot have more than one thread or finally when `corePoolSize` is set to zero and `maxQueueSize` is set to a value less or equal to `0`. + +=== Bounded Concurrency with Virtual Threads + +When `maxQueueSize` is set to a positive value, Camel wraps the virtual thread executor with a semaphore-based +concurrency limit. This ensures that `maxQueueSize` is honored for backpressure, even though virtual threads do not +use a traditional work queue. + +Unlike `ThreadPoolExecutor` where pool threads and queued tasks are distinct concepts, the virtual thread executor +enforces a flat concurrency cap: the maximum number of concurrently executing tasks equals `maxQueueSize`. +Pool sizing parameters (`poolSize`, `maxPoolSize`) are ignored since virtual threads are not pooled. +All permitted tasks execute immediately on virtual threads — there is no queue of waiting tasks. + +The `rejectedPolicy` controls what happens when the concurrency limit is reached: + +* *CallerRuns* (default): the caller blocks up to `keepAliveTime` waiting for a permit. If the timeout expires, +the task runs on the caller's thread. +* *Abort*: the caller blocks up to `keepAliveTime` waiting for a permit. If the timeout expires, +a `RejectedExecutionException` is thrown. +* *Block*: the caller blocks indefinitely until a permit becomes available. + +While waiting for a permit, the calling thread is blocked. When callers are virtual threads this is inexpensive +(the carrier thread is released). When callers are platform threads (e.g., HTTP server threads) the blocked thread +is unavailable for other work. + +Pool sizing parameters (`poolSize`, `maxPoolSize`, `keepAliveTime`) do not control thread reuse with virtual threads +since virtual threads are cheap to create and are never pooled. However, `keepAliveTime` is reused as the timeout +for permit acquisition with `CallerRuns` and `Abort` policies. diff --git a/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc b/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc index 06d8501a018ed..0991ff14b7b4f 100644 --- a/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc +++ b/docs/user-manual/modules/ROOT/pages/virtual-threads.adoc @@ -91,15 +91,24 @@ When virtual threads are enabled, Camel's `DefaultThreadPoolFactory` (JDK 21+ va | `Executors.newCachedThreadPool()` | `Executors.newThreadPerTaskExecutor()` -| `newThreadPool()` (poolSize > 1) -| `ThreadPoolExecutor` -| `Executors.newThreadPerTaskExecutor()` +| `newThreadPool()` (maxQueueSize > 0) +| `ThreadPoolExecutor` with bounded queue +| `Executors.newThreadPerTaskExecutor()` wrapped with semaphore-based concurrency limit + +| `newThreadPool()` (maxQueueSize ≤ 0) +| `ThreadPoolExecutor` with `SynchronousQueue` +| `Executors.newThreadPerTaskExecutor()` (unbounded) | `newScheduledThreadPool()` | `ScheduledThreadPoolExecutor` | `Executors.newScheduledThreadPool(0, factory)` |=== +When `maxQueueSize` is set to a positive value, the virtual thread executor is wrapped with a semaphore that enforces +a flat concurrency cap of `maxQueueSize`. Unlike `ThreadPoolExecutor` where pool threads and queued tasks +are distinct, all permitted tasks execute immediately on virtual threads. The `rejectedPolicy` controls what happens +when the concurrency limit is reached — see xref:threading-model.adoc#rejected-policy[Rejected Policy] for details. + [NOTE] ==== Single-threaded executors and scheduled tasks still use platform threads, as virtual threads are optimized for concurrent I/O-bound work, not scheduled or sequential tasks. diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java index 4d3797581b51c..45bf777d70f5b 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java @@ -17646,7 +17646,7 @@ protected boolean setProperty(TemplatedRouteParameterDefinition target, String p @YamlProperty(name = "maxQueueSize", type = "number", description = "Sets the maximum number of tasks in the work queue. Use -1 or Integer.MAX_VALUE for an unbounded queue", displayName = "Max Queue Size"), @YamlProperty(name = "note", type = "string", description = "Sets the note of this node", displayName = "Note"), @YamlProperty(name = "poolSize", type = "number", description = "Sets the core pool size", displayName = "Pool Size"), - @YamlProperty(name = "rejectedPolicy", type = "enum:Abort,CallerRuns", description = "Sets the handler for tasks which cannot be executed by the thread pool.", displayName = "Rejected Policy"), + @YamlProperty(name = "rejectedPolicy", type = "enum:Abort,CallerRuns,Block", description = "Sets the handler for tasks which cannot be executed by the thread pool.", displayName = "Rejected Policy"), @YamlProperty(name = "timeUnit", type = "enum:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS", description = "Sets the time unit to use for keep alive time By default SECONDS is used.", displayName = "Time Unit") } ) @@ -17747,7 +17747,7 @@ protected boolean setProperty(ThreadPoolProfileDefinition target, String propert @YamlProperty(name = "maxQueueSize", type = "number", description = "Sets the maximum number of tasks in the work queue. Use -1 or Integer.MAX_VALUE for an unbounded queue", displayName = "Max Queue Size"), @YamlProperty(name = "note", type = "string", description = "Sets the note of this node", displayName = "Note"), @YamlProperty(name = "poolSize", type = "number", description = "Sets the core pool size", displayName = "Pool Size"), - @YamlProperty(name = "rejectedPolicy", type = "enum:Abort,CallerRuns", description = "Sets the handler for tasks which cannot be executed by the thread pool.", displayName = "Rejected Policy"), + @YamlProperty(name = "rejectedPolicy", type = "enum:Abort,CallerRuns,Block", description = "Sets the handler for tasks which cannot be executed by the thread pool.", displayName = "Rejected Policy"), @YamlProperty(name = "threadName", type = "string", defaultValue = "Threads", description = "Sets the thread name to use.", displayName = "Thread Name"), @YamlProperty(name = "timeUnit", type = "enum:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS", description = "Sets the keep alive time unit. By default SECONDS is used.", displayName = "Time Unit") } diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json index eaaeefe3187f3..8438e7a74b375 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl-canonical.json @@ -4842,7 +4842,7 @@ "type" : "string", "title" : "Rejected Policy", "description" : "Sets the handler for tasks which cannot be executed by the thread pool.", - "enum" : [ "Abort", "CallerRuns" ] + "enum" : [ "Abort", "CallerRuns", "Block" ] }, "timeUnit" : { "type" : "string", @@ -4920,7 +4920,7 @@ "type" : "string", "title" : "Rejected Policy", "description" : "Sets the handler for tasks which cannot be executed by the thread pool.", - "enum" : [ "Abort", "CallerRuns" ] + "enum" : [ "Abort", "CallerRuns", "Block" ] }, "threadName" : { "type" : "string", diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json index 26fe66fb3b611..9baa494c457ed 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json @@ -7528,7 +7528,7 @@ "type" : "string", "title" : "Rejected Policy", "description" : "Sets the handler for tasks which cannot be executed by the thread pool.", - "enum" : [ "Abort", "CallerRuns" ] + "enum" : [ "Abort", "CallerRuns", "Block" ] }, "timeUnit" : { "type" : "string", @@ -7606,7 +7606,7 @@ "type" : "string", "title" : "Rejected Policy", "description" : "Sets the handler for tasks which cannot be executed by the thread pool.", - "enum" : [ "Abort", "CallerRuns" ] + "enum" : [ "Abort", "CallerRuns", "Block" ] }, "threadName" : { "type" : "string",