Skip to content

Commit e23f099

Browse files
committed
[AMQ-9692] Support destination gc sweep of destinations with only wildcard consumers
1 parent 2eb0d66 commit e23f099

28 files changed

+258
-47
lines changed

activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,12 @@ public void addDestinationInfo(ConnectionContext context, DestinationInfo info)
278278

279279
@Override
280280
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
281-
super.removeDestination(context, destination, timeout);
281+
removeDestination(context, destination, timeout, false);
282+
}
283+
284+
@Override
285+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout, boolean withOnlyWildcardConsumers) throws Exception {
286+
super.removeDestination(context, destination, timeout, withOnlyWildcardConsumers);
282287
DestinationInfo info = destinations.remove(destination);
283288
if (info != null) {
284289

@@ -311,7 +316,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
311316
ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination);
312317
for (ActiveMQTopic advisoryDestination : advisoryDestinations) {
313318
try {
314-
next.removeDestination(context, advisoryDestination, -1);
319+
next.removeDestination(context, advisoryDestination, -1, withOnlyWildcardConsumers);
315320
} catch (Exception expectedIfDestinationDidNotExistYet) {
316321
}
317322
}

activemq-broker/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,15 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
191191
}
192192
}
193193

194+
@Override
195+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout, boolean withOnlyWildcardConsumers) throws Exception {
196+
next.removeDestination(context, destination, timeout, withOnlyWildcardConsumers);
197+
Broker brokers[] = getListeners();
198+
for (int i = 0; i < brokers.length; i++) {
199+
brokers[i].removeDestination(context, destination, timeout, withOnlyWildcardConsumers);
200+
}
201+
}
202+
194203
@Override
195204
public void start() throws Exception {
196205
next.start();

activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
179179
getNext().removeDestination(context, destination, timeout);
180180
}
181181

182+
@Override
183+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout, boolean withOnlyWildcardConsumers) throws Exception {
184+
getNext().removeDestination(context, destination, timeout, withOnlyWildcardConsumers);
185+
}
186+
182187
@Override
183188
public ActiveMQDestination[] getDestinations() throws Exception {
184189
return getNext().getDestinations();

activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ public Destination addDestination(ConnectionContext context, ActiveMQDestination
153153
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
154154
}
155155

156+
@Override
157+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout, boolean withOnlyWildcardConsumers) throws Exception {
158+
}
159+
156160
@Override
157161
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
158162
return null;

activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,11 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
171171
throw new BrokerStoppedException(this.message);
172172
}
173173

174+
@Override
175+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout, boolean withOnlyWildcardConsumers) throws Exception {
176+
throw new BrokerStoppedException(this.message);
177+
}
178+
174179
@Override
175180
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
176181
throw new BrokerStoppedException(this.message);

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,9 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
6262
super.removeDestination(context, destination, timeout);
6363
regionBroker.unregister(destination);
6464
}
65+
66+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout, boolean withOnlyWildcardConsumers) throws Exception {
67+
super.removeDestination(context, destination, timeout, withOnlyWildcardConsumers);
68+
regionBroker.unregister(destination);
69+
}
6570
}

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,9 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
6363
regionBroker.unregister(destination);
6464
}
6565

66+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout, boolean withOnlyWildcardConsumers) throws Exception {
67+
super.removeDestination(context, destination, timeout, withOnlyWildcardConsumers);
68+
regionBroker.unregister(destination);
69+
}
70+
6671
}

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,8 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
6363
regionBroker.unregister(destination);
6464
}
6565

66+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout, boolean withOnlyWildcardConsumers) throws Exception {
67+
super.removeDestination(context, destination, timeout, withOnlyWildcardConsumers);
68+
regionBroker.unregister(destination);
69+
}
6670
}

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,8 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
6363
regionBroker.unregister(destination);
6464
}
6565

66+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout, boolean withOnlyWildcardConsumers) throws Exception {
67+
super.removeDestination(context, destination, timeout, withOnlyWildcardConsumers);
68+
regionBroker.unregister(destination);
69+
}
6670
}

activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,16 +260,24 @@ protected List<Subscription> addSubscriptionsForDestination(ConnectionContext co
260260
}
261261

262262
@Override
263-
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
264-
throws Exception {
263+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
264+
removeDestination(context, destination, timeout, false);
265+
}
266+
267+
@Override
268+
public void removeDestination(final ConnectionContext context, final ActiveMQDestination destination, final long timeout, final boolean withOnlyWildcardConsumers) throws Exception {
265269

266270
// No timeout.. then try to shut down right way, fails if there are
267271
// current subscribers.
268272
if (timeout == 0) {
269273
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
270274
Subscription sub = iter.next();
271275
if (sub.matches(destination) ) {
272-
throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
276+
if(withOnlyWildcardConsumers && sub.isWildcard()) {
277+
continue;
278+
} else {
279+
throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
280+
}
273281
}
274282
}
275283
}

0 commit comments

Comments
 (0)