Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public class BrokerService implements Service {
private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
// to other jms messaging systems
private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true;
private boolean advisorySupport = false;
private boolean anonymousProducerAdvisorySupport = false;
private URI vmConnectorURI;
private String defaultSocketURIString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public TransportConnection(TransportConnector connector, final Transport transpo
@Override
public void onCommand(Object o) {
serviceLock.readLock().lock();
System.out.println("[PUB_PATH] onCommand " + o.getClass().getName());
try {
if (!(o instanceof Command)) {
throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
Expand Down Expand Up @@ -332,6 +333,7 @@ public Response service(Command command) {
Response response = null;
boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();
System.out.println("[PUB_PATH] got commandId: " + commandId);
try {
if (status.get() != PENDING_STOP) {
response = command.visit(this);
Expand Down Expand Up @@ -576,6 +578,9 @@ public Response processRecoverTransactions(TransactionInfo info) throws Exceptio

@Override
public Response processMessage(Message messageSend) throws Exception {
// [PUB_PATH]
System.out.println("[PUB_PATH] in processMessage");
System.out.println(messageSend.toString());
ProducerId producerId = messageSend.getProducerId();
ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
if (producerExchange.canDispatch(messageSend)) {
Expand Down Expand Up @@ -628,6 +633,7 @@ public Response processRemoveDestination(DestinationInfo info) throws Exception

@Override
public Response processAddProducer(ProducerInfo info) throws Exception {
System.out.println("[PUB_PATH] in processAddProducer]");
SessionId sessionId = info.getProducerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
Expand Down Expand Up @@ -683,6 +689,7 @@ public Response processRemoveProducer(ProducerId id) throws Exception {

@Override
public Response processAddConsumer(ConsumerInfo info) throws Exception {
System.out.println("[SUB_PATH] in processAddConsumer]");
SessionId sessionId = info.getConsumerId().getParentId();
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void onAccept(final Transport transport) {
public void run() {
try {
if (!brokerService.isStopping()) {
Connection connection = createConnection(transport);
Connection connection = createConnection(transport); // [PUB_PATH]
connection.start();
} else {
throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void stop() throws Exception {
@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,
boolean createIfTemporary) throws Exception {

System.out.println("[SUB_PATH] AbstractRegion addDesitnation: " + destination.toString());
destinationsLock.writeLock().lock();
try {
Destination dest = destinations.get(destination);
Expand All @@ -154,6 +154,7 @@ public Destination addDestination(ConnectionContext context, ActiveMQDestination
validateMaxDestinations(destination);

LOG.debug("{} adding destination: {}", broker.getBrokerName(), destination);
System.out.println("[SUB_PATH] AbstractRegion addDestination: creating destination: " + destination.toString());
dest = createDestination(context, destination);
// intercept if there is a valid interceptor defined
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
Expand Down Expand Up @@ -238,6 +239,7 @@ protected void validateMaxDestinations(ActiveMQDestination destination)
protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
List<Subscription> rc = new ArrayList<Subscription>();
// Add all consumers that are interested in the destination.
System.out.println("[SUB_PATH] AbstractRegion addSubscriptionsForDestination: " + dest.toString());
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
Subscription sub = iter.next();
if (sub.matches(dest.getActiveMQDestination())) {
Expand Down Expand Up @@ -342,10 +344,11 @@ public Map<ActiveMQDestination, Destination> getDestinationMap() {
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug("{} adding consumer: {} for destination: {}",
broker.getBrokerName(), info.getConsumerId(), info.getDestination());
System.out.println("[SUB_PATH] in AbstractRegion addConsumer " + info.getSubscriptionName() + " " + info.getDestination());
ActiveMQDestination destination = info.getDestination();
if (destination != null && !destination.isPattern() && !destination.isComposite()) {
// lets auto-create the destination
lookup(context, destination,true);
lookup(context, destination, true);
}

Object addGuard;
Expand Down Expand Up @@ -379,7 +382,9 @@ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) th
//
DestinationFilter.parseFilter(info.getDestination());

System.out.println("[SUB_PATH] AbstractRegion addConsumer creating subscription");
Subscription sub = createSubscription(context, info);
System.out.println("[SUB_PATH] created subscription: " + sub.toString());

// At this point we're done directly manipulating subscriptions,
// but we need to retain the synchronized block here. Consider
Expand All @@ -404,8 +409,9 @@ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) th

List<Destination> removeList = new ArrayList<Destination>();
for (Destination dest : addList) {
System.out.println("[SUB_PATH] adding subscription to destination: " + dest.toString());
try {
dest.addSubscription(context, sub);
dest.addSubscription(context, sub); // This is logic that added the subscription
removeList.add(dest);
} catch (SecurityException e){
if (sub.isWildcard()) {
Expand Down Expand Up @@ -503,13 +509,14 @@ public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo

@Override
public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
System.out.println("[PUB_PATH] AbstractRegion send");
final ConnectionContext context = producerExchange.getConnectionContext();

if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
final Destination regionDestination = lookup(context, messageSend.getDestination(),false);
producerExchange.setRegionDestination(regionDestination);
}

System.out.println("[PUB_PATH] AbstractRegion send destination: " + producerExchange.getRegionDestination().toString());
producerExchange.getRegionDestination().send(producerExchange, messageSend);

if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){
Expand Down Expand Up @@ -551,7 +558,7 @@ protected Destination lookup(ConnectionContext context, ActiveMQDestination dest

protected Destination lookup(ConnectionContext context, ActiveMQDestination destination, boolean createTemporary, boolean autoCreate) throws Exception {
Destination dest = null;

System.out.println("[SUB_PATH] AbstractRegion in lookup for desitination: " + destination.toString());
destinationsLock.readLock().lock();
try {
dest = destinations.get(destination);
Expand All @@ -564,7 +571,9 @@ protected Destination lookup(ConnectionContext context, ActiveMQDestination dest
// Try to auto create the destination... re-invoke broker
// from the
// top so that the proper security checks are performed.
System.out.println("[SUB_PATH] AbstractRegion lookup: creating destination");
dest = context.getBroker().addDestination(context, destination, createTemporary);
System.out.println("[SUB_PATH] AbstractRegion lookup: done create destination");
}

if (dest == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ public boolean isDisposed() {
* some way - such as to send to a dead letter queue or something..
*/
protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
System.out.println("[PUB_PATH] onMessageWithNoConsumers");
if (!msg.isPersistent()) {
if (isSendAdvisoryIfNoConsumers()) {
// allow messages with no consumers to be dispatched to a dead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
private final AtomicBoolean active = new AtomicBoolean();
private final AtomicLong offlineTimestamp = new AtomicLong(-1);
private final HashSet<MessageId> ackedAndPrepared = new HashSet<MessageId>();
private boolean shared = false;

public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws JMSException {
Expand Down Expand Up @@ -115,6 +116,7 @@ protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDis

@Override
public void add(ConnectionContext context, Destination destination) throws Exception {
System.out.println("[SUB_PATH] DurableTopicSubscription add: " + destination.toString());
if (!destinations.contains(destination)) {
super.add(context, destination);
}
Expand Down Expand Up @@ -381,7 +383,7 @@ public void afterRollback() throws Exception {
public synchronized String toString() {
return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
+ durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount()
+ ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
+ ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension() + ", shared: " + isShared();
}

public SubscriptionKey getSubscriptionKey() {
Expand Down Expand Up @@ -438,4 +440,12 @@ public boolean isKeepDurableSubsActive() {
public boolean isEnableMessageExpirationOnActiveDurableSubs() {
return enableMessageExpirationOnActiveDurableSubs;
}

public boolean isShared() {
return shared;
}

public void setShared(boolean shared) {
this.shared = shared;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redis

// made public so it can be used in MQTTProtocolConverter
public void dispatchPending() throws IOException {
System.out.println("[PUB_PATH] PrefetchSubscription dispatch pending");
List<Destination> slowConsumerTargets = null;

synchronized(pendingLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFact
}

protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
System.out.println("[PUB_SUB] create topic region");
return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}

Expand Down Expand Up @@ -427,9 +428,12 @@ public void removeProducer(ConnectionContext context, ProducerInfo info) throws

@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
System.out.println("[SUB_PATH] Add a consumer in RegionBroker");
ActiveMQDestination destination = info.getDestination();
if (destinationInterceptor != null) {
System.out.println("[SUB_PATH] RegionBroker creating destinaiton incerceptor for destination: " + destination.toString());
destinationInterceptor.create(this, context, destination);
System.out.println("[SUB_PATH] RegionBroker done creating destinaiton incerceptor for destination: " + destination.toString());
}
inactiveDestinationsPurgeLock.readLock().lock();
try {
Expand Down Expand Up @@ -462,8 +466,9 @@ public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo
}

@Override
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { // [PUB_PATH]
ActiveMQDestination destination = message.getDestination();
System.out.println("[PUB_PATH] RegionBroker.send: " + destination.toString());
message.setBrokerInTime(System.currentTimeMillis());
if (producerExchange.isMutable() || producerExchange.getRegion() == null
|| (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.apache.activemq.broker.region;

import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SharedDurableTopicSubscriptionMetadata {
private static final Logger LOG = LoggerFactory.getLogger(SharedDurableTopicSubscriptionMetadata.class);

private String subscriptionName;
private ArrayList<SubscriptionKey> subKeys = new ArrayList<>();
private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> subMap = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
private int counter;

public SharedDurableTopicSubscriptionMetadata(String subscriptionName) {
this.subscriptionName = subscriptionName;
}

public void addDurableTopicSubscription(SubscriptionKey key, DurableTopicSubscription subscription) {
if (!subMap.containsKey(key)) {
subKeys.add(key);
subMap.put(key, subscription);
}
}

public void removeDurableTopicSubscription(SubscriptionKey key, DurableTopicSubscription subscription) {
if (subMap.containsKey(key)) {
subMap.remove(key);
subKeys.remove(key);
}
}

public DurableTopicSubscription getNextDurableTopicSubscription() {
int index = counter % subKeys.size();
counter++;
return subMap.get(subKeys.get(index));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.apache.activemq.broker.region;

import jakarta.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.usage.UsageListener;

public class SharedTopicSubscription extends AbstractSubscription {

public SharedTopicSubscription(Broker broker, ConnectionContext context,
ConsumerInfo info) throws InvalidSelectorException {
super(broker, context, info);
}

@Override
public void add(MessageReference node) throws Exception {

}

@Override
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
return null;
}

@Override
public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {

}

@Override
public int getPendingQueueSize() {
return 0;
}

@Override
public long getPendingMessageSize() {
return 0;
}

@Override
public int getDispatchedQueueSize() {
return 0;
}

@Override
public long getDispatchedCounter() {
return 0;
}

@Override
public long getEnqueueCounter() {
return 0;
}

@Override
public long getDequeueCounter() {
return 0;
}

@Override
public boolean isLowWaterMark() {
return false;
}

@Override
public boolean isHighWaterMark() {
return false;
}

@Override
public boolean isFull() {
return false;
}

@Override
public void updateConsumerPrefetch(int newPrefetch) {

}

@Override
public void destroy() {

}

@Override
public int getInFlightSize() {
return 0;
}
}
Loading