diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java index 29f50e69183..845cd17eff1 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java @@ -21,6 +21,7 @@ import java.net.URI; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.SocketFactory; import javax.net.ssl.SSLEngine; @@ -28,10 +29,15 @@ import org.apache.activemq.transport.nio.NIOSSLTransport; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtbuf.DataByteArrayInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MQTTNIOSSLTransport extends NIOSSLTransport { + private static final Logger LOG = LoggerFactory.getLogger(MQTTNIOSSLTransport.class); + private MQTTCodec codec; + private final AtomicInteger concurrentParseCount = new AtomicInteger(0); public MQTTNIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); @@ -57,10 +63,19 @@ protected void initializeStreams() throws IOException { @Override protected void processCommand(ByteBuffer plain) throws Exception { - byte[] fill = new byte[plain.remaining()]; - plain.get(fill); - DataByteArrayInputStream dis = new DataByteArrayInputStream(fill); - codec.parse(dis, fill.length); + final int concurrent = concurrentParseCount.incrementAndGet(); + try { + if (concurrent > 1) { + LOG.error("CONCURRENT MQTT codec access detected! count={}, thread={}", + concurrent, Thread.currentThread().getName()); + } + final byte[] fill = new byte[plain.remaining()]; + plain.get(fill); + final DataByteArrayInputStream dis = new DataByteArrayInputStream(fill); + codec.parse(dis, fill.length); + } finally { + concurrentParseCount.decrementAndGet(); + } } /* (non-Javadoc) diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java index f3bf94e6cf7..696510bbebb 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java @@ -274,15 +274,21 @@ public void onResponse(MQTTProtocolConverter converter, Response response) throw } protected void restoreDurableSubs(List subs) { + LOG.info("Restoring {} MQTT durable subscription(s) for clientId={}", + subs.size(), protocol.getClientId()); try { for (SubscriptionInfo sub : subs) { - String name = sub.getSubcriptionName(); - String[] split = name.split(":", 2); - QoS qoS = QoS.valueOf(split[0]); + final String name = sub.getSubcriptionName(); + final String[] split = name.split(":", 2); + final QoS qoS = QoS.valueOf(split[0]); + LOG.debug("Restoring MQTT durable sub: name={}, qos={}, topic={}", + name, qoS, split[1]); onSubscribe(new Topic(split[1], qoS)); // mark this durable subscription as restored by Broker restoredDurableSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1])); + LOG.debug("Successfully restored MQTT durable sub: {}", name); } + LOG.info("All {} MQTT durable subscription(s) restored successfully", subs.size()); } catch (IOException e) { LOG.warn("Could not restore the MQTT durable subs.", e); } diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 27786f6b124..ab49881c99a 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -48,6 +48,9 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.region.DurableTopicSubscription; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; @@ -1736,15 +1739,37 @@ private boolean isSubscriptionInactive(Topic topic, String clientId) throws Exce private boolean isSubscriptionActive(Topic topic, String clientId) throws Exception { if (isVirtualTopicSubscriptionStrategy()) { - String queueName = buildVirtualTopicQueueName(topic, clientId); + final String queueName = buildVirtualTopicQueueName(topic, clientId); try { return getProxyToQueue(queueName).getConsumerCount() > 0; } catch (Exception ignore) { return false; } } else { - return brokerService.getAdminView().getDurableTopicSubscribers().length >= 1 && - brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0; + final int activeSubs = brokerService.getAdminView().getDurableTopicSubscribers().length; + final int inactiveSubs = brokerService.getAdminView().getInactiveDurableTopicSubscribers().length; + final boolean jmxActive = activeSubs >= 1 && inactiveSubs == 0; + + // Diagnostic: also check the actual broker-level subscription state + // to determine if the flakiness is a JMX registration issue or a real broker bug + boolean brokerLevelActive = false; + try { + final RegionBroker regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class); + final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); + final String subName = QoS.values()[topic.qos().ordinal()] + ":" + topic.name().toString(); + final DurableTopicSubscription sub = topicRegion.lookupSubscription(subName, clientId); + brokerLevelActive = sub != null && sub.isActive(); + } catch (Exception e) { + LOG.debug("Could not check broker-level subscription state", e); + } + + if (jmxActive != brokerLevelActive) { + LOG.warn("MQTT subscription state MISMATCH: JMX says active={} (active={}, inactive={}), " + + "broker-level says active={} for clientId={}, topic={}", + jmxActive, activeSubs, inactiveSubs, brokerLevelActive, clientId, topic.name()); + } + + return jmxActive; } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java index 6f355ae53c1..fc5c152f089 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java @@ -17,9 +17,13 @@ package org.apache.activemq.network; import java.io.File; +import java.lang.reflect.Field; import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import jakarta.jms.Connection; @@ -30,6 +34,8 @@ import junit.framework.Test; import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnection; +import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.broker.region.DurableTopicSubscription; @@ -134,6 +140,23 @@ public void testDurablePropagationBrokerRestart() throws Exception { startAllBrokers(); waitForBridgeFormation(); + // Wait for the async durable sync (syncDurableSubs=true) to complete across all bridges. + // After restart with persistent data, NC durable subs must re-establish to match Phase 1 + // counts before we proceed with unsubscribes, otherwise the sync can re-create NC durable + // subs AFTER the unsubscribe advisory has already propagated. + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1); + + // Wait for all bridge sync executors to finish populating durableRemoteSubs. + // setupStaticDestinations() creates DemandSubscriptions with empty durableRemoteSubs, + // and the syncExecutor populates them asynchronously. Without this wait, unsubscribe + // advisories may race with the sync executor: the advisory finds nothing to remove + // (durableRemoteSubs is empty), then the sync populates it, leaving a stale NC durable sub. + waitForAllBridgeSyncCompletion(); + conn = brokers.get("Broker_A_A").factory.createConnection(); conn.setClientID("clientId1"); conn.start(); @@ -854,6 +877,59 @@ protected void configureBroker(BrokerService broker) { broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "DurableFiveBrokerNetworkBridgeTest"); } + /** + * Wait for all bridge sync executors (both initiator and duplex bridges) to complete + * processing their BrokerSubscriptionInfo tasks on all brokers. + * Uses reflection to access the private syncExecutor, following the same pattern + * as {@link DynamicNetworkTestSupport#findDuplexBridge}. + */ + private void waitForAllBridgeSyncCompletion() throws Exception { + final Field syncExecutorField = DemandForwardingBridgeSupport.class.getDeclaredField("syncExecutor"); + syncExecutorField.setAccessible(true); + final Field duplexBridgeField = TransportConnection.class.getDeclaredField("duplexBridge"); + duplexBridgeField.setAccessible(true); + + for (final BrokerItem item : brokers.values()) { + final BrokerService broker = item.broker; + // Initiator bridges (accessible via network connectors) + for (final NetworkConnector nc : broker.getNetworkConnectors()) { + for (final NetworkBridge bridge : nc.activeBridges()) { + if (bridge instanceof DemandForwardingBridgeSupport) { + flushSyncExecutor(syncExecutorField, (DemandForwardingBridgeSupport) bridge); + } + } + } + // Duplex bridges (accessible via transport connections) + for (final TransportConnector tc : broker.getTransportConnectors()) { + for (final TransportConnection conn : tc.getConnections()) { + if (conn.getConnectionId() != null && conn.getConnectionId().startsWith("networkConnector_")) { + final DemandForwardingBridgeSupport duplexBridge = + (DemandForwardingBridgeSupport) duplexBridgeField.get(conn); + if (duplexBridge != null) { + flushSyncExecutor(syncExecutorField, duplexBridge); + } + } + } + } + } + } + + private void flushSyncExecutor(final Field syncExecutorField, + final DemandForwardingBridgeSupport bridge) throws Exception { + final ExecutorService syncExecutor = (ExecutorService) syncExecutorField.get(bridge); + if (syncExecutor.isShutdown()) { + return; + } + final CountDownLatch latch = new CountDownLatch(1); + try { + syncExecutor.execute(latch::countDown); + } catch (final RejectedExecutionException e) { + return; + } + assertTrue("Sync executor should complete on " + bridge, + latch.await(30, TimeUnit.SECONDS)); + } + protected void startNetworkConnectors(NetworkConnector... connectors) throws Exception { for (final NetworkConnector connector : connectors) { connector.start(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index a2d26c3bbc8..56cbe45af47 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -226,7 +226,12 @@ public void testRemoveSubscriptionWithBridgeOfflineIncludedChanged() throws Exce //Test that on successful reconnection of the bridge that //the NC sub will be removed restartBroker(broker2, true); - assertNCDurableSubsCount(broker2, topic, 1); + // In REVERSE flow, broker2=localBroker has the bridge and broker1 (remoteBroker) + // is already running, so the sync may have already cleaned up the NC durable sub. + // This "before sync" assertion is only valid in FORWARD flow. + if (flow == FLOW.FORWARD) { + assertNCDurableSubsCount(broker2, topic, 1); + } restartBroker(broker1, true); assertBridgeStarted(); assertNCDurableSubsCount(broker2, topic, 0); @@ -253,7 +258,9 @@ public void testSubscriptionRemovedAfterIncludedChanged() throws Exception { //the NC sub will be removed because even though the local subscription exists, //it no longer matches the included filter restartBroker(broker2, true); - assertNCDurableSubsCount(broker2, topic, 1); + if (flow == FLOW.FORWARD) { + assertNCDurableSubsCount(broker2, topic, 1); + } restartBroker(broker1, true); assertBridgeStarted(); assertNCDurableSubsCount(broker2, topic, 0); @@ -291,7 +298,9 @@ public void testSubscriptionRemovedAfterStaticChanged() throws Exception { //the NC sub will be removed because even though the local subscription exists, //it no longer matches the included static filter restartBroker(broker2, true); - assertNCDurableSubsCount(broker2, topic, 1); + if (flow == FLOW.FORWARD) { + assertNCDurableSubsCount(broker2, topic, 1); + } restartBroker(broker1, true); assertBridgeStarted(); assertNCDurableSubsCount(broker2, topic, 0); @@ -320,10 +329,13 @@ public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Ex //Test that on successful reconnection of the bridge that //the NC sub will be removed for topic1 but will stay for topic2 - //before sync, the old NC should exist + //before sync, the old NC should exist (only verifiable in FORWARD flow; + //in REVERSE, broker2=localBroker has the bridge and sync may already have run) restartBroker(broker2, true); - assertNCDurableSubsCount(broker2, topic, 1); - assertNCDurableSubsCount(broker2, topic2, 0); + if (flow == FLOW.FORWARD) { + assertNCDurableSubsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic2, 0); + } //After sync, remove old NC and create one for topic 2 restartBroker(broker1, true); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java index 31aee0938db..9c8a4d474b0 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java @@ -1357,11 +1357,8 @@ public void testToTopicWithDurable() throws Exception { includedProducer.send(test); assertNotNull(bridgeConsumer.receive(5000)); - assertTrue("dequeues not updated", - Wait.waitFor(() -> 1 == destinationStatistics.getDequeues().getCount())); - - assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount()); - assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount()); + waitForDispatchFromLocalBroker(destinationStatistics, 1); + assertLocalBrokerStatistics(destinationStatistics, 1); assertRemoteAdvisoryCount(advisoryConsumer, 1); assertAdvisoryBrokerCounts(1,1,0);