Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@
"timeUnit": { "index": 7, "kind": "attribute", "displayName": "Time Unit", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the time unit to use for keep alive time By default SECONDS is used." },
"maxQueueSize": { "index": 8, "kind": "attribute", "displayName": "Max Queue Size", "group": "common", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum number of tasks in the work queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
"allowCoreThreadTimeOut": { "index": 9, "kind": "attribute", "displayName": "Allow Core Thread Time Out", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether idle core threads is allowed to timeout and therefore can shrink the pool size below the core pool size Is by default true" },
"rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." }
"rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"maxQueueSize": { "index": 9, "kind": "attribute", "displayName": "Max Queue Size", "group": "common", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum number of tasks in the work queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
"allowCoreThreadTimeOut": { "index": 10, "kind": "attribute", "displayName": "Allow Core Thread Time Out", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether idle core threads are allowed to timeout and therefore can shrink the pool size below the core pool size Is by default false" },
"threadName": { "index": 11, "kind": "attribute", "displayName": "Thread Name", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "Threads", "description": "Sets the thread name to use." },
"rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." },
"rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." },
"callerRunsWhenRejected": { "index": 13, "kind": "attribute", "displayName": "Caller Runs When Rejected", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether or not to use as caller runs as fallback when a task is rejected being added to the thread pool (when its full). This is only used as fallback if no rejectedPolicy has been configured, or the thread pool has no configured rejection handler. Is by default true" }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"timeUnit": { "index": 4, "kind": "attribute", "displayName": "Time Unit", "group": "common", "required": false, "type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "SECONDS", "description": "Sets the time unit used for keep alive time" },
"maxQueueSize": { "index": 5, "kind": "attribute", "displayName": "Max Queue Size", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum number of tasks in the work queue. Use -1 for an unbounded queue" },
"allowCoreThreadTimeOut": { "index": 6, "kind": "attribute", "displayName": "Allow Core Thread Time Out", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether to allow core threads to timeout" },
"rejectedPolicy": { "index": 7, "kind": "attribute", "displayName": "Rejected Policy", "group": "common", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "CallerRuns", "description": "Sets the handler for tasks which cannot be executed by the thread pool." },
"rejectedPolicy": { "index": 7, "kind": "attribute", "displayName": "Rejected Policy", "group": "common", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "CallerRuns", "description": "Sets the handler for tasks which cannot be executed by the thread pool." },
"threadName": { "index": 8, "kind": "attribute", "displayName": "Thread Name", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom thread name \/ pattern" },
"scheduled": { "index": 9, "kind": "attribute", "displayName": "Scheduled", "group": "common", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to use a scheduled thread pool" },
"camelContextId": { "index": 10, "kind": "attribute", "displayName": "Camel Context Id", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Id of CamelContext to use if there are multiple CamelContexts in the same JVM" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@
"timeUnit": { "index": 7, "kind": "attribute", "displayName": "Time Unit", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the time unit to use for keep alive time By default SECONDS is used." },
"maxQueueSize": { "index": 8, "kind": "attribute", "displayName": "Max Queue Size", "group": "common", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum number of tasks in the work queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
"allowCoreThreadTimeOut": { "index": 9, "kind": "attribute", "displayName": "Allow Core Thread Time Out", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether idle core threads is allowed to timeout and therefore can shrink the pool size below the core pool size Is by default true" },
"rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." }
"rejectedPolicy": { "index": 10, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"maxQueueSize": { "index": 9, "kind": "attribute", "displayName": "Max Queue Size", "group": "common", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the maximum number of tasks in the work queue. Use -1 or Integer.MAX_VALUE for an unbounded queue" },
"allowCoreThreadTimeOut": { "index": 10, "kind": "attribute", "displayName": "Allow Core Thread Time Out", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether idle core threads are allowed to timeout and therefore can shrink the pool size below the core pool size Is by default false" },
"threadName": { "index": 11, "kind": "attribute", "displayName": "Thread Name", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "Threads", "description": "Sets the thread name to use." },
"rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." },
"rejectedPolicy": { "index": 12, "kind": "attribute", "displayName": "Rejected Policy", "group": "advanced", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy", "enum": [ "Abort", "CallerRuns", "Block" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the handler for tasks which cannot be executed by the thread pool." },
"callerRunsWhenRejected": { "index": 13, "kind": "attribute", "displayName": "Caller Runs When Rejected", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether or not to use as caller runs as fallback when a task is rejected being added to the thread pool (when its full). This is only used as fallback if no rejectedPolicy has been configured, or the thread pool has no configured rejection handler. Is by default true" }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class ThreadPoolProfileDefinition extends OptionalIdentifiedDefinition<Th
private String allowCoreThreadTimeOut;
@XmlAttribute
@Metadata(label = "advanced", javaType = "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy",
enums = "Abort,CallerRuns")
enums = "Abort,CallerRuns,Block")
private String rejectedPolicy;

public ThreadPoolProfileDefinition() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class ThreadsDefinition extends NoOutputDefinition<ThreadsDefinition>
private String threadName;
@XmlAttribute
@Metadata(label = "advanced", javaType = "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy",
enums = "Abort,CallerRuns")
enums = "Abort,CallerRuns,Block")
private String rejectedPolicy;
@XmlAttribute
@Metadata(label = "advanced", javaType = "java.lang.Boolean", defaultValue = "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public abstract class AbstractCamelThreadPoolFactoryBean extends AbstractCamelFa
@XmlAttribute
@Metadata(description = "Sets the handler for tasks which cannot be executed by the thread pool.",
defaultValue = "CallerRuns", javaType = "org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy",
enums = "Abort,CallerRuns")
enums = "Abort,CallerRuns,Block")
private String rejectedPolicy = ThreadPoolRejectedPolicy.CallerRuns.name();
@XmlAttribute(required = true)
@Metadata(description = "To use a custom thread name / pattern")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
import org.apache.camel.spi.ThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.concurrent.BoundedExecutorService;
import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
import org.apache.camel.util.concurrent.ThreadFactoryTypeAware;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.apache.camel.util.concurrent.ThreadType;

/**
Expand All @@ -64,6 +66,19 @@ public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {

@Override
public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
// Virtual threads: use the policy enum directly from the profile to avoid reverse-mapping
if (profile.getMaxQueueSize() > 0
&& ThreadPoolFactoryType.from(factory, profile) == ThreadPoolFactoryType.VIRTUAL) {
ThreadPoolRejectedPolicy policy = profile.getRejectedPolicy();
if (policy == null) {
policy = ThreadPoolRejectedPolicy.CallerRuns;
}
return new BoundedExecutorService(
ThreadPoolFactoryType.newThreadPerTaskExecutor(factory),
profile.getMaxQueueSize(),
profile.getKeepAliveTime(), profile.getTimeUnit(),
false, policy);
}
// allow core thread timeout is default true if not configured
boolean allow = profile.getAllowCoreThreadTimeOut() != null ? profile.getAllowCoreThreadTimeOut() : true;
return newThreadPool(profile.getPoolSize(),
Expand Down Expand Up @@ -199,7 +214,7 @@ static ThreadPoolFactoryType from(ThreadFactory threadFactory, int maxPoolSize)
}

@SuppressWarnings("unchecked")
private static ExecutorService newThreadPerTaskExecutor(ThreadFactory threadFactory) {
static ExecutorService newThreadPerTaskExecutor(ThreadFactory threadFactory) {
try {
return (ExecutorService) Executors.class
.getMethod("newThreadPerTaskExecutor", ThreadFactory.class)
Expand Down
6 changes: 6 additions & 0 deletions core/camel-util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Loading