diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index 104dcb0a4a5..047c34f8602 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -35,6 +35,8 @@ public class IndirectMessageReference implements QueueMessageReference { private boolean dropped; /** Has the message been acked? */ private boolean acked; + /** Has the message been delivered? */ + private boolean delivered; /** Direct reference to the message */ private final Message message; private final MessageId messageId; @@ -197,16 +199,26 @@ public boolean isExpired() { @Override public synchronized int getSize() { - return message.getSize(); + return message.getSize(); } @Override public boolean isAdvisory() { - return message.isAdvisory(); + return message.isAdvisory(); } @Override public boolean canProcessAsExpired() { return message.canProcessAsExpired(); } + + @Override + public synchronized boolean isDelivered() { + return delivered; + } + + @Override + public synchronized void setDelivered(final boolean delivered) { + this.delivered = delivered; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java index e8d26a84fca..8d2edf8895d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java @@ -159,4 +159,13 @@ public boolean canProcessAsExpired() { return false; } + @Override + public boolean isDelivered() { + return false; + } + + @Override + public void setDelivered(boolean delivered) { + + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index b2adf56a679..2fa6203bc51 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -270,13 +270,15 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a } else if (ack.isDeliveredAck()) { // Message was delivered but not acknowledged: update pre-fetch // counters. - int index = 0; - for (Iterator iter = dispatched.iterator(); iter.hasNext(); index++) { - final MessageReference node = iter.next(); - Destination nodeDest = (Destination) node.getRegionDestination(); - if (ack.getLastMessageId().equals(node.getMessageId())) { + + for (final MessageReference node : dispatched) { + final MessageId messageId = node.getMessageId(); + if (node instanceof QueueMessageReference) { + ((QueueMessageReference) node).setDelivered(true); + } + if (ack.getLastMessageId().equals(messageId)) { expandPrefetchExtension(ack.getMessageCount()); - destination = nodeDest; + destination = (Destination) node.getRegionDestination(); callDispatchMatched = true; break; } @@ -396,7 +398,7 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a } protected void processExpiredAck(final ConnectionContext context, final Destination dest, - final MessageReference node) { + final MessageReference node) { dest.messageExpired(context, this, node); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java index 89bcd6a5179..e78e106c93e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java @@ -42,4 +42,8 @@ public interface QueueMessageReference extends MessageReference { boolean unlock(); LockOwner getLockOwner(); + + boolean isDelivered(); + + void setDelivered(boolean delivered); }