From 48bb204972ff6746c5085b039954e8caeaa40c43 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Tue, 18 Nov 2025 10:39:13 +0100 Subject: [PATCH] AMQ-9809: org.apache.activemq.perf.InactiveDurableTopicTest hanging --- .../perf/InactiveDurableTopicTest.java | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java index df52184b05b..dcec99f35ff 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.perf; +import java.util.concurrent.TimeUnit; + import jakarta.jms.Connection; import jakarta.jms.JMSException; import jakarta.jms.MapMessage; @@ -39,7 +41,11 @@ public class InactiveDurableTopicTest extends TestCase { private static final transient Logger LOG = LoggerFactory.getLogger(InactiveDurableTopicTest.class); - private static final int MESSAGE_COUNT = 2000; + /** + * Keep the payload small so that the test completes quickly but still + * exercises durable subscription behaviour. + */ + private static final int MESSAGE_COUNT = 500; private static final String DEFAULT_PASSWORD = ""; private static final String USERNAME = "testuser"; private static final String CLIENTID = "mytestclient"; @@ -55,21 +61,28 @@ public class InactiveDurableTopicTest extends TestCase { private ActiveMQConnectionFactory connectionFactory; private BrokerService broker; + private static final int SEND_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30); + private static final long SEND_LOOP_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(2); + private static final long RECEIVE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5); + private static final String BROKER_NAME = "inactiveDurableTopicTest"; + @Override protected void setUp() throws Exception { super.setUp(); broker = new BrokerService(); - - //broker.setPersistenceAdapter(new KahaPersistenceAdapter()); - broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); + broker.setUseJmx(false); + broker.setBrokerName(BROKER_NAME); + // broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); broker.start(); - connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL); + // connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL); + connectionFactory = new ActiveMQConnectionFactory("vm://" + BROKER_NAME); /* * Doesn't matter if you enable or disable these, so just leaving them * out for this test case connectionFactory.setAlwaysSessionAsync(true); * connectionFactory.setAsyncDispatch(true); */ connectionFactory.setUseAsyncSend(true); + connectionFactory.setSendTimeout(SEND_TIMEOUT_MILLIS); } @Override @@ -124,9 +137,13 @@ public void test2ProducerTestCase() { assertNotNull(msg); msg.setString("key1", "value1"); int loop; + long start = System.currentTimeMillis(); for (loop = 0; loop < MESSAGE_COUNT; loop++) { msg.setInt("key2", loop); publisher.send(msg, DELIVERY_MODE, DELIVERY_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + if (System.currentTimeMillis() - start > SEND_LOOP_TIMEOUT_MILLIS) { + throw new AssertionFailedError("Timed out sending messages at loop: " + loop); + } if (loop % 5000 == 0) { LOG.info("Sent " + loop + " messages"); } @@ -163,7 +180,10 @@ public void test3CreateSubscription() throws Exception { assertNotNull(subscriber); int loop; for (loop = 0; loop < MESSAGE_COUNT; loop++) { - subscriber.receive(); + Message message = subscriber.receive(RECEIVE_TIMEOUT_MILLIS); + if (message == null) { + throw new AssertionFailedError("Timed out waiting for message " + loop); + } if (loop % 500 == 0) { LOG.debug("Received " + loop + " messages"); }