From f790615bf9fb8d6c856206ed95f6c7b175a2ae5f Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Tue, 17 Mar 2026 00:14:04 +0100 Subject: [PATCH 1/2] feat: add message delivery tracking to QueueMessageReference Add isDelivered()/setDelivered() to QueueMessageReference interface with implementations in IndirectMessageReference (boolean field) and NullMessageReference (no-op). PrefetchSubscription.acknowledge() now marks messages as delivered=true when processing DeliveredAck, enabling downstream consumers to distinguish delivered-but-unacked messages. --- .../region/IndirectMessageReference.java | 15 ++++++++++-- .../broker/region/NullMessageReference.java | 9 ++++++++ .../broker/region/PrefetchSubscription.java | 23 ++++++++++++++++++- .../broker/region/QueueMessageReference.java | 4 ++++ 4 files changed, 48 insertions(+), 3 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index 104dcb0a4a5..37dfaf198d6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -35,6 +35,8 @@ public class IndirectMessageReference implements QueueMessageReference { private boolean dropped; /** Has the message been acked? */ private boolean acked; + /** Has the message been acked? */ + private boolean delivered; /** Direct reference to the message */ private final Message message; private final MessageId messageId; @@ -197,16 +199,25 @@ public boolean isExpired() { @Override public synchronized int getSize() { - return message.getSize(); + return message.getSize(); } @Override public boolean isAdvisory() { - return message.isAdvisory(); + return message.isAdvisory(); } @Override public boolean canProcessAsExpired() { return message.canProcessAsExpired(); } + + @Override + public boolean isDelivered() { + return delivered; + } + + public void setDelivered(final boolean delivered) { + this.delivered = delivered; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java index e8d26a84fca..8d2edf8895d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java @@ -159,4 +159,13 @@ public boolean canProcessAsExpired() { return false; } + @Override + public boolean isDelivered() { + return false; + } + + @Override + public void setDelivered(boolean delivered) { + + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index b2adf56a679..ee95053139b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -270,6 +270,27 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a } else if (ack.isDeliveredAck()) { // Message was delivered but not acknowledged: update pre-fetch // counters. + + boolean inAckRange = false; + for (final MessageReference node : dispatched) { + MessageId messageId = node.getMessageId(); + if (ack.getFirstMessageId() == null + || ack.getFirstMessageId().equals(messageId)) { + inAckRange = true; + } + if (inAckRange) { + if (node instanceof QueueMessageReference) { + ((QueueMessageReference) node).setDelivered(true); + } + + if (ack.getLastMessageId().equals(messageId)) { + destination = (Destination) node.getRegionDestination(); + callDispatchMatched = true; + break; + } + } + } + int index = 0; for (Iterator iter = dispatched.iterator(); iter.hasNext(); index++) { final MessageReference node = iter.next(); @@ -396,7 +417,7 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a } protected void processExpiredAck(final ConnectionContext context, final Destination dest, - final MessageReference node) { + final MessageReference node) { dest.messageExpired(context, this, node); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java index 89bcd6a5179..e78e106c93e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java @@ -42,4 +42,8 @@ public interface QueueMessageReference extends MessageReference { boolean unlock(); LockOwner getLockOwner(); + + boolean isDelivered(); + + void setDelivered(boolean delivered); } From 99917514fbcfe72e4a40cb426856ba762dc51143 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Thu, 19 Mar 2026 20:11:55 +0100 Subject: [PATCH 2/2] fix: incorporate feedback --- .../region/IndirectMessageReference.java | 7 +++-- .../broker/region/PrefetchSubscription.java | 29 ++++--------------- 2 files changed, 9 insertions(+), 27 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index 37dfaf198d6..047c34f8602 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -35,7 +35,7 @@ public class IndirectMessageReference implements QueueMessageReference { private boolean dropped; /** Has the message been acked? */ private boolean acked; - /** Has the message been acked? */ + /** Has the message been delivered? */ private boolean delivered; /** Direct reference to the message */ private final Message message; @@ -213,11 +213,12 @@ public boolean canProcessAsExpired() { } @Override - public boolean isDelivered() { + public synchronized boolean isDelivered() { return delivered; } - public void setDelivered(final boolean delivered) { + @Override + public synchronized void setDelivered(final boolean delivered) { this.delivered = delivered; } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index ee95053139b..2fa6203bc51 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -271,33 +271,14 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a // Message was delivered but not acknowledged: update pre-fetch // counters. - boolean inAckRange = false; for (final MessageReference node : dispatched) { - MessageId messageId = node.getMessageId(); - if (ack.getFirstMessageId() == null - || ack.getFirstMessageId().equals(messageId)) { - inAckRange = true; - } - if (inAckRange) { - if (node instanceof QueueMessageReference) { - ((QueueMessageReference) node).setDelivered(true); - } - - if (ack.getLastMessageId().equals(messageId)) { - destination = (Destination) node.getRegionDestination(); - callDispatchMatched = true; - break; - } + final MessageId messageId = node.getMessageId(); + if (node instanceof QueueMessageReference) { + ((QueueMessageReference) node).setDelivered(true); } - } - - int index = 0; - for (Iterator iter = dispatched.iterator(); iter.hasNext(); index++) { - final MessageReference node = iter.next(); - Destination nodeDest = (Destination) node.getRegionDestination(); - if (ack.getLastMessageId().equals(node.getMessageId())) { + if (ack.getLastMessageId().equals(messageId)) { expandPrefetchExtension(ack.getMessageCount()); - destination = nodeDest; + destination = (Destination) node.getRegionDestination(); callDispatchMatched = true; break; }