Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2031,13 +2031,15 @@ public void onClientInternalException(final Throwable error) {
public void onAsyncException(Throwable error) {
if (!closed.get() && !closing.get()) {
if (this.exceptionListener != null) {

if (!(error instanceof JMSException)) {
error = JMSExceptionSupport.create(error);
}
final JMSException e = (JMSException) error;
executeAsync(() -> exceptionListener.onException(e));

try {
executor.execute(() -> exceptionListener.onException(e));
} catch (final RejectedExecutionException re) {
LOG.debug("Could not notify exception listener asynchronously (executor terminated): {}", error.getMessage());
}
} else {
LOG.debug("Async exception with no exception listener: {}", error, error);
}
Expand All @@ -2046,20 +2048,40 @@ public void onAsyncException(Throwable error) {

@Override
public void onException(final IOException error) {
onAsyncException(error);
executeAsync(() -> {
transportFailed(error);
ServiceSupport.dispose(ActiveMQConnection.this.transport);
brokerInfoReceived.countDown();
if (!closed.get() && !closing.get()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onException() doesn't call onAsyncException() anymore.

With the exception listener "inline" here, it bypass the onAsyncException() completely. I'm not sure we have anyone overridden onAsyncException(), but it has an impact.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the purpose of the change.
We can have the 2 listeners executed in their own thread, because we can't guarantee the order and we can't guarantee that the exception listener will be triggered before the transport failed.
The only to guarantee the order is to run the 2 in the same async thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I got it, I'm just curious about the impact. If nobody overriddes onAsyncException() that's all good to me.

// Combine JMS ExceptionListener and TransportListener notifications into a single
// async task to prevent a race condition where the ExceptionListener (e.g.
// ConnectionPool) closes the connection and shuts down the executor before the
// TransportListener task can be queued.
final Runnable exceptionTask = () -> {
if (exceptionListener != null) {
try {
final JMSException jmsError = JMSExceptionSupport.create(error);
exceptionListener.onException(jmsError);
} catch (final Exception e) {
LOG.debug("Exception during JMS ExceptionListener notification", e);
}
}

transportFailed(error);
ServiceSupport.dispose(ActiveMQConnection.this.transport);
brokerInfoReceived.countDown();
try {
doCleanup(true);
} catch (JMSException e) {
LOG.warn("Exception during connection cleanup, " + e, e);
}
for (final TransportListener listener : transportListeners) {
listener.onException(error);
}
};

try {
doCleanup(true);
} catch (JMSException e) {
LOG.warn("Exception during connection cleanup, " + e, e);
executor.execute(exceptionTask);
} catch (final RejectedExecutionException e) {
LOG.debug("Could not execute exception task (executor terminated, connection closing)");
}
for (final TransportListener listener : transportListeners) {
listener.onException(error);
}
});
}
}

@Override
Expand Down