diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index 3c512bbbb2931..7d7ad406059fd 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -858,7 +858,7 @@ private void onSubmitCompletion(final String key, final Exchange exchange) { executorService.execute(() -> { ExchangeHelper.prepareOutToIn(exchange); - Runnable task = () -> processor.process(exchange, done -> { + AsyncCallback completionCallback = done -> { // log exception if there was a problem if (exchange.getException() != null) { // if there was an exception then let the exception handler handle it @@ -867,14 +867,19 @@ private void onSubmitCompletion(final String key, final Exchange exchange) { } else { LOG.trace("Processing aggregated exchange: {} complete.", exchange); } - }); - // execute the task using this thread sync (similar to multicast eip in parallel mode) - if (exchange.isTransacted()) { - reactiveExecutor.scheduleQueue(task); - } else if (executorService instanceof SynchronousExecutorService) { - reactiveExecutor.schedule(task); + }; + + if (executorService instanceof SynchronousExecutorService) { + // CAMEL-23281: process inline to avoid deadlock with transacted exchanges + processor.process(exchange, completionCallback); } else { - reactiveExecutor.scheduleSync(task); + Runnable task = () -> processor.process(exchange, completionCallback); + // execute the task using this thread sync (similar to multicast eip in parallel mode) + if (exchange.isTransacted()) { + reactiveExecutor.scheduleQueue(task); + } else { + reactiveExecutor.scheduleSync(task); + } } }); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateInChoiceSynchronousExecutorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateInChoiceSynchronousExecutorTest.java new file mode 100644 index 0000000000000..6c3f512d5a099 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateInChoiceSynchronousExecutorTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.AggregationStrategies; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.concurrent.SynchronousExecutorService; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.camel.Exchange.SPLIT_COMPLETE; +import static org.apache.camel.Exchange.SPLIT_INDEX; + +/** + * Reproducer for CAMEL-23281: split/aggregator deadlock when the aggregate uses SynchronousExecutorService with + * completionSize + completionPredicate(SPLIT_COMPLETE) and the exchange is transacted. + * + * Root cause: when a transacted exchange triggers aggregate completion, AggregateProcessor.onSubmitCompletion() queues + * the completion task via reactiveExecutor.scheduleQueue(). This task is later drained by + * DefaultAsyncProcessorAwaitManager.await() via executeFromQueue(). The drained task re-enters + * CamelInternalProcessor.processTransacted() which calls processor.process(exchange) (sync version), triggering another + * DefaultAsyncProcessorAwaitManager.process() → await() → executeFromQueue() cycle. When the reactive queue is + * exhausted, the innermost await() blocks on CountDownLatch.await() forever — deadlock. + */ +public class SplitAggregateInChoiceSynchronousExecutorTest extends ContextTestSupport { + + @Test + public void testSplitAggregateInChoiceNonTransacted() throws Exception { + StringBuilder sb = new StringBuilder(); + sb.append("HEADER\n"); + for (int i = 1; i <= 11; i++) { + sb.append("Line ").append(i).append("\n"); + } + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(3); + + template.sendBody("direct:start", sb.toString()); + + MockEndpoint.assertIsSatisfied(10, SECONDS, result); + } + + @Test + @Timeout(30) + public void testSplitAggregateTransactedDeadlock() throws Exception { + // Transacted split + aggregate: deadlocks due to recursive processTransacted → await → executeFromQueue cycle + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 11; i++) { + sb.append("Line ").append(i).append("\n"); + } + + MockEndpoint result = getMockEndpoint("mock:aggregated"); + result.expectedMessageCount(3); + + template.request("direct:transacted-simple", e -> { + e.getIn().setBody(sb.toString()); + }); + + MockEndpoint.assertIsSatisfied(10, SECONDS, result); + } + + @Test + @Timeout(30) + public void testSplitAggregateTransactedInChoiceDeadlock() throws Exception { + // Same as the JIRA reproducer: transacted split with aggregate inside choice/when + StringBuilder sb = new StringBuilder(); + sb.append("HEADER\n"); + for (int i = 1; i <= 11; i++) { + sb.append("Line ").append(i).append("\n"); + } + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(3); + + template.request("direct:transacted", e -> { + e.getIn().setBody(sb.toString()); + }); + + MockEndpoint.assertIsSatisfied(10, SECONDS, result); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + // Non-transacted: split CSV, skip header, aggregate inside choice/when — works fine + from("direct:start") + .split(body().tokenize("\n")).streaming().stopOnException() + .choice() + .when(exchangeProperty(SPLIT_INDEX).isGreaterThan(0)) + .aggregate(constant(true), AggregationStrategies.groupedBody()) + .eagerCheckCompletion() + .executorService(new SynchronousExecutorService()) + .completionSize(5) + .completionPredicate(exchangeProperty(SPLIT_COMPLETE)) + .to("mock:result"); + + // Transacted split + aggregate (no choice/when) — deadlocks + from("direct:transacted-simple") + .process(e -> e.getExchangeExtension().setTransacted(true)) + .split(body().tokenize("\n")).streaming().stopOnException() + .aggregate(constant(true), AggregationStrategies.groupedBody()) + .eagerCheckCompletion() + .executorService(new SynchronousExecutorService()) + .completionSize(5) + .completionPredicate(exchangeProperty(SPLIT_COMPLETE)) + .to("mock:aggregated"); + + // Transacted split + aggregate inside choice/when (CAMEL-23281 pattern) — deadlocks + from("direct:transacted") + .process(e -> e.getExchangeExtension().setTransacted(true)) + .split(body().tokenize("\n")).streaming().stopOnException() + .choice() + .when(exchangeProperty(SPLIT_INDEX).isGreaterThan(0)) + .aggregate(constant(true), AggregationStrategies.groupedBody()) + .eagerCheckCompletion() + .executorService(new SynchronousExecutorService()) + .completionSize(5) + .completionPredicate(exchangeProperty(SPLIT_COMPLETE)) + .to("mock:result"); + } + }; + } +}