Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class IndirectMessageReference implements QueueMessageReference {
private boolean dropped;
/** Has the message been acked? */
private boolean acked;
/** Has the message been delivered? */
private boolean delivered;
/** Direct reference to the message */
private final Message message;
private final MessageId messageId;
Expand Down Expand Up @@ -197,16 +199,26 @@ public boolean isExpired() {

@Override
public synchronized int getSize() {
return message.getSize();
return message.getSize();
}

@Override
public boolean isAdvisory() {
return message.isAdvisory();
return message.isAdvisory();
}

@Override
public boolean canProcessAsExpired() {
return message.canProcessAsExpired();
}

@Override
public synchronized boolean isDelivered() {
return delivered;
}

@Override
public synchronized void setDelivered(final boolean delivered) {
this.delivered = delivered;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,13 @@ public boolean canProcessAsExpired() {
return false;
}

@Override
public boolean isDelivered() {
return false;
}

@Override
public void setDelivered(boolean delivered) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,15 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
int index = 0;
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
final MessageReference node = iter.next();
Destination nodeDest = (Destination) node.getRegionDestination();
if (ack.getLastMessageId().equals(node.getMessageId())) {

for (final MessageReference node : dispatched) {
final MessageId messageId = node.getMessageId();
if (node instanceof QueueMessageReference) {
((QueueMessageReference) node).setDelivered(true);
}
if (ack.getLastMessageId().equals(messageId)) {
expandPrefetchExtension(ack.getMessageCount());
destination = nodeDest;
destination = (Destination) node.getRegionDestination();
callDispatchMatched = true;
break;
}
Expand Down Expand Up @@ -396,7 +398,7 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a
}

protected void processExpiredAck(final ConnectionContext context, final Destination dest,
final MessageReference node) {
final MessageReference node) {
dest.messageExpired(context, this, node);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ public interface QueueMessageReference extends MessageReference {
boolean unlock();

LockOwner getLockOwner();

boolean isDelivered();

void setDelivered(boolean delivered);
}
Loading