Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import java.util.stream.Collectors;
Expand Down Expand Up @@ -87,6 +88,7 @@ public class Topic extends BaseDestination implements Task {
private final ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
private final TaskRunner taskRunner;
private final TaskRunnerFactory taskRunnerFactor;
private final ReentrantLock sendLock = new ReentrantLock();
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
@Override
Expand All @@ -99,7 +101,7 @@ public void run() {
};

public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store,
DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats);
this.topicStore = store;
subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
Expand Down Expand Up @@ -521,40 +523,59 @@ public void run() {
}

/**
* do send the message - this needs to be synchronized to ensure messages
* are stored AND dispatched in the right order
* Sends a message to this topic. Uses a ReentrantLock instead of
* synchronized to avoid lock convoys and biased locking overhead under
* high contention (many concurrent producers with slow persistence or
* many subscribers).
*
* The write lock is held only during persistence (to guarantee message
* ordering via brokerSequenceId). Dispatch to subscribers and persistence
* completion wait happen outside the lock, allowing concurrent dispatch
* for messages from different producers.
*
* This is valid per Jakarta Messaging 3.1 Section 6.2.9: message ordering
* is guaranteed per-session/per-producer only. A JMS Session is not
* thread-safe, so a single producer cannot have concurrent send() calls.
*
* @param producerExchange
* @param message
* @throws IOException
* @throws Exception
*/
synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
Future<Object> result = null;

if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
throw new jakarta.jms.ResourceAllocationException(logMessage);
}
// Write lock: serialize persistence for message ordering
sendLock.lock();
try {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());

waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
throw new jakarta.jms.ResourceAllocationException(logMessage);
}

waitForSpace(context, producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
result = topicStore.asyncAddTopicMessage(context, message, isOptimizeStorage());
}
result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());

//Moved the reduceMemoryfootprint clearing to the dispatch method
message.incrementReferenceCount();
} finally {
sendLock.unlock();
}

message.incrementReferenceCount();

// Dispatch and persistence wait outside the lock — concurrent for
// messages from different producers
if (context.isInTransaction() && (context.getTransaction() != null)) {
final Future<Object> pendingResult = result;
context.getTransaction().addSynchronization(new Synchronization() {
@Override
public void afterCommit() throws Exception {
Expand All @@ -574,6 +595,7 @@ public void afterCommit() throws Exception {
} finally {
message.decrementReferenceCount();
}
awaitPersistence(pendingResult);
}

@Override
Expand All @@ -588,8 +610,11 @@ public void afterRollback() throws Exception {
} finally {
message.decrementReferenceCount();
}
awaitPersistence(result);
}
}

private void awaitPersistence(final Future<Object> result) throws Exception {
if (result != null && !result.isCancelled()) {
try {
result.get();
Expand All @@ -611,14 +636,14 @@ public String toString() {

@Override
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
final MessageReference node) throws IOException {
final MessageReference node) throws IOException {
if (topicStore != null && node.isPersistent()) {
if (sub instanceof DurableTopicSubscription) {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
SubscriptionKey key = dsub.getSubscriptionKey();
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(),
node.getMessageId(),
convertToNonRangedAck(ack, node));
node.getMessageId(),
convertToNonRangedAck(ack, node));
}
}
messageConsumed(context, node);
Expand Down Expand Up @@ -873,8 +898,8 @@ public boolean isDuplicate(MessageId ref) {

// get the sub keys that should be checked for expired messages
final var subs = durableSubscribers.entrySet().stream()
.filter(entry -> isEligibleForExpiration(entry.getValue()))
.map(Entry::getKey).collect(Collectors.toSet());
.filter(entry -> isEligibleForExpiration(entry.getValue()))
.map(Entry::getKey).collect(Collectors.toSet());

if (subs.isEmpty()) {
LOG.debug("Skipping topic expiration check for {}, no eligible subscriptions to check", destination);
Expand All @@ -884,7 +909,7 @@ public boolean isDuplicate(MessageId ref) {
// For each eligible subscription, return the messages in the store that are expired
// The same message refs are shared between subs if duplicated so this is efficient
var expired = store.recoverExpired(subs, getMaxExpirePageSize(),
expiryListener);
expiryListener);

final ConnectionContext connectionContext = createConnectionContext();
// Go through any expired messages and remove for each sub
Expand Down Expand Up @@ -962,25 +987,25 @@ protected boolean isOptimizeStorage(){
boolean result = false;

if (isDoOptimzeMessageStorage() && durableSubscribers.isEmpty()==false){
result = true;
for (DurableTopicSubscription s : durableSubscribers.values()) {
if (s.isActive()== false){
result = false;
break;
}
if (s.getPrefetchSize()==0){
result = false;
break;
}
if (s.isSlowConsumer()){
result = false;
break;
}
if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
result = false;
break;
}
result = true;
for (DurableTopicSubscription s : durableSubscribers.values()) {
if (s.isActive()== false){
result = false;
break;
}
if (s.getPrefetchSize()==0){
result = false;
break;
}
if (s.isSlowConsumer()){
result = false;
break;
}
if (s.getInFlightUsage() > getOptimizeMessageStoreInFlightLimit()){
result = false;
break;
}
}
}
return result;
}
Expand Down
14 changes: 14 additions & 0 deletions activemq-unit-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,20 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<!-- JMH for micro-benchmarks (excluded from CI via naming convention) -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.37</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.37</version>
<scope>test</scope>
</dependency>
</dependencies>

<reporting>
Expand Down
Loading
Loading