From be61f633e52ac295cd3d676b387d8dd428cebdfe Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Wed, 11 Feb 2026 21:30:22 +0100 Subject: [PATCH 1/2] Fix race condition in async exception handling by ensuring exception notifications are not dropped when executor is terminated --- .../apache/activemq/ActiveMQConnection.java | 55 ++++++++++++++----- 1 file changed, 42 insertions(+), 13 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index b9f0eebf9ac..cf1f0bdd075 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -2029,25 +2029,45 @@ public void onClientInternalException(final Throwable error) { * @param error */ public void onAsyncException(Throwable error) { - if (!closed.get() && !closing.get()) { - if (this.exceptionListener != null) { - - if (!(error instanceof JMSException)) { - error = JMSExceptionSupport.create(error); + if (this.exceptionListener != null) { + if (!(error instanceof JMSException)) { + error = JMSExceptionSupport.create(error); + } + final JMSException e = (JMSException) error; + // Submit directly to executor bypassing closed/closing guards + // to ensure exception notifications are never silently dropped + try { + executor.execute(() -> exceptionListener.onException(e)); + } catch (final RejectedExecutionException re) { + LOG.debug("Could not notify exception listener asynchronously (executor terminated), notifying inline: {}", error.getMessage()); + try { + exceptionListener.onException(e); + } catch (final Exception ex) { + LOG.debug("Exception during inline ExceptionListener notification", ex); } - final JMSException e = (JMSException) error; - executeAsync(() -> exceptionListener.onException(e)); - - } else { - LOG.debug("Async exception with no exception listener: {}", error, error); } + } else { + LOG.debug("Async exception with no exception listener: {}", error, error); } } @Override public void onException(final IOException error) { - onAsyncException(error); - executeAsync(() -> { + // Combine JMS ExceptionListener and TransportListener notifications + // into a single async task to prevent a race condition where the + // ExceptionListener (e.g. ConnectionPool) closes the connection and + // shuts down the executor before the TransportListener task is queued. + final Runnable exceptionTask = () -> { + // Notify JMS ExceptionListener first (same as onAsyncException) + if (exceptionListener != null) { + try { + final JMSException jmsError = JMSExceptionSupport.create(error); + exceptionListener.onException(jmsError); + } catch (final Exception e) { + LOG.debug("Exception during JMS ExceptionListener notification", e); + } + } + transportFailed(error); ServiceSupport.dispose(ActiveMQConnection.this.transport); brokerInfoReceived.countDown(); @@ -2059,7 +2079,16 @@ public void onException(final IOException error) { for (final TransportListener listener : transportListeners) { listener.onException(error); } - }); + }; + + // Submit directly to executor bypassing closed/closing guards + // to ensure transport failure handling is never silently dropped + try { + executor.execute(exceptionTask); + } catch (final RejectedExecutionException e) { + LOG.debug("Could not execute exception task asynchronously (executor terminated), executing inline"); + exceptionTask.run(); + } } @Override From a5b9308b38b85b64840a6ea27747ca8c08364c0a Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Fri, 27 Feb 2026 12:26:24 +0100 Subject: [PATCH 2/2] Fix race condition in async exception handling by adding checks for connection state before notifying listeners to avoid deadlock --- .../apache/activemq/ActiveMQConnection.java | 85 +++++++++---------- 1 file changed, 39 insertions(+), 46 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index cf1f0bdd075..c1fe221256e 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -2029,65 +2029,58 @@ public void onClientInternalException(final Throwable error) { * @param error */ public void onAsyncException(Throwable error) { - if (this.exceptionListener != null) { - if (!(error instanceof JMSException)) { - error = JMSExceptionSupport.create(error); - } - final JMSException e = (JMSException) error; - // Submit directly to executor bypassing closed/closing guards - // to ensure exception notifications are never silently dropped - try { - executor.execute(() -> exceptionListener.onException(e)); - } catch (final RejectedExecutionException re) { - LOG.debug("Could not notify exception listener asynchronously (executor terminated), notifying inline: {}", error.getMessage()); + if (!closed.get() && !closing.get()) { + if (this.exceptionListener != null) { + if (!(error instanceof JMSException)) { + error = JMSExceptionSupport.create(error); + } + final JMSException e = (JMSException) error; try { - exceptionListener.onException(e); - } catch (final Exception ex) { - LOG.debug("Exception during inline ExceptionListener notification", ex); + executor.execute(() -> exceptionListener.onException(e)); + } catch (final RejectedExecutionException re) { + LOG.debug("Could not notify exception listener asynchronously (executor terminated): {}", error.getMessage()); } + } else { + LOG.debug("Async exception with no exception listener: {}", error, error); } - } else { - LOG.debug("Async exception with no exception listener: {}", error, error); } } @Override public void onException(final IOException error) { - // Combine JMS ExceptionListener and TransportListener notifications - // into a single async task to prevent a race condition where the - // ExceptionListener (e.g. ConnectionPool) closes the connection and - // shuts down the executor before the TransportListener task is queued. - final Runnable exceptionTask = () -> { - // Notify JMS ExceptionListener first (same as onAsyncException) - if (exceptionListener != null) { + if (!closed.get() && !closing.get()) { + // Combine JMS ExceptionListener and TransportListener notifications into a single + // async task to prevent a race condition where the ExceptionListener (e.g. + // ConnectionPool) closes the connection and shuts down the executor before the + // TransportListener task can be queued. + final Runnable exceptionTask = () -> { + if (exceptionListener != null) { + try { + final JMSException jmsError = JMSExceptionSupport.create(error); + exceptionListener.onException(jmsError); + } catch (final Exception e) { + LOG.debug("Exception during JMS ExceptionListener notification", e); + } + } + + transportFailed(error); + ServiceSupport.dispose(ActiveMQConnection.this.transport); + brokerInfoReceived.countDown(); try { - final JMSException jmsError = JMSExceptionSupport.create(error); - exceptionListener.onException(jmsError); - } catch (final Exception e) { - LOG.debug("Exception during JMS ExceptionListener notification", e); + doCleanup(true); + } catch (JMSException e) { + LOG.warn("Exception during connection cleanup, " + e, e); } - } + for (final TransportListener listener : transportListeners) { + listener.onException(error); + } + }; - transportFailed(error); - ServiceSupport.dispose(ActiveMQConnection.this.transport); - brokerInfoReceived.countDown(); try { - doCleanup(true); - } catch (JMSException e) { - LOG.warn("Exception during connection cleanup, " + e, e); - } - for (final TransportListener listener : transportListeners) { - listener.onException(error); + executor.execute(exceptionTask); + } catch (final RejectedExecutionException e) { + LOG.debug("Could not execute exception task (executor terminated, connection closing)"); } - }; - - // Submit directly to executor bypassing closed/closing guards - // to ensure transport failure handling is never silently dropped - try { - executor.execute(exceptionTask); - } catch (final RejectedExecutionException e) { - LOG.debug("Could not execute exception task asynchronously (executor terminated), executing inline"); - exceptionTask.run(); } }