Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,23 @@
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.SocketFactory;
import javax.net.ssl.SSLEngine;

import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MQTTNIOSSLTransport extends NIOSSLTransport {

private static final Logger LOG = LoggerFactory.getLogger(MQTTNIOSSLTransport.class);

private MQTTCodec codec;
private final AtomicInteger concurrentParseCount = new AtomicInteger(0);

public MQTTNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
Expand All @@ -57,10 +63,19 @@ protected void initializeStreams() throws IOException {

@Override
protected void processCommand(ByteBuffer plain) throws Exception {
byte[] fill = new byte[plain.remaining()];
plain.get(fill);
DataByteArrayInputStream dis = new DataByteArrayInputStream(fill);
codec.parse(dis, fill.length);
final int concurrent = concurrentParseCount.incrementAndGet();
try {
if (concurrent > 1) {
LOG.error("CONCURRENT MQTT codec access detected! count={}, thread={}",
concurrent, Thread.currentThread().getName());
}
final byte[] fill = new byte[plain.remaining()];
plain.get(fill);
final DataByteArrayInputStream dis = new DataByteArrayInputStream(fill);
codec.parse(dis, fill.length);
} finally {
concurrentParseCount.decrementAndGet();
}
}

/* (non-Javadoc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,21 @@ public void onResponse(MQTTProtocolConverter converter, Response response) throw
}

protected void restoreDurableSubs(List<SubscriptionInfo> subs) {
LOG.info("Restoring {} MQTT durable subscription(s) for clientId={}",
subs.size(), protocol.getClientId());
try {
for (SubscriptionInfo sub : subs) {
String name = sub.getSubcriptionName();
String[] split = name.split(":", 2);
QoS qoS = QoS.valueOf(split[0]);
final String name = sub.getSubcriptionName();
final String[] split = name.split(":", 2);
final QoS qoS = QoS.valueOf(split[0]);
LOG.debug("Restoring MQTT durable sub: name={}, qos={}, topic={}",
name, qoS, split[1]);
onSubscribe(new Topic(split[1], qoS));
// mark this durable subscription as restored by Broker
restoredDurableSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1]));
LOG.debug("Successfully restored MQTT durable sub: {}", name);
}
LOG.info("All {} MQTT durable subscription(s) restored successfully", subs.size());
} catch (IOException e) {
LOG.warn("Could not restore the MQTT durable subs.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
Expand Down Expand Up @@ -1736,15 +1739,37 @@ private boolean isSubscriptionInactive(Topic topic, String clientId) throws Exce

private boolean isSubscriptionActive(Topic topic, String clientId) throws Exception {
if (isVirtualTopicSubscriptionStrategy()) {
String queueName = buildVirtualTopicQueueName(topic, clientId);
final String queueName = buildVirtualTopicQueueName(topic, clientId);
try {
return getProxyToQueue(queueName).getConsumerCount() > 0;
} catch (Exception ignore) {
return false;
}
} else {
return brokerService.getAdminView().getDurableTopicSubscribers().length >= 1 &&
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
final int activeSubs = brokerService.getAdminView().getDurableTopicSubscribers().length;
final int inactiveSubs = brokerService.getAdminView().getInactiveDurableTopicSubscribers().length;
final boolean jmxActive = activeSubs >= 1 && inactiveSubs == 0;

// Diagnostic: also check the actual broker-level subscription state
// to determine if the flakiness is a JMX registration issue or a real broker bug
boolean brokerLevelActive = false;
try {
final RegionBroker regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
final String subName = QoS.values()[topic.qos().ordinal()] + ":" + topic.name().toString();
final DurableTopicSubscription sub = topicRegion.lookupSubscription(subName, clientId);
brokerLevelActive = sub != null && sub.isActive();
} catch (Exception e) {
LOG.debug("Could not check broker-level subscription state", e);
}

if (jmxActive != brokerLevelActive) {
LOG.warn("MQTT subscription state MISMATCH: JMX says active={} (active={}, inactive={}), " +
"broker-level says active={} for clientId={}, topic={}",
jmxActive, activeSubs, inactiveSubs, brokerLevelActive, clientId, topic.name());
}

return jmxActive;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
package org.apache.activemq.network;

import java.io.File;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import jakarta.jms.Connection;
Expand All @@ -30,6 +34,8 @@
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DurableTopicSubscription;
Expand Down Expand Up @@ -134,6 +140,23 @@ public void testDurablePropagationBrokerRestart() throws Exception {
startAllBrokers();
waitForBridgeFormation();

// Wait for the async durable sync (syncDurableSubs=true) to complete across all bridges.
// After restart with persistent data, NC durable subs must re-establish to match Phase 1
// counts before we proceed with unsubscribes, otherwise the sync can re-create NC durable
// subs AFTER the unsubscribe advisory has already propagated.
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);

// Wait for all bridge sync executors to finish populating durableRemoteSubs.
// setupStaticDestinations() creates DemandSubscriptions with empty durableRemoteSubs,
// and the syncExecutor populates them asynchronously. Without this wait, unsubscribe
// advisories may race with the sync executor: the advisory finds nothing to remove
// (durableRemoteSubs is empty), then the sync populates it, leaving a stale NC durable sub.
waitForAllBridgeSyncCompletion();

conn = brokers.get("Broker_A_A").factory.createConnection();
conn.setClientID("clientId1");
conn.start();
Expand Down Expand Up @@ -854,6 +877,59 @@ protected void configureBroker(BrokerService broker) {
broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "DurableFiveBrokerNetworkBridgeTest");
}

/**
* Wait for all bridge sync executors (both initiator and duplex bridges) to complete
* processing their BrokerSubscriptionInfo tasks on all brokers.
* Uses reflection to access the private syncExecutor, following the same pattern
* as {@link DynamicNetworkTestSupport#findDuplexBridge}.
*/
private void waitForAllBridgeSyncCompletion() throws Exception {
final Field syncExecutorField = DemandForwardingBridgeSupport.class.getDeclaredField("syncExecutor");
syncExecutorField.setAccessible(true);
final Field duplexBridgeField = TransportConnection.class.getDeclaredField("duplexBridge");
duplexBridgeField.setAccessible(true);

for (final BrokerItem item : brokers.values()) {
final BrokerService broker = item.broker;
// Initiator bridges (accessible via network connectors)
for (final NetworkConnector nc : broker.getNetworkConnectors()) {
for (final NetworkBridge bridge : nc.activeBridges()) {
if (bridge instanceof DemandForwardingBridgeSupport) {
flushSyncExecutor(syncExecutorField, (DemandForwardingBridgeSupport) bridge);
}
}
}
// Duplex bridges (accessible via transport connections)
for (final TransportConnector tc : broker.getTransportConnectors()) {
for (final TransportConnection conn : tc.getConnections()) {
if (conn.getConnectionId() != null && conn.getConnectionId().startsWith("networkConnector_")) {
final DemandForwardingBridgeSupport duplexBridge =
(DemandForwardingBridgeSupport) duplexBridgeField.get(conn);
if (duplexBridge != null) {
flushSyncExecutor(syncExecutorField, duplexBridge);
}
}
}
}
}
}

private void flushSyncExecutor(final Field syncExecutorField,
final DemandForwardingBridgeSupport bridge) throws Exception {
final ExecutorService syncExecutor = (ExecutorService) syncExecutorField.get(bridge);
if (syncExecutor.isShutdown()) {
return;
}
final CountDownLatch latch = new CountDownLatch(1);
try {
syncExecutor.execute(latch::countDown);
} catch (final RejectedExecutionException e) {
return;
}
assertTrue("Sync executor should complete on " + bridge,
latch.await(30, TimeUnit.SECONDS));
}

protected void startNetworkConnectors(NetworkConnector... connectors) throws Exception {
for (final NetworkConnector connector : connectors) {
connector.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,12 @@ public void testRemoveSubscriptionWithBridgeOfflineIncludedChanged() throws Exce
//Test that on successful reconnection of the bridge that
//the NC sub will be removed
restartBroker(broker2, true);
assertNCDurableSubsCount(broker2, topic, 1);
// In REVERSE flow, broker2=localBroker has the bridge and broker1 (remoteBroker)
// is already running, so the sync may have already cleaned up the NC durable sub.
// This "before sync" assertion is only valid in FORWARD flow.
if (flow == FLOW.FORWARD) {
assertNCDurableSubsCount(broker2, topic, 1);
}
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
Expand All @@ -253,7 +258,9 @@ public void testSubscriptionRemovedAfterIncludedChanged() throws Exception {
//the NC sub will be removed because even though the local subscription exists,
//it no longer matches the included filter
restartBroker(broker2, true);
assertNCDurableSubsCount(broker2, topic, 1);
if (flow == FLOW.FORWARD) {
assertNCDurableSubsCount(broker2, topic, 1);
}
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
Expand Down Expand Up @@ -291,7 +298,9 @@ public void testSubscriptionRemovedAfterStaticChanged() throws Exception {
//the NC sub will be removed because even though the local subscription exists,
//it no longer matches the included static filter
restartBroker(broker2, true);
assertNCDurableSubsCount(broker2, topic, 1);
if (flow == FLOW.FORWARD) {
assertNCDurableSubsCount(broker2, topic, 1);
}
restartBroker(broker1, true);
assertBridgeStarted();
assertNCDurableSubsCount(broker2, topic, 0);
Expand Down Expand Up @@ -320,10 +329,13 @@ public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Ex
//Test that on successful reconnection of the bridge that
//the NC sub will be removed for topic1 but will stay for topic2

//before sync, the old NC should exist
//before sync, the old NC should exist (only verifiable in FORWARD flow;
//in REVERSE, broker2=localBroker has the bridge and sync may already have run)
restartBroker(broker2, true);
assertNCDurableSubsCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic2, 0);
if (flow == FLOW.FORWARD) {
assertNCDurableSubsCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic2, 0);
}

//After sync, remove old NC and create one for topic 2
restartBroker(broker1, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1357,11 +1357,8 @@ public void testToTopicWithDurable() throws Exception {
includedProducer.send(test);
assertNotNull(bridgeConsumer.receive(5000));

assertTrue("dequeues not updated",
Wait.waitFor(() -> 1 == destinationStatistics.getDequeues().getCount()));

assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount());
assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount());
waitForDispatchFromLocalBroker(destinationStatistics, 1);
assertLocalBrokerStatistics(destinationStatistics, 1);

assertRemoteAdvisoryCount(advisoryConsumer, 1);
assertAdvisoryBrokerCounts(1,1,0);
Expand Down
Loading