diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index a6bf1b20952..2356e5d4746 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -636,6 +636,9 @@ public void execute() throws Exception { } }); } + if (m.propertyExists(ActiveMQMessage.JMS_DELIVERY_TIME_PROPERTY)) { + m.setJMSDeliveryTime(m.getLongProperty(ActiveMQMessage.JMS_DELIVERY_TIME_PROPERTY)); + } return m; } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index fbd093f10f2..af92ffef7e8 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -27,6 +27,7 @@ import jakarta.jms.JMSException; import jakarta.jms.Message; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerId; @@ -326,6 +327,13 @@ public void send(Destination destination, Message message, int deliveryMode, int } } + long delay = getDeliveryDelay(); + if (delay > 0) { + message.setLongProperty("AMQ_SCHEDULED_DELAY", delay); + long deliveryTime = System.currentTimeMillis() + delay; + message.setLongProperty(ActiveMQMessage.JMS_DELIVERY_TIME_PROPERTY, deliveryTime); + } + this.session.send(this, dest, message, deliveryMode, priority, timeToLive, disableMessageID, disableMessageTimestamp, producerWindow, sendTimeout, onComplete); stats.onMessage(); diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java index 5816d70e30c..8f5c56a9fa7 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java @@ -41,6 +41,8 @@ public abstract class ActiveMQMessageProducerSupport implements MessageProducer, protected long defaultTimeToLive; protected int sendTimeout=0; + private long deliveryDelay = 0; + public ActiveMQMessageProducerSupport(ActiveMQSession session) { this.session = session; disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault(); @@ -56,7 +58,12 @@ public ActiveMQMessageProducerSupport(ActiveMQSession session) { */ @Override public void setDeliveryDelay(long deliveryDelay) throws JMSException { - throw new UnsupportedOperationException("setDeliveryDelay() is not supported"); + checkClosed(); + // This should now compile after the rebase! + if (deliveryDelay < 0 && session.connection.isStrictCompliance()) { + throw new jakarta.jms.JMSException("Delivery delay cannot be negative."); + } + this.deliveryDelay = deliveryDelay; } /** @@ -68,7 +75,8 @@ public void setDeliveryDelay(long deliveryDelay) throws JMSException { */ @Override public long getDeliveryDelay() throws JMSException { - throw new UnsupportedOperationException("getDeliveryDelay() is not supported"); + checkClosed(); + return this.deliveryDelay; } /** diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java index abf74930242..dedff3b0f42 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java @@ -35,6 +35,7 @@ import jakarta.jms.ObjectMessage; import jakarta.jms.TextMessage; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.TypeConversionSupport; @@ -46,6 +47,7 @@ public class ActiveMQProducer implements JMSProducer { // QoS override of defaults on a per-JMSProducer instance basis private String correlationId = null; private byte[] correlationIdBytes = null; + private Long deliveryDelay = null; private Integer deliveryMode = null; private Boolean disableMessageID = false; private Boolean disableMessageTimestamp = false; @@ -90,6 +92,13 @@ public JMSProducer send(Destination destination, Message message) { } } + // [AMQ-8320] Producer setting for deliveryDelay will override user-specified ActiveMQ Scheduled Delay property + if(this.deliveryDelay != null) { + long deliveryTimeMillis = System.currentTimeMillis() + this.deliveryDelay; + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, this.deliveryDelay); + message.setLongProperty(ActiveMQMessage.JMS_DELIVERY_TIME_PROPERTY, deliveryTimeMillis); + } + activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), null); } catch (JMSException e) { throw JMSExceptionSupport.convertToJMSRuntimeException(e); @@ -246,12 +255,23 @@ public long getTimeToLive() { @Override public JMSProducer setDeliveryDelay(long deliveryDelay) { - throw new UnsupportedOperationException("setDeliveryDelay(long) is not supported"); + try { + // Tell the internal core producer about the delay + this.activemqMessageProducer.setDeliveryDelay(deliveryDelay); + + // Update the local field in this wrapper for consistency + this.deliveryDelay = deliveryDelay; + + } catch (JMSException e) { + // JMS 2.0 requires converting checked exceptions to RuntimeExceptions + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } + return this; } @Override public long getDeliveryDelay() { - throw new UnsupportedOperationException("getDeliveryDelay() is not supported"); + return this.deliveryDelay; } @Override diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java index 3a02d074c66..7fd05eca111 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java @@ -48,6 +48,7 @@ public class ActiveMQMessage extends Message implements org.apache.activemq.Mess public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE; public static final String DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause"; public static final String BROKER_PATH_PROPERTY = "JMSActiveMQBrokerPath"; + public static final String JMS_DELIVERY_TIME_PROPERTY = "JMSDeliveryTime"; private static final Map JMS_PROPERTY_SETERS = new HashMap(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQDeliveryDelayTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQDeliveryDelayTest.java new file mode 100644 index 00000000000..1733316badb --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQDeliveryDelayTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import jakarta.jms.Connection; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import org.apache.activemq.command.ActiveMQMessage; +import org.junit.Test; + +import static org.apache.activemq.command.DataStructureTestSupport.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ActiveMQDeliveryDelayTest { + + private final String connectionUri = "vm://localhost?broker.persistent=false"; + + @Test + public void testStrictComplianceRejectsNegativeDelay() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + // Turn ON strict compliance (Jakarta 3.1 requirement) + factory.setStrictCompliance(true); + + try (Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + + MessageProducer producer = sess.createProducer(sess.createQueue("TEST.STRICT")); + + try { + producer.setDeliveryDelay(-1000L); + fail("Should have thrown a JMSException for negative delay in strict mode"); + } catch (jakarta.jms.JMSException e) { + // Success: Exception was thrown as required by the spec + } + } + } + + @Test + public void testLegacyBehaviorAllowsNegativeDelay() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + // Turn OFF strict compliance (Legacy ActiveMQ behavior) + factory.setStrictCompliance(false); + + try (Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + + MessageProducer producer = sess.createProducer(sess.createQueue("TEST.LEGACY")); + + // Should NOT throw an exception + producer.setDeliveryDelay(-1000L); + assertEquals(-1000L, producer.getDeliveryDelay()); + } + } + + @Test + public void testDeliveryDelayEffectiveOnMessage() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + try (Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + + MessageProducer producer = sess.createProducer(sess.createQueue("TEST.EFFECTIVE")); + long delay = 5000L; + producer.setDeliveryDelay(delay); + + ActiveMQMessage msg = (ActiveMQMessage) sess.createTextMessage("Hello"); + producer.send(msg); + + // Verify Broker-side scheduling property + assertEquals("Broker delay property missing", + delay, msg.getLongProperty("AMQ_SCHEDULED_DELAY")); + + // Verify Consumer-side visibility property (matching #1157 logic) + assertTrue("JMSDeliveryTime property missing or incorrect", + msg.getLongProperty(ActiveMQMessage.JMS_DELIVERY_TIME_PROPERTY) >= System.currentTimeMillis() + delay - 100); + } + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java index b5f54679314..427548a91d1 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java @@ -289,14 +289,16 @@ public void testSessionSharedDurableConsumerSelector() throws JMSException { session.createSharedDurableConsumer(session.createTopic("test"), null, null); } - @Test(expected = UnsupportedOperationException.class) + @Test public void testProducerDeliveryDelayGet() throws JMSException { - messageProducer.getDeliveryDelay(); + assertEquals(0, messageProducer.getDeliveryDelay()); } - @Test(expected = UnsupportedOperationException.class) + @Test public void testProducerDeliveryDelaySet() throws JMSException { - messageProducer.setDeliveryDelay(1000l); + long delay = 1000L; + messageProducer.setDeliveryDelay(delay); + assertEquals(delay, messageProducer.getDeliveryDelay()); } @Test(expected = UnsupportedOperationException.class)