diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index b9f0eebf9ac..c1fe221256e 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -2031,13 +2031,15 @@ public void onClientInternalException(final Throwable error) { public void onAsyncException(Throwable error) { if (!closed.get() && !closing.get()) { if (this.exceptionListener != null) { - if (!(error instanceof JMSException)) { error = JMSExceptionSupport.create(error); } final JMSException e = (JMSException) error; - executeAsync(() -> exceptionListener.onException(e)); - + try { + executor.execute(() -> exceptionListener.onException(e)); + } catch (final RejectedExecutionException re) { + LOG.debug("Could not notify exception listener asynchronously (executor terminated): {}", error.getMessage()); + } } else { LOG.debug("Async exception with no exception listener: {}", error, error); } @@ -2046,20 +2048,40 @@ public void onAsyncException(Throwable error) { @Override public void onException(final IOException error) { - onAsyncException(error); - executeAsync(() -> { - transportFailed(error); - ServiceSupport.dispose(ActiveMQConnection.this.transport); - brokerInfoReceived.countDown(); + if (!closed.get() && !closing.get()) { + // Combine JMS ExceptionListener and TransportListener notifications into a single + // async task to prevent a race condition where the ExceptionListener (e.g. + // ConnectionPool) closes the connection and shuts down the executor before the + // TransportListener task can be queued. + final Runnable exceptionTask = () -> { + if (exceptionListener != null) { + try { + final JMSException jmsError = JMSExceptionSupport.create(error); + exceptionListener.onException(jmsError); + } catch (final Exception e) { + LOG.debug("Exception during JMS ExceptionListener notification", e); + } + } + + transportFailed(error); + ServiceSupport.dispose(ActiveMQConnection.this.transport); + brokerInfoReceived.countDown(); + try { + doCleanup(true); + } catch (JMSException e) { + LOG.warn("Exception during connection cleanup, " + e, e); + } + for (final TransportListener listener : transportListeners) { + listener.onException(error); + } + }; + try { - doCleanup(true); - } catch (JMSException e) { - LOG.warn("Exception during connection cleanup, " + e, e); + executor.execute(exceptionTask); + } catch (final RejectedExecutionException e) { + LOG.debug("Could not execute exception task (executor terminated, connection closing)"); } - for (final TransportListener listener : transportListeners) { - listener.onException(error); - } - }); + } } @Override