diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index b05c3ec66d8..db046561a4a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -184,7 +184,7 @@ public class BrokerService implements Service { private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges // to other jms messaging systems private boolean deleteAllMessagesOnStartup; - private boolean advisorySupport = true; + private boolean advisorySupport = false; private boolean anonymousProducerAdvisorySupport = false; private URI vmConnectorURI; private String defaultSocketURIString; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 9079e24d676..6a67ea7f691 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -192,6 +192,7 @@ public TransportConnection(TransportConnector connector, final Transport transpo @Override public void onCommand(Object o) { serviceLock.readLock().lock(); + System.out.println("[PUB_PATH] onCommand " + o.getClass().getName()); try { if (!(o instanceof Command)) { throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); @@ -332,6 +333,7 @@ public Response service(Command command) { Response response = null; boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); + System.out.println("[PUB_PATH] got commandId: " + commandId); try { if (status.get() != PENDING_STOP) { response = command.visit(this); @@ -576,6 +578,9 @@ public Response processRecoverTransactions(TransactionInfo info) throws Exceptio @Override public Response processMessage(Message messageSend) throws Exception { + // [PUB_PATH] + System.out.println("[PUB_PATH] in processMessage"); + System.out.println(messageSend.toString()); ProducerId producerId = messageSend.getProducerId(); ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); if (producerExchange.canDispatch(messageSend)) { @@ -628,6 +633,7 @@ public Response processRemoveDestination(DestinationInfo info) throws Exception @Override public Response processAddProducer(ProducerInfo info) throws Exception { + System.out.println("[PUB_PATH] in processAddProducer]"); SessionId sessionId = info.getProducerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); TransportConnectionState cs = lookupConnectionState(connectionId); @@ -683,6 +689,7 @@ public Response processRemoveProducer(ProducerId id) throws Exception { @Override public Response processAddConsumer(ConsumerInfo info) throws Exception { + System.out.println("[SUB_PATH] in processAddConsumer]"); SessionId sessionId = info.getConsumerId().getParentId(); ConnectionId connectionId = sessionId.getParentId(); TransportConnectionState cs = lookupConnectionState(connectionId); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java index 78ab04cc703..8257a50f408 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java @@ -231,7 +231,7 @@ public void onAccept(final Transport transport) { public void run() { try { if (!brokerService.isStopping()) { - Connection connection = createConnection(transport); + Connection connection = createConnection(transport); // [PUB_PATH] connection.start(); } else { throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index af77b1d4498..b56e198b9a3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -143,7 +143,7 @@ public void stop() throws Exception { @Override public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception { - + System.out.println("[SUB_PATH] AbstractRegion addDesitnation: " + destination.toString()); destinationsLock.writeLock().lock(); try { Destination dest = destinations.get(destination); @@ -154,6 +154,7 @@ public Destination addDestination(ConnectionContext context, ActiveMQDestination validateMaxDestinations(destination); LOG.debug("{} adding destination: {}", broker.getBrokerName(), destination); + System.out.println("[SUB_PATH] AbstractRegion addDestination: creating destination: " + destination.toString()); dest = createDestination(context, destination); // intercept if there is a valid interceptor defined DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor(); @@ -238,6 +239,7 @@ protected void validateMaxDestinations(ActiveMQDestination destination) protected List addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { List rc = new ArrayList(); // Add all consumers that are interested in the destination. + System.out.println("[SUB_PATH] AbstractRegion addSubscriptionsForDestination: " + dest.toString()); for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { Subscription sub = iter.next(); if (sub.matches(dest.getActiveMQDestination())) { @@ -342,10 +344,11 @@ public Map getDestinationMap() { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { LOG.debug("{} adding consumer: {} for destination: {}", broker.getBrokerName(), info.getConsumerId(), info.getDestination()); + System.out.println("[SUB_PATH] in AbstractRegion addConsumer " + info.getSubscriptionName() + " " + info.getDestination()); ActiveMQDestination destination = info.getDestination(); if (destination != null && !destination.isPattern() && !destination.isComposite()) { // lets auto-create the destination - lookup(context, destination,true); + lookup(context, destination, true); } Object addGuard; @@ -379,7 +382,9 @@ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) th // DestinationFilter.parseFilter(info.getDestination()); + System.out.println("[SUB_PATH] AbstractRegion addConsumer creating subscription"); Subscription sub = createSubscription(context, info); + System.out.println("[SUB_PATH] created subscription: " + sub.toString()); // At this point we're done directly manipulating subscriptions, // but we need to retain the synchronized block here. Consider @@ -404,8 +409,9 @@ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) th List removeList = new ArrayList(); for (Destination dest : addList) { + System.out.println("[SUB_PATH] adding subscription to destination: " + dest.toString()); try { - dest.addSubscription(context, sub); + dest.addSubscription(context, sub); // This is logic that added the subscription removeList.add(dest); } catch (SecurityException e){ if (sub.isWildcard()) { @@ -503,13 +509,14 @@ public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo @Override public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { + System.out.println("[PUB_PATH] AbstractRegion send"); final ConnectionContext context = producerExchange.getConnectionContext(); if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) { final Destination regionDestination = lookup(context, messageSend.getDestination(),false); producerExchange.setRegionDestination(regionDestination); } - + System.out.println("[PUB_PATH] AbstractRegion send destination: " + producerExchange.getRegionDestination().toString()); producerExchange.getRegionDestination().send(producerExchange, messageSend); if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){ @@ -551,7 +558,7 @@ protected Destination lookup(ConnectionContext context, ActiveMQDestination dest protected Destination lookup(ConnectionContext context, ActiveMQDestination destination, boolean createTemporary, boolean autoCreate) throws Exception { Destination dest = null; - + System.out.println("[SUB_PATH] AbstractRegion in lookup for desitination: " + destination.toString()); destinationsLock.readLock().lock(); try { dest = destinations.get(destination); @@ -564,7 +571,9 @@ protected Destination lookup(ConnectionContext context, ActiveMQDestination dest // Try to auto create the destination... re-invoke broker // from the // top so that the proper security checks are performed. + System.out.println("[SUB_PATH] AbstractRegion lookup: creating destination"); dest = context.getBroker().addDestination(context, destination, createTemporary); + System.out.println("[SUB_PATH] AbstractRegion lookup: done create destination"); } if (dest == null) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index e34f23a4dda..7b150ec19ce 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -644,6 +644,7 @@ public boolean isDisposed() { * some way - such as to send to a dead letter queue or something.. */ protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { + System.out.println("[PUB_PATH] onMessageWithNoConsumers"); if (!msg.isPersistent()) { if (isSendAdvisoryIfNoConsumers()) { // allow messages with no consumers to be dispatched to a dead diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 6946a33fa57..de039f97357 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -62,6 +62,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us private final AtomicBoolean active = new AtomicBoolean(); private final AtomicLong offlineTimestamp = new AtomicLong(-1); private final HashSet ackedAndPrepared = new HashSet(); + private boolean shared = false; public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws JMSException { @@ -115,6 +116,7 @@ protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDis @Override public void add(ConnectionContext context, Destination destination) throws Exception { + System.out.println("[SUB_PATH] DurableTopicSubscription add: " + destination.toString()); if (!destinations.contains(destination)) { super.add(context, destination); } @@ -381,7 +383,7 @@ public void afterRollback() throws Exception { public synchronized String toString() { return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() - + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension(); + + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension() + ", shared: " + isShared(); } public SubscriptionKey getSubscriptionKey() { @@ -438,4 +440,12 @@ public boolean isKeepDurableSubsActive() { public boolean isEnableMessageExpirationOnActiveDurableSubs() { return enableMessageExpirationOnActiveDurableSubs; } + + public boolean isShared() { + return shared; + } + + public void setShared(boolean shared) { + this.shared = shared; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index d59717444c7..3d995087a26 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -622,6 +622,7 @@ private void addReferencesAndUpdateRedispatch(LinkedList redis // made public so it can be used in MQTTProtocolConverter public void dispatchPending() throws IOException { + System.out.println("[PUB_PATH] PrefetchSubscription dispatch pending"); List slowConsumerTargets = null; synchronized(pendingLock) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 844fa120299..be7ed418f0a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -204,6 +204,7 @@ protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFact } protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { + System.out.println("[PUB_SUB] create topic region"); return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } @@ -427,9 +428,12 @@ public void removeProducer(ConnectionContext context, ProducerInfo info) throws @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + System.out.println("[SUB_PATH] Add a consumer in RegionBroker"); ActiveMQDestination destination = info.getDestination(); if (destinationInterceptor != null) { + System.out.println("[SUB_PATH] RegionBroker creating destinaiton incerceptor for destination: " + destination.toString()); destinationInterceptor.create(this, context, destination); + System.out.println("[SUB_PATH] RegionBroker done creating destinaiton incerceptor for destination: " + destination.toString()); } inactiveDestinationsPurgeLock.readLock().lock(); try { @@ -462,8 +466,9 @@ public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo } @Override - public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { + public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { // [PUB_PATH] ActiveMQDestination destination = message.getDestination(); + System.out.println("[PUB_PATH] RegionBroker.send: " + destination.toString()); message.setBrokerInTime(System.currentTimeMillis()); if (producerExchange.isMutable() || producerExchange.getRegion() == null || (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/SharedDurableTopicSubscriptionMetadata.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SharedDurableTopicSubscriptionMetadata.java new file mode 100644 index 00000000000..39a26000269 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SharedDurableTopicSubscriptionMetadata.java @@ -0,0 +1,46 @@ +package org.apache.activemq.broker.region; + +import org.apache.activemq.util.SubscriptionKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class SharedDurableTopicSubscriptionMetadata { + private static final Logger LOG = LoggerFactory.getLogger(SharedDurableTopicSubscriptionMetadata.class); + + private String subscriptionName; + private ArrayList subKeys = new ArrayList<>(); + private ConcurrentMap subMap = new ConcurrentHashMap(); + private int counter; + + public SharedDurableTopicSubscriptionMetadata(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + public void addDurableTopicSubscription(SubscriptionKey key, DurableTopicSubscription subscription) { + if (!subMap.containsKey(key)) { + subKeys.add(key); + subMap.put(key, subscription); + } + } + + public void removeDurableTopicSubscription(SubscriptionKey key, DurableTopicSubscription subscription) { + if (subMap.containsKey(key)) { + subMap.remove(key); + subKeys.remove(key); + } + } + + public DurableTopicSubscription getNextDurableTopicSubscription() { + int index = counter % subKeys.size(); + counter++; + return subMap.get(subKeys.get(index)); + } +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/SharedTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SharedTopicSubscription.java new file mode 100644 index 00000000000..579de66c717 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/SharedTopicSubscription.java @@ -0,0 +1,93 @@ +package org.apache.activemq.broker.region; + +import jakarta.jms.InvalidSelectorException; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.Response; +import org.apache.activemq.usage.UsageListener; + +public class SharedTopicSubscription extends AbstractSubscription { + + public SharedTopicSubscription(Broker broker, ConnectionContext context, + ConsumerInfo info) throws InvalidSelectorException { + super(broker, context, info); + } + + @Override + public void add(MessageReference node) throws Exception { + + } + + @Override + public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { + return null; + } + + @Override + public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { + + } + + @Override + public int getPendingQueueSize() { + return 0; + } + + @Override + public long getPendingMessageSize() { + return 0; + } + + @Override + public int getDispatchedQueueSize() { + return 0; + } + + @Override + public long getDispatchedCounter() { + return 0; + } + + @Override + public long getEnqueueCounter() { + return 0; + } + + @Override + public long getDequeueCounter() { + return 0; + } + + @Override + public boolean isLowWaterMark() { + return false; + } + + @Override + public boolean isHighWaterMark() { + return false; + } + + @Override + public boolean isFull() { + return false; + } + + @Override + public void updateConsumerPrefetch(int newPrefetch) { + + } + + @Override + public void destroy() { + + } + + @Override + public int getInFlightSize() { + return 0; + } +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index a9e07874e05..fe41f091127 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -36,6 +36,7 @@ import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; +import org.apache.activemq.broker.region.policy.SharedSubscriptionDispatchPolicy; import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy; import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy; import org.apache.activemq.broker.util.InsertionCountList; @@ -79,8 +80,11 @@ public class Topic extends BaseDestination implements Task { protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock(); private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); + private SharedSubscriptionDispatchPolicy sharedSubscriptionDispatchPolicy = new SharedSubscriptionDispatchPolicy(); private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private final ConcurrentMap durableSubscribers = new ConcurrentHashMap(); + private final ConcurrentMap sharedDurableSubscribers = new ConcurrentHashMap<>(); + protected final ConcurrentMap sharedDurableSubscriptionMetadataMap = new ConcurrentHashMap(); private final TaskRunner taskRunner; private final TaskRunnerFactory taskRunnerFactor; private final LinkedList messagesWaitingForSpace = new LinkedList(); @@ -133,8 +137,9 @@ public boolean lock(MessageReference node, LockOwner sub) { @Override public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception { + System.out.println("[SUB_PATH] Topic addSubscription"); if (!sub.getConsumerInfo().isDurable()) { - + // Non durable // Do a retroactive recovery if needed. if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) { @@ -146,6 +151,7 @@ public void addSubscription(ConnectionContext context, final Subscription sub) t synchronized (consumers) { if (!consumers.contains(sub)){ sub.add(context, this); + System.out.println("[SUB_PATH] Topc add to consumers: line 152"); consumers.add(sub); applyRecovery=true; super.addSubscription(context, sub); @@ -162,39 +168,58 @@ public void addSubscription(ConnectionContext context, final Subscription sub) t synchronized (consumers) { if (!consumers.contains(sub)){ sub.add(context, this); + System.out.println("[SUB_PATH] Topc add to consumers: line 169"); consumers.add(sub); super.addSubscription(context, sub); } } } } else { + System.out.println("[SUB_PATH] Topic addSubscription: durable"); DurableTopicSubscription dsub = (DurableTopicSubscription) sub; super.addSubscription(context, sub); sub.add(context, this); if(dsub.isActive()) { - synchronized (consumers) { - boolean hasSubscription = false; - - if (consumers.size() == 0) { - hasSubscription = false; + if (dsub.isShared()) { + System.out.println("[SUB_PATH] Topic addSubscription: adding shared durable subscription"); + sharedDurableSubscribers.put(dsub.getSubscriptionKey(), dsub); + String subscriptionName = dsub.getConsumerInfo().getSubscriptionName(); + SharedDurableTopicSubscriptionMetadata sharedDurableTopicSubscriptionMetadata = null; + if (sharedDurableSubscriptionMetadataMap.containsKey(subscriptionName)) { + sharedDurableTopicSubscriptionMetadata = sharedDurableSubscriptionMetadataMap.get(subscriptionName); } else { - for (Subscription currentSub : consumers) { - if (currentSub.getConsumerInfo().isDurable()) { - DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; - if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { - hasSubscription = true; - break; + sharedDurableTopicSubscriptionMetadata = new SharedDurableTopicSubscriptionMetadata(subscriptionName); + sharedDurableSubscriptionMetadataMap.put(subscriptionName, sharedDurableTopicSubscriptionMetadata); + } + sharedDurableTopicSubscriptionMetadata.addDurableTopicSubscription(dsub.getSubscriptionKey(), dsub); + sharedDurableSubscribers.put(dsub.getSubscriptionKey(), dsub); + } else { + System.out.println("[SUB_PATH] Topic addSubscription: adding non-shared durable subscription"); + synchronized (consumers) { + boolean hasSubscription = false; + + if (consumers.size() == 0) { + hasSubscription = false; + } else { + for (Subscription currentSub : consumers) { + if (currentSub.getConsumerInfo().isDurable()) { + DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; + if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { + hasSubscription = true; + break; + } } } } - } - if (!hasSubscription) { - consumers.add(sub); + if (!hasSubscription) { + System.out.println("[SUB_PATH] Topc add to consumers: line 214"); + consumers.add(sub); + } } + durableSubscribers.put(dsub.getSubscriptionKey(), dsub); } } - durableSubscribers.put(dsub.getSubscriptionKey(), dsub); } } @@ -258,12 +283,14 @@ private boolean hasSelectorChanged(SubscriptionInfo info1, ConsumerInfo info2) { } public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception { + System.out.println("[PUB_PATH] Topic activate"); // synchronize with dispatch method so that no new messages are sent // while we are recovering a subscription to avoid out of order messages. dispatchLock.writeLock().lock(); try { if (topicStore == null) { + System.out.println("[PUB_PATH] Topic topicStore == null"); return; } @@ -274,6 +301,7 @@ public void activate(ConnectionContext context, final DurableTopicSubscription s if (info != null) { // Check to see if selector changed. if (hasDurableSubChanged(info, subscription.getConsumerInfo())) { + System.out.println("[SUB_PATH] Topic activate: hasDurableSubchanged true: adding stuff"); // Need to delete the subscription topicStore.deleteSubscription(clientId, subscriptionName); info = null; @@ -285,9 +313,24 @@ public void activate(ConnectionContext context, final DurableTopicSubscription s consumers.remove(subscription); } } else { - synchronized (consumers) { - if (!consumers.contains(subscription)) { - consumers.add(subscription); + if (subscription.isShared()) { + System.out.println("[SUB_PATH] Topic activate: adding stuff: shared"); + SharedDurableTopicSubscriptionMetadata sharedDurableTopicSubscriptionMetadata = null; + if (sharedDurableSubscriptionMetadataMap.containsKey(subscriptionName)) { + sharedDurableTopicSubscriptionMetadata = sharedDurableSubscriptionMetadataMap.get(subscriptionName); + } else { + sharedDurableTopicSubscriptionMetadata = new SharedDurableTopicSubscriptionMetadata(subscriptionName); + sharedDurableSubscriptionMetadataMap.put(subscriptionName, sharedDurableTopicSubscriptionMetadata); + } + sharedDurableTopicSubscriptionMetadata.addDurableTopicSubscription(subscription.getSubscriptionKey(), subscription); + sharedDurableSubscribers.put(subscription.getSubscriptionKey(), subscription); + } else { + System.out.println("[SUB_PATH] Topic activate: adding stuff: non-shared"); + synchronized (consumers) { + if (!consumers.contains(subscription)) { + System.out.println("[SUB_PATH] Topc add to consumers: line 326"); + consumers.add(subscription); + } } } } @@ -295,6 +338,8 @@ public void activate(ConnectionContext context, final DurableTopicSubscription s // Do we need to create the subscription? if (info == null) { + System.out.println("[SUB_PATH] Topic activate: we need to create the subscription"); + System.out.println("[SUB_PATH] Topic activate: subscription: " + subscription.toString()); info = new SubscriptionInfo(); info.setClientId(clientId); info.setSelector(subscription.getConsumerInfo().getSelector()); @@ -304,9 +349,23 @@ public void activate(ConnectionContext context, final DurableTopicSubscription s // This destination is an actual destination id. info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); // This destination might be a pattern - synchronized (consumers) { - consumers.add(subscription); - topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive()); + if (subscription.isShared()) { + System.out.println("[SUB_PATH] Topc add to shared subscription metadata: line 351"); + SharedDurableTopicSubscriptionMetadata sharedDurableTopicSubscriptionMetadata = null; + if (sharedDurableSubscriptionMetadataMap.containsKey(subscriptionName)) { + sharedDurableTopicSubscriptionMetadata = sharedDurableSubscriptionMetadataMap.get(subscriptionName); + } else { + sharedDurableTopicSubscriptionMetadata = new SharedDurableTopicSubscriptionMetadata(subscriptionName); + sharedDurableSubscriptionMetadataMap.put(subscriptionName, sharedDurableTopicSubscriptionMetadata); + } + sharedDurableTopicSubscriptionMetadata.addDurableTopicSubscription(subscription.getSubscriptionKey(), subscription); + sharedDurableSubscribers.put(subscription.getSubscriptionKey(), subscription); + } else { + synchronized (consumers) { + System.out.println("[SUB_PATH] Topc add to consumers: line 347"); + consumers.add(subscription); + topicStore.addSubscription(info, subscription.getConsumerInfo().isRetroactive()); + } } } @@ -364,6 +423,7 @@ public void recoverRetroactiveMessages(ConnectionContext context, Subscription s @Override public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { + System.out.println("[PUB_PATH] in Topic"); final ConnectionContext context = producerExchange.getConnectionContext(); final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); @@ -371,6 +431,7 @@ public void send(final ProducerBrokerExchange producerExchange, final Message me final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode(); + System.out.println(destination.getPhysicalName()); message.setRegionDestination(this); if(getMessageInterceptorStrategy() != null) { @@ -527,6 +588,7 @@ public void run() { */ synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { + System.out.println("[PUB_PATH] in Topic doMessageSend"); final ConnectionContext context = producerExchange.getConnectionContext(); message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); Future result = null; @@ -580,7 +642,7 @@ public void afterRollback() throws Exception { } else { try { - dispatch(context, message); + dispatch(context, message); // dispatching to the subscription } finally { message.decrementReferenceCount(); } @@ -777,6 +839,7 @@ protected void dispatch(final ConnectionContext context, Message message) throws // AMQ-2586: Better to leave this stat at zero than to give the user // misleading metrics. // destinationStatistics.getMessages().increment(); + System.out.println("[PUB_PATH] In topic dispatch"); destinationStatistics.getEnqueues().increment(); if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) { @@ -792,7 +855,7 @@ protected void dispatch(final ConnectionContext context, Message message) throws return; } synchronized (consumers) { - if (consumers.isEmpty()) { + if (consumers.isEmpty() && sharedDurableSubscriptionMetadataMap.isEmpty()) { onMessageWithNoConsumers(context, message); return; } @@ -807,7 +870,20 @@ protected void dispatch(final ConnectionContext context, Message message) throws msgContext = context.getMessageEvaluationContext(); msgContext.setDestination(destination); msgContext.setMessageReference(message); + System.out.println("[PUB_PATH] Topic dispatching: dispatching"); + for (Subscription c : consumers) { + System.out.println("[PUB_PATH] Topic dispatching: consumer: " + c.toString()); + } + boolean no_consumers = false; + boolean no_shared_consumers = false; if (!dispatchPolicy.dispatch(message, msgContext, consumers)) { + no_consumers = true; + } + // Dispatch to shared + if (!sharedSubscriptionDispatchPolicy.dispatch(message, msgContext, sharedDurableSubscriptionMetadataMap)) { + no_shared_consumers = true; + } + if (no_consumers && no_shared_consumers) { onMessageWithNoConsumers(context, message); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index 68b01f4e07a..02b6cde3fb9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -57,6 +57,8 @@ public class TopicRegion extends AbstractRegion { private static final Logger LOG = LoggerFactory.getLogger(TopicRegion.class); protected final ConcurrentMap durableSubscriptions = new ConcurrentHashMap(); + protected final ConcurrentMap sharedDurableSubscriptions = new ConcurrentHashMap<>(); + protected final ConcurrentMap sharedDurableSubscriptionMetadataMap = new ConcurrentHashMap(); private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator(); private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId()); private boolean keepDurableSubsActive; @@ -120,7 +122,32 @@ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) th ActiveMQDestination destination = info.getDestination(); if (!destination.isPattern()) { // Make sure the destination is created. - lookup(context, destination,true); + System.out.println("[SUB_PATH] TopicRegion addConsumer: looking up destination"); + lookup(context, destination, true); + System.out.println("[SUB_PATH] TopicRegion addConsumer: done looking up destination"); + } + if (info.isShared()) { + System.out.println("[SUB_PATH] TopicRegion addConsumer shared durable"); + String clientId = context.getClientId(); + String subscriptionName = info.getSubscriptionName(); + DurableTopicSubscription dsub = sharedDurableSubscriptions.get(new SubscriptionKey(clientId, subscriptionName)); + if (dsub != null) { + return dsub; + } + SharedDurableTopicSubscriptionMetadata sharedDurableTopicSubscriptionMetadata = sharedDurableSubscriptionMetadataMap.get(subscriptionName); + if (sharedDurableTopicSubscriptionMetadata == null) { + sharedDurableTopicSubscriptionMetadata = new SharedDurableTopicSubscriptionMetadata(subscriptionName); + sharedDurableSubscriptionMetadataMap.put(subscriptionName, sharedDurableTopicSubscriptionMetadata); + } + super.addConsumer(context, info); + SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); + dsub = sharedDurableSubscriptions.get(key); + if (durableSubscriptions.get(key) != null) { + throw new JMSException("Shared durable subscription is accidentally added to durableSubscriptions"); + } + dsub.activate(usageManager, context, info, broker); + sharedDurableTopicSubscriptionMetadata.addDurableTopicSubscription(key, dsub); + return dsub; } String clientId = context.getClientId(); String subscriptionName = info.getSubscriptionName(); @@ -185,6 +212,7 @@ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) th sub.activate(usageManager, context, info, broker); return sub; } else { + System.out.println("[SUB_PATH] in TopicRegion adding non-durable consumer " + info.getSubscriptionName()); return super.addConsumer(context, info); } } @@ -256,6 +284,7 @@ protected List addSubscriptionsForDestination(ConnectionContext co Set dupChecker = new HashSet(rc); TopicMessageStore store = (TopicMessageStore)dest.getMessageStore(); + System.out.println("[SUB_PATH] TopicRegion addSubscriptionsForDestination: " + dest.toString()); // Eagerly recover the durable subscriptions if (store != null) { SubscriptionInfo[] infos = store.getAllSubscriptions(); @@ -336,43 +365,69 @@ protected Subscription createSubscription(ConnectionContext context, ConsumerInf if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) { throw new JMSException("Cannot create a durable subscription for an advisory Topic"); } - SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); - DurableTopicSubscription sub = durableSubscriptions.get(key); - - if (sub == null) { + if (info.isShared()) { + System.out.println("[SUB_PATH] TopicRegion createSubscription: creating shared durable"); + SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); + DurableTopicSubscription sub = sharedDurableSubscriptions.get(key); - sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive); - - if (destination != null && broker.getDestinationPolicy() != null) { - PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); - if (entry != null) { - entry.configure(broker, usageManager, sub); + if (sub == null) { + sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive); + sub.setShared(true); + if (destination != null && broker.getDestinationPolicy() != null) { + PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); + if (entry != null) { + entry.configure(broker, usageManager, sub); + } } + sharedDurableSubscriptions.put(key, sub); + } else { + throw new JMSException("Durable subscription is already active for clientID: " + + context.getClientId() + " and subscriptionName: " + + info.getSubscriptionName()); } - durableSubscriptions.put(key, sub); + return sub; } else { - throw new JMSException("Durable subscription is already active for clientID: " + - context.getClientId() + " and subscriptionName: " + - info.getSubscriptionName()); + SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); + DurableTopicSubscription sub = durableSubscriptions.get(key); + + if (sub == null) { + sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive); + if (destination != null && broker.getDestinationPolicy() != null) { + PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); + if (entry != null) { + entry.configure(broker, usageManager, sub); + } + } + durableSubscriptions.put(key, sub); + } else { + throw new JMSException("Durable subscription is already active for clientID: " + + context.getClientId() + " and subscriptionName: " + + info.getSubscriptionName()); + } + return sub; } - return sub; } - try { - TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager); - // lets configure the subscription depending on the destination - if (destination != null && broker.getDestinationPolicy() != null) { - PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); - if (entry != null) { - entry.configure(broker, usageManager, answer); + // Non Durable path + if (info.isShared()){ + throw new JMSException("Creating shared non-durable subscription not implemented"); + } else { + try { + TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager); + // lets configure the subscription depending on the destination + if (destination != null && broker.getDestinationPolicy() != null) { + PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); + if (entry != null) { + entry.configure(broker, usageManager, answer); + } } + answer.init(); + return answer; + } catch (Exception e) { + LOG.debug("Failed to create TopicSubscription ", e); + JMSException jmsEx = new JMSException("Couldn't create TopicSubscription"); + jmsEx.setLinkedException(e); + throw jmsEx; } - answer.init(); - return answer; - } catch (Exception e) { - LOG.debug("Failed to create TopicSubscription ", e); - JMSException jmsEx = new JMSException("Couldn't create TopicSubscription"); - jmsEx.setLinkedException(e); - throw jmsEx; } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedSubscriptionDispatchPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedSubscriptionDispatchPolicy.java new file mode 100644 index 00000000000..fc4bba6bcc4 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SharedSubscriptionDispatchPolicy.java @@ -0,0 +1,35 @@ +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.SharedDurableTopicSubscriptionMetadata; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.filter.MessageEvaluationContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +public class SharedSubscriptionDispatchPolicy { + public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, ConcurrentMap metadataMap) + throws Exception { + + // Round-robin dispatching + System.out.println("[PUB_PATH] In SharedSubscriptionDispatchPolicy"); + int count = 0; + + for (String key : metadataMap.keySet()) { + System.out.println("[PUB_PATH] SharedSubscriptionDispatchPolicy: subscription name " + key); + SharedDurableTopicSubscriptionMetadata metadata = metadataMap.get(key); + Subscription sub = metadata.getNextDurableTopicSubscription(); + if (!sub.matches(node, msgContext)) { + System.out.println("[PUB_PATH] In SharedSubscriptionDispatchPolicy no match"); + sub.unmatched(node); + continue; + } + sub.add(node); + count++; + } + + return count > 0; + } +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java index 7d8e2f073bb..9d2ff35be2c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchPolicy.java @@ -33,6 +33,8 @@ public class SimpleDispatchPolicy implements DispatchPolicy { public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List consumers) throws Exception { + // Round-robin dispatching + System.out.println("[PUB_PATH] In SimpleDispatchPolicy"); int count = 0; for (Subscription sub : consumers) { // Don't deliver to browsers @@ -44,7 +46,7 @@ public boolean dispatch(MessageReference node, MessageEvaluationContext msgConte sub.unmatched(node); continue; } - + System.out.println("[PUB_PATH] SimpleDispatchPolicy distribute message " + node.toString() + " to " + sub.toString()); sub.add(node); count++; } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index a91349b28b2..681ef9de5eb 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -154,7 +154,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean exclusiveConsumer; private boolean alwaysSyncSend; private int closeTimeout = 15000; - private boolean watchTopicAdvisories = true; + private boolean watchTopicAdvisories = false; private long warnAboutUnstartedConnectionTimeout = 500L; private int sendTimeout =0; private boolean sendAcksAsync=true; diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java index eec3c5cbd01..21948e512de 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java @@ -117,6 +117,12 @@ public boolean isDurable() { return subscriptionName != null; } + public boolean isShared() { + // HACK for now since the OpenWire generator is broken + boolean isShared = subscriptionName != null && subscriptionName.equals("test-shared-subscription"); + return subscriptionName != null && subscriptionName.equals("test-shared-subscription"); + } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java index a49ff6f5a4d..025f05a3c69 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java @@ -40,6 +40,7 @@ public class ConsumerThread extends Thread { boolean running = false; CountDownLatch finished; boolean bytesAsText; + boolean shared; public ConsumerThread(Session session, Destination destination) { this.destination = destination; @@ -54,7 +55,9 @@ public void run() { LOG.info(threadName + " wait until " + messageCount + " messages are consumed"); try { if (durable && destination instanceof Topic) { - consumer = session.createDurableSubscriber((Topic) destination, getName()); + String subscriptionName = shared ? "test-shared-subscription" : getName(); + System.out.println("In consumer thread, the new name is: " + subscriptionName); + consumer = session.createDurableSubscriber((Topic) destination, subscriptionName); } else { consumer = session.createConsumer(destination); } @@ -185,4 +188,8 @@ public boolean isBytesAsText() { public void setBytesAsText(boolean bytesAsText) { this.bytesAsText = bytesAsText; } + + public void setShared(boolean shared) { + this.shared = shared; + } } diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java index 2c8e042400a..cf50c0f5a55 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ConsumerCommand.java @@ -43,6 +43,7 @@ public class ConsumerCommand extends AbstractCommand { int ackMode = Session.AUTO_ACKNOWLEDGE; int parallelThreads = 1; boolean bytesAsText; + boolean shared; @Override protected void runTask(List tokens) throws Exception { @@ -54,11 +55,13 @@ protected void runTask(List tokens) throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); Connection conn = null; try { + System.out.println("[SUB_PATH] Connecting to " + brokerUrl); conn = factory.createConnection(user, password); if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) { conn.setClientID(clientId); } conn.start(); + System.out.println("[SUB_PATH] Connected"); CountDownLatch active = new CountDownLatch(parallelThreads); @@ -79,6 +82,7 @@ protected void runTask(List tokens) throws Exception { consumer.setBatchSize(batchSize); consumer.setFinished(active); consumer.setBytesAsText(bytesAsText); + consumer.setShared(shared); consumer.start(); } @@ -216,4 +220,12 @@ public String getName() { public String getOneLineDescription() { return "Receives messages from the broker"; } + + public void setShared(boolean shared) { + this.shared = shared; + } + + public boolean isShared() { + return shared; + } } diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/design_doc.md b/docs/proposal/jakarta_messaging_3_1_shared_subscription/design_doc.md new file mode 100644 index 00000000000..d128ffa33a1 --- /dev/null +++ b/docs/proposal/jakarta_messaging_3_1_shared_subscription/design_doc.md @@ -0,0 +1,184 @@ +# JMS 2.0 / Jakarta Messaging 3.1: Shared Subscriptions + +## Problem statement + +JMS 2.0 / Jakarta Messaging 3.1 introduced a new set of API for creating shared topic subscriptions that is not yet supported by ActiveMQ Classic. Even though the functionality can be achieved using virtual topics in ActiveMQ Classic, to be Jakarta Messaging 3.1 compliant, we will need to support that API. This document proposes several designs and evaluates their tradeoffs. + +## Background + +A JMS durable subscriber MessageConsumer is created with a unique JMS clientID and durable subscriber name. In JMS 1.1 only one JMS connection can be active at any point in time for one JMS clientID, and only one consumer can be active for a clientID and subscriber name. i.e., only one thread can be actively consuming from a given logical topic subscriber. This limits load balancing of messages or fast failover of the subscriber if that one process running that one consumer thread dies. + +Virtual topics were introduced to mitigate these issues. The idea behind virtual topics is that producers send to a topic in the usual JMS way. Consumers can continue to use the Topic semantics in the JMS specification. However if the topic is virtual, consumers can consume from a physical queue for a logical topic subscription, allowing many consumers to be running on many machines & threads to balance the load. + +JMS 2.0 and Jakarta 3.1 support shared subscriptions which enable durable and non-durable topic subscriptions that can be shared between any number of consumers. This allows consumers to share the work of consuming messages from a Topic which improves performance and reliability. This is a simpler solution than Virtual Topics as consumers can directly subscribe to the topic and not to underlying queues + +## High level design approach + +### Option 1: Reuse as much of the virtual topics implementation as possible. + +This approach will implement shared subscription as a wrapper of virtual topics. + +For shared subscriptions, extra logic would be needed to manage the consumer subscription to the queue as the queue internals should not be exposed to the users. An underlying queue could be created for each shared subscription on invocation of *`createSharedDurableConnectionConsumer`*. A mapping would be needed to identify which shared subscription maps to an underlying queue on the virtual topic(perhaps in *`QueueRegion`*). For example when the *`addConsumer`* method of *`AbstractRegion`* is invoked, a new queue would need to be created and the mapping from the subscription key to this queue created. When new shared subscriptions for the consumer are created then the mapping would ensure the subscription is created on the underlying queue for that subscription key. +A *`VirtualTopicInterceptor`* overrides *`send`* to ensure messages get delivered to all destinations matching a wildcard. For shared subscriptions an interceptor could be created which sends messages to only one destination in a group of destinations matching a wildcard. This could be achieved in a round robin fashion. + +**Pros:** + +- Reuses logic from existing virtual topics and stays in line with existing solutions. + +**Cons:** + +- This may not be solving the problem at the right level. Shared subscriptions are applied at a subscription level, not a destination level. +- Extra logic is needed to correlate a shared subscription to a queue. +- The underlying queues would show in the ActiveMQ admin console. This may be confusing for users. It may also require protection to ensure these queues are not deleted or purged. + +### Option 2 **\[Recommended\]**: Handling shared subscription logic separately + +There already exists logic for effectively managing subscriptions in the *`DurableTopicSubscription`* and *`TopicSubscription`* classes. The main difference between what exists here and what is needed for shared subscriptions is the ability for the class to act as a composite subscription. The *`DurableTopicSubscription`* and *`TopicSubscription`* classes have already been optimized to use cursors to effectively page in messages for delivery. Messages could be dispatched from the *`SharedDurableTopicSubscription`* cursor to one of the cursors of the underlying subscriptions via round robin. A map from subscription key to shared subscription would be needed in the *`TopicRegion.`* This is for mappings and also to perform validation checks on the shared subscriptions. This could be managed in a similar way to the existing *`ConcurrentMap durableSubscriptions`* map. + +![option_2](img/img_1.png) + +**Pros:** + +- Cursors reduce the impact on RAM for in transit messages. +- Messages can be passed from producer to consumers directly if they are fast. + +**Cons:** + +- Implements a similar offering as virtual topics in a very different way. + +## Detailed Design + +Option 2 is recommended and in this section we will focus on implementation design detail of option 2, handling shared subscription logic separately. The detailed design is divided into 7 categories. + +### 1\. Shared durable subscription creation + +When *`addConsumer`* is called on *`TopicRegion`* it uses the *`ConsumerInfo`* to determine if the subscription is durable. *`ConsumerInfo`* will need to be updated to include a new boolean *`shared`* property for determining if a consumer is also shared. *`TopicRegion`* maintains a *`ConcurrentMap`* called *`DurableSubscriptions`* which it checks for existing subscriptions and throws a *`JMSException`* if the subscription key is already in use. It will also have a *`ConcurrentMap`* called *`SharedSubscriptions`* which will be used to identify if a subscription is a shared subscription. This map can be updated when *`addConsumer`* invokes *`createSubscription`* on the *`TopicRegion`*. *`createSubscription`* will need to be updated with logic to create a new *`SharedDurableTopicSubscription`* or *`SharedNonDurableTopicSubscription`* . A new interface *`SharedTopicSubscription`* will extend the *`Subscription`* interface. It will be used to define methods for managing the shared consumers on a *`SharedSubscription`*. For example + +```java +public interface SharedTopicSubscription extends Subscription { + public Set getSharedConsumers(); + public void addSharedConsumer(Subscription subscription); + public void removeSharedConsumer(Subscription subscription); +} +``` +*`SharedDurableTopicSubscription`* and *`SharedNonDurableTopicSubscription`* will both implement the *`SharedTopicSubscription`* interface. + +![creation_of_shared_subscription](img/img_2.png) + +The *`DurableSubscriptions`* map on *`TopicRegion`* can be used to track if a shared subscription is also durable. *`addConsumer`* will also invoke its superclass *`AbstractRegion`* and will need to put the new *`SharedTopicSubscription`* to the *`Map`* map called *`subscriptions`*. This method will also call *`addSubscription`* on all relevant Topics to add the new *`SharedTopicSubscription`*. +The *`addConsumer`* method of *`TopicRegion`* can have additional checks for shared subscriptions. If an attempt is made to create a consumer which matches a subscription key in the *`SharedSubscriptions`* map then the request will be validated to ensure the same topic and message selector is used. Since shared subscriptions will also be added to the *`DurableSubscriptions`* map if durable, this map can be used to validate that a shared durable subscription and an unshared durable subscription may not have the same name and client identifier. + +![flow_chart](img/img_3.png) + +Link stealing allows a new client connection with the same client ID as an existing client to steal the connection. The existing client’s session will be closed and its network connection will be terminated. Link stealing will not be supported initially for shared consumers. + +The *`addSubscriptionsForDestination`* method of *`TopicRegion`* restores all durable subscriptions on a Topic when a connection is first added to that destination. A *`SharedDurableTopicSubscription`* should be restored as a *`DurableTopicSubscription`* by default. The *`addSubscriptionsForDestination`* method has logic to determine this based on the restored *`SubscriptionInfo`*. If a shared durable consumer is added using the same subscription key, then the *`DurableTopicSubscription`* can become a *`SharedDurableTopicSubscription`* as long as no active consumers exist. + +Cursors can be overridden in the activemq.xml configuration. If a user overrides the cursor for a *`SharedTopicSubscription`* then the subscription should still behave as expected as all cursors implement the *`PendingMessageCursor`* interface. For example if *` `* is used then the *`SharedTopicSubscription`* should use a *`fileCursor`* instead of a *`vmCursor`*. + +It may be possible for a rogue client to create an excessive number of concurrent consumers on a shared subscription. Hence it’s reasonable to expose a property that users can set which limits the number of consumers on a shared subscription. The default for this property could be \-1 denoting no limit. + + +### 2\. Message delivery to shared durable subscriptions + +A new class *`SharedDurableTopicSubscription`* is created which extends *`DurableTopicSubscription`* . This class can reuse a lot of the logic from a *`DurableTopicSubscription`* for persistent dispatch of messages. It holds a set of *`SharedSubscription`* which are responsible for distributing messages to the shared consumers. +*`SharedSubscription`* is a new class which extends *`PrefetchSubscription`* and implements abstract methods like *`acknowledge`*. +The *`SharedDurableTopicSubscription`* has a *`PendingMessageCursor`* which is a +*`StoreDurableSubscriberCursor`* containing messages for dispatch. Each of the *`SharedSubscriptions`* have their own *`VMPendingMessageCursor`* which contains messages to be dispatched to each consumer. The *`add`* method of *`SharedDurableTopicSubscription`* will add a message to its *`StoreDurableSubscriberCursor`* . It will then do a round robin to decide which subscription gets the message added to their *`VMPendingMessageCursor`*. + +![](img/img_4.png) + +The *`SharedSubscription`* will then invoke *`DispatchPending`* to dispatch messages to the relevant consumer via the appropriate transport connection. When messages are dispatched they are removed from the *`pending` `MessageCursor`* and added to a list of *`MessageReference`* called *`dispatched`*. The *`SharedDurableTopicSubscription`* will also move the dispatched message from it’s *`pending` `MessageCursor`* to it’s *`dispatched`* list. The *`dispatched`* list reflects messages that are dispatched to consumers and awaiting acknowledgement. Should a consumer fail to process the dispatched message, the message can move from *`dispatched`* back onto the *`pending` `MessageCursor`*. + +![](img/img_5.png) + +When *`acknowledge`* is invoked on the *`AbstractRegion`*(which *`TopicRegion`* extends) it gets the relevant subscription from the *`ConsumerBrokerExchange`*. In this case it would be a *`SharedSubscription`* to which the message was sent. The *`acknowledge(final ConnectionContext context,final MessageAck ack)`* method from *`PrefetchSubscription`*(which *`SharedSubscription`* extends) is called to remove the appropriate messages from the *`dispatched`* list. It would then invoke the overridden *`acknowledge`* method on *`SharedSubscription`* which could be used to call back to the *`acknowledge`* method on it’s parent *`SharedDurableTopicSubscription`* . + +![](img/img_6.png) + +When a consumer invokes *`pullMessage`* on a *`SharedSubscription`* then the *`prefetchExtension`* of the subscription is set to the pull quantity. *`dispatchPending`* is then triggered on the parent *`SharedDurableTopicSubscription`* doing a round robin on subscriptions that have space in their prefetch. This means that *`SharedSubscription`s* on other consumers may also receive the messages in pending. This is to maintain balanced message distribution across consumers. + +### 3\. Message delivery to shared non-durable subscriptions + +A new class *`SharedNonDurableTopicSubscription`* is created which extends *`TopicSubscription`* . This class can reuse a lot of the logic from a *`TopicSubscription`* for dispatch of messages in memory. Similar to *`SharedDurableTopicSubscription`*`,` it holds a set of *`SharedSubscription`* which are responsible for distributing messages to the shared consumers. +The *`SharedNonDurableTopicSubscription`* has a *`PendingMessageCursor`* which is a +*`VMPendingMessageCursor`* containing messages for dispatch. Each of the *`SharedSubscriptions`* have their own *`VMPendingMessageCursor`* which contains messages to be dispatched to each consumer. The *`add`* method of *`SharedDurableTopicSubscription`* will add a message to its *`VMPendingMessageCursor`* . It will then do a round robin to decide which subscription gets the message added to their *`VMPendingMessageCursor`*. + +![](img/img_7.png) + +The *`SharedSubscription`* will then invoke *`DispatchPending`* to dispatch messages to the relevant consumer via the appropriate transport connection. When messages are dispatched they are removed from the *`pending`* *`MessageCursor`* and added to a list of *`MessageReference`* called *`dispatched`*. The *`SharedNonDurableTopicSubscription`* will also move the dispatched message from its *`matched`* *`MessageCursor`* to its *`dispatched` `DispatchNode`* list. + +![](img/img_8.png) + +Similar to *`SharedDurableTopicSubscription`* the *`acknowledge(final ConnectionContext context,final MessageAck ack)`* method from *`PrefetchSubscription`*(which *`SharedSubscription`* extends) is called to remove the appropriate messages from the *`dispatched`* list. It would then invoke the overridden *`acknowledge`* method on *`SharedSubscription`* which could be used to call back to the *`acknowledge`* method on its parent *`SharedNonDurableTopicSubscription`* . + +![](img/img_9.png) + +When a consumer invokes *`pullMessage`* on a *`SharedSubscription`* then the *`prefetchExtension`* of the subscription is set to the pull quantity. *`dispatchMatched`* is then triggered on the parent *`SharedNonDurableTopicSubscription`* doing a round robin on subscriptions that have space in their prefetch. This means that *`SharedSubscription`s* on other consumers may also receive the messages in pending. This is to maintain balanced message distribution across consumers. + +![](img/img_10.png) + +### 4\. Shared durable subscription deletion + +When a *`TransportConnection`* stops, it invokes the *`removeConsumer`* method of *`TopicRegion`*. If the consumer matches belong to a shared subscription then it should be removed from the relevant *SharedDurableTopicSubscription*. + +![](img/img_11.png) + +A durable subscription will continue to accumulate messages until it is deleted using a *`RemoveSubscriptionInfo`* command. This will invoke *`removeSubscription`* on the *`TopicRegion`* which removes durable subscriptions from the *`durableSubscriptions`* map. If a durable subscription is still active then this will throw a *`JMSException`*. For a *`SharedDurableTopicSubscription`* it’s *`isActive`* method will check all contained *`SharedSubscription`s* to see if any of them are active. The *`SharedSubscriptions`* map should also be updated to remove the *`SharedTopicSubscription`*. The *`TopicRegion`* also calls *`deleteSubscription`* for every *`Topic`* in it’s *`destinations`* which deletes subscriptions from the *`TopicStore`*. Finally, *`removeConsumer`* is invoked on the superclass *`Abs`t`ractRegion`* which removes the consumer from the *`subscriptions`* map and calls *`removeSubscription`* on all destinations. + +![](img/img_12.png) + +### 5\. Shared non-durable subscription deletion + +A shared non-durable subscription will be deleted when the last consumer on the subscription is closed. When a *`TransportConnection`* stops, it invokes the *`removeConsumer`* method of *`TopicRegion`*. The *`SharedSubscriptions`* map should also be updated to remove the *`SharedTopicSubscription`* if it contains no more consumers. Otherwise it should just remove its consumer matching the *`consumerId`* from *`ConsumerInfo`*. *`removeConsumer`* of the superclass *`AbstractRegion`* is called which will remove the *`SharedNonDurableTopicSubscription`* from the *`subscriptions`* map and calls *`removeSubscription`* on all relevant destinations. + +![](img/img_13.png) + +### 6\. Client side methods changes + +New *`createSharedConsumer`* and *`createSharedDurableConsumer`* methods have been added to *`Session`*, *`TopicSession`* and *`JMSContext`*. + +*`Session`* has 6 implementations: + +* *`ActiveMQSession`* ← Will need to be updated with logic to create shared consumers. +* *`ActiveMQQueueSession`* ← Should continue to throw exceptions as queues don’t support shared consumers. +* *`ActiveMQTopicSession`* ← Currently doesn’t support creation of durable consumers. If durable consumers are not supported then it is unlikely that shared consumers would be too. +* *`PooledSession`* ← Currently doesn’t support creation of durable consumers. If durable consumers are not supported then it is unlikely that shared consumers would be too. +* *`InboundSessionProxy`* ← Currently doesn’t support creation of durable consumers. If durable consumers are not supported then it is unlikely that shared consumers would be too. +* *`ManagedSessionProxy`* ← Currently doesn’t support creation of durable consumers. If durable consumers are not supported then it is unlikely that shared consumers would be too. + +*`TopicSession`* extends *`Session`* and has 2 implementations not covered above: + +* *`ActiveMQTopicSession`* ← Currently doesn’t support creation of durable consumers. If durable consumers are not supported then it is unlikely that shared consumers would be too. Note this API exists for compatibility with the earliest JMS spec JMS 1.0. +* *`ActiveMQXASession`* ← Extends ActiveMQSession so will take advantage of the logic added there for shared consumers. + +*`JMSContext`* has 1 implementation not covered above: + +* *`ActiveMQContext`* ← Will need to be updated with logic to create shared consumers. + +There is little to no information in the spec about support for shared consumers in *`ConnectionConsumer`* but it is still a Jakarta Messaging 3.1 interface. New *`createSharedConnectionConsumer`* and *`createSharedDurableConnectionConsumer`* methods have been added to the jakarta *`Connection`* interface. + +*`Connection`* has 4 implementations: + +* *`ActiveMQConnection`* ← Will need to be updated with logic to create shared consumers. +* *`ActiveMQXAConnection`* ← Extends *`ActiveMQConnection`* so will take advantage of the logic added there for shared consumers. +* *`PooledConnection`* ← Will need to be updated with logic to create shared consumers. +* *`InboundConnectionProxy`* ← Currently doesn’t support creation of durable consumers. If durable consumers are not supported then it is unlikely that shared consumers would be too. +* *`ManagedConnectionProxy`* ← Currently doesn’t support creation of durable consumers. If durable consumers are not supported then it is unlikely that shared consumers would be too. + +### 7\. Openwire changes + +*`ConsumerInfo`* will need to be updated to include a new boolean *`shared`* property for determining if a consumer is shared. The current *`ConsumerInfo`* also determines durability based on *`return subscriptionName != null;`*. This will no longer hold true as the *`subscriptionName`* will now be set for non durable shared subscriptions. This means a new boolean *`durable`* property will also need to be added to *`ConsumerInfo`*. +It may be best to introduce a new v13 of openwire for Jakarta 3.1. This means users will need to use the latest version of the openwire to have the version of *`ConsumerInfoMarshaller`* which creates shared subscriptions. Older openwire marshellers will not be updated with any of the changes to populate the new fields on *`ConsumerInfo`* for shared subscriptions. +The *`isDurable`* method of *`ConsumerInfo`* will need to be updated to work with all versions of openwire. It will need the following evaluation: +*`return (!shared && subscriptionName != null) || (shared && durable);`*. +*`shared`* will default to false for openwire versions older than v13 and so *`isDurable`* will evaluate in the same way as previously, based on *`subscriptionName`*. For openwire v13 the *`shared`* and *`durable`* fields will be populated and so can be used to determine durability for shared subscriptions. + +## Appendix + +Links to the official spec: + +* [https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1\#shared-non-durable-subscriptions](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#shared-non-durable-subscriptions) +* [https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1\#shared-durable-subscriptions](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#shared-durable-subscriptions) +* [https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1\#jakarta-messaging-application-server-facilities](https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#jakarta-messaging-application-server-facilities) +* [https://www.oracle.com/technical-resources/articles/java/jms2messaging.html](https://www.oracle.com/technical-resources/articles/java/jms2messaging.html) \ No newline at end of file diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_1.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_1.png new file mode 100644 index 00000000000..e38a54f904d Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_1.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_10.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_10.png new file mode 100644 index 00000000000..deefbe289e8 Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_10.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_11.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_11.png new file mode 100644 index 00000000000..c3e81d27093 Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_11.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_12.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_12.png new file mode 100644 index 00000000000..e3a99acb4ed Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_12.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_13.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_13.png new file mode 100644 index 00000000000..300af3c7300 Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_13.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_2.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_2.png new file mode 100644 index 00000000000..bb5439885e0 Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_2.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_3.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_3.png new file mode 100644 index 00000000000..7f917e7a973 Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_3.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_4.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_4.png new file mode 100644 index 00000000000..6f4c0ab685c Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_4.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_5.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_5.png new file mode 100644 index 00000000000..b89c0d089d8 Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_5.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_6.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_6.png new file mode 100644 index 00000000000..72ed090079a Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_6.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_7.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_7.png new file mode 100644 index 00000000000..a7487476746 Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_7.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_8.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_8.png new file mode 100644 index 00000000000..cb78c8d450e Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_8.png differ diff --git a/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_9.png b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_9.png new file mode 100644 index 00000000000..3ade55d2732 Binary files /dev/null and b/docs/proposal/jakarta_messaging_3_1_shared_subscription/img/img_9.png differ