Skip to content

Commit 7f1115d

Browse files
committed
[AMQ-9692] Support destination gc sweep of destinations with only wildcard consumers
1 parent 2eb0d66 commit 7f1115d

File tree

13 files changed

+129
-43
lines changed

13 files changed

+129
-43
lines changed

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,5 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
6262
super.removeDestination(context, destination, timeout);
6363
regionBroker.unregister(destination);
6464
}
65+
6566
}

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,4 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
6262
super.removeDestination(context, destination, timeout);
6363
regionBroker.unregister(destination);
6464
}
65-
6665
}

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,4 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
6262
super.removeDestination(context, destination, timeout);
6363
regionBroker.unregister(destination);
6464
}
65-
6665
}

activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -260,16 +260,21 @@ protected List<Subscription> addSubscriptionsForDestination(ConnectionContext co
260260
}
261261

262262
@Override
263-
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
264-
throws Exception {
265-
263+
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
266264
// No timeout.. then try to shut down right way, fails if there are
267265
// current subscribers.
268266
if (timeout == 0) {
269267
for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
270268
Subscription sub = iter.next();
271269
if (sub.matches(destination) ) {
272-
throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
270+
if(sub.isWildcard()) {
271+
var dest = destinations.get(destination);
272+
if(dest != null && dest.isGcWithOnlyWildcardConsumers()) {
273+
continue;
274+
}
275+
} else {
276+
throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub);
277+
}
273278
}
274279
}
275280
}

activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public abstract class BaseDestination implements Destination {
105105
private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
106106
private boolean gcIfInactive;
107107
private boolean gcWithNetworkConsumers;
108-
private long lastActiveTime=0l;
108+
private boolean gcWithOnlyWildcardConsumers;
109+
private long lastActiveTime = 0L;
109110
private boolean reduceMemoryFootprint = false;
110111
protected final Scheduler scheduler;
111112
private boolean disposed = false;
@@ -311,12 +312,24 @@ public final MessageStore getMessageStore() {
311312

312313
@Override
313314
public boolean isActive() {
314-
boolean isActive = destinationStatistics.getConsumers().getCount() > 0 ||
315-
destinationStatistics.getProducers().getCount() > 0;
316-
if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) {
317-
isActive = hasRegularConsumers(getConsumers());
315+
if (destinationStatistics.getProducers().getCount() > 0) {
316+
return true;
318317
}
319-
return isActive;
318+
319+
var destinationActive = true;
320+
if (destinationStatistics.getConsumers().getCount() > 0) {
321+
if (isGcWithNetworkConsumers()) {
322+
destinationActive = hasRegularConsumers(getConsumers());
323+
}
324+
325+
if (destinationActive &&
326+
isGcWithOnlyWildcardConsumers()) {
327+
destinationActive = !getConsumers().stream().allMatch(Subscription::isWildcard);
328+
}
329+
} else {
330+
destinationActive = false;
331+
}
332+
return destinationActive;
320333
}
321334

322335
@Override
@@ -824,19 +837,37 @@ public boolean isGcWithNetworkConsumers() {
824837
return gcWithNetworkConsumers;
825838
}
826839

840+
/**
841+
* Indicate if it is ok to gc destinations that have only wildcard consumers
842+
* @param gcWithOnlyWildcardConsumers
843+
*/
844+
public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) {
845+
this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers;
846+
}
847+
848+
public boolean isGcWithOnlyWildcardConsumers() {
849+
return gcWithOnlyWildcardConsumers;
850+
}
851+
827852
@Override
828853
public void markForGC(long timeStamp) {
829-
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
830-
&& destinationStatistics.getMessages().getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
854+
if (isGcIfInactive()
855+
&& this.lastActiveTime == 0
856+
&& destinationStatistics.getMessages().getCount() == 0
857+
&& getInactiveTimeoutBeforeGC() > 0L
858+
&& !isActive()) {
831859
this.lastActiveTime = timeStamp;
832860
}
833861
}
834862

835863
@Override
836864
public boolean canGC() {
837-
boolean result = false;
838-
final long currentLastActiveTime = this.lastActiveTime;
839-
if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.getMessages().getCount() == 0L ) {
865+
var result = false;
866+
final var currentLastActiveTime = this.lastActiveTime;
867+
if (isGcIfInactive()
868+
&& currentLastActiveTime != 0L
869+
&& destinationStatistics.getMessages().getCount() == 0L
870+
&& !isActive()) {
840871
if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) {
841872
result = true;
842873
}

activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,4 +267,5 @@ public interface Destination extends Service, Task, Message.MessageDestination {
267267

268268
void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled);
269269

270+
boolean isGcWithOnlyWildcardConsumers();
270271
}

activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,11 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic
429429
next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
430430
}
431431

432+
@Override
433+
public boolean isGcWithOnlyWildcardConsumers() {
434+
return next.isGcWithOnlyWildcardConsumers();
435+
}
436+
432437
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
433438
if (next instanceof DestinationFilter) {
434439
DestinationFilter filter = (DestinationFilter) next;

activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
*
3434
*
3535
*/
36-
public class TempQueue extends Queue{
36+
public class TempQueue extends Queue {
3737
private static final Logger LOG = LoggerFactory.getLogger(TempQueue.class);
3838
private final ActiveMQTempDestination tempDest;
3939

activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
5858

5959
super.removeDestination(context, destination, timeout);
6060
}
61-
61+
6262
/*
6363
* For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till
6464
* the notification to ensure that the subscription chosen by the master is used.

activemq-broker/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ public String toString() {
6868
}
6969

7070
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
71-
7271
// Force a timeout value so that we don't get an error that
7372
// there is still an active sub. Temp destination may be removed
7473
// while a network sub is still active which is valid.

0 commit comments

Comments
 (0)