Skip to content

Commit 4bc3529

Browse files
committed
ARTEMIS-5376 Include all messages in queue management operations
1 parent db45bac commit 4bc3529

File tree

4 files changed

+240
-92
lines changed

4 files changed

+240
-92
lines changed

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

Lines changed: 77 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
4444
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
4545
import java.util.concurrent.locks.ReentrantLock;
46+
import java.util.function.Predicate;
4647

4748
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
4849
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -2017,6 +2018,11 @@ private int iterQueue(final int flushLimit,
20172018
QueueIterateAction messageAction) throws Exception {
20182019
int count = 0;
20192020
int txCount = 0;
2021+
2022+
if (filter1 != null) {
2023+
messageAction.addFilter(filter1);
2024+
}
2025+
20202026
// This is to avoid scheduling depaging while iterQueue is happening
20212027
// this should minimize the use of the paged executor.
20222028
depagePending = true;
@@ -2035,7 +2041,7 @@ private int iterQueue(final int flushLimit,
20352041
while (iter.hasNext() && !messageAction.expectedHitsReached(count)) {
20362042
MessageReference ref = iter.next();
20372043

2038-
if (filter1 == null || filter1.match(ref.getMessage())) {
2044+
if (messageAction.match(ref)) {
20392045
if (messageAction.actMessage(tx, ref)) {
20402046
iter.remove();
20412047
refRemoved(ref);
@@ -2057,7 +2063,7 @@ private int iterQueue(final int flushLimit,
20572063
return count;
20582064
}
20592065

2060-
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(ref -> filter1 == null ? true : filter1.match(ref.getMessage()));
2066+
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(ref -> messageAction.match(ref));
20612067
for (MessageReference messageReference : cancelled) {
20622068
messageAction.actMessage(tx, messageReference);
20632069
count++;
@@ -2080,12 +2086,12 @@ private int iterQueue(final int flushLimit,
20802086
PagedReference reference = pageIterator.next();
20812087
pageIterator.remove();
20822088

2083-
if (filter1 == null || filter1.match(reference.getMessage())) {
2084-
count++;
2085-
txCount++;
2089+
if (messageAction.match(reference)) {
20862090
if (!messageAction.actMessage(tx, reference)) {
20872091
addTail(reference, false);
20882092
}
2093+
txCount++;
2094+
count++;
20892095
} else {
20902096
addTail(reference, false);
20912097
}
@@ -2404,43 +2410,27 @@ public void run() {
24042410

24052411
@Override
24062412
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
2407-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2408-
while (iter.hasNext()) {
2409-
MessageReference ref = iter.next();
2410-
if (ref.getMessage().getMessageID() == messageID) {
2411-
incDelivering(ref);
2412-
sendToDeadLetterAddress(null, ref);
2413-
iter.remove();
2414-
refRemoved(ref);
2415-
return true;
2416-
}
2417-
}
2418-
if (pageIterator != null && !queueDestroyed) {
2419-
while (pageIterator.hasNext()) {
2420-
PagedReference ref = pageIterator.next();
2421-
if (ref.getMessage().getMessageID() == messageID) {
2422-
incDelivering(ref);
2423-
sendToDeadLetterAddress(null, ref);
2424-
pageIterator.remove();
2425-
refRemoved(ref);
2426-
return true;
2427-
}
2428-
}
2413+
2414+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2415+
2416+
@Override
2417+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2418+
incDelivering(ref);
2419+
sendToDeadLetterAddress(tx, ref);
2420+
return true;
24292421
}
2430-
return false;
2431-
}
2422+
}) == 1;
24322423
}
24332424

24342425
@Override
24352426
public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
2436-
24372427
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
24382428

24392429
@Override
24402430
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2441-
24422431
incDelivering(ref);
2443-
return sendToDeadLetterAddress(tx, ref);
2432+
sendToDeadLetterAddress(tx, ref);
2433+
return true;
24442434
}
24452435
});
24462436
}
@@ -2450,24 +2440,17 @@ public synchronized boolean moveReference(final long messageID,
24502440
final SimpleString toAddress,
24512441
final Binding binding,
24522442
final boolean rejectDuplicate) throws Exception {
2453-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2454-
while (iter.hasNext()) {
2455-
MessageReference ref = iter.next();
2456-
if (ref.getMessage().getMessageID() == messageID) {
2457-
iter.remove();
2458-
refRemoved(ref);
2459-
incDelivering(ref);
2460-
try {
2461-
move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true);
2462-
} catch (Exception e) {
2463-
decDelivering(ref);
2464-
throw e;
2465-
}
2466-
return true;
2467-
}
2443+
2444+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2445+
2446+
@Override
2447+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2448+
incDelivering(ref);
2449+
move(tx, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true);
2450+
return true;
24682451
}
2469-
return false;
2470-
}
2452+
2453+
}) == 1;
24712454
}
24722455

24732456
@Override
@@ -2513,7 +2496,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25132496
}
25142497

25152498
if (!ignored) {
2516-
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true);
2499+
move(tx, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true);
25172500
}
25182501

25192502
return true;
@@ -2531,23 +2514,19 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25312514
}
25322515

25332516
@Override
2534-
public synchronized boolean copyReference(final long messageID,
2517+
public boolean copyReference(final long messageID,
25352518
final SimpleString toQueue,
25362519
final Binding binding) throws Exception {
2537-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2538-
while (iter.hasNext()) {
2539-
MessageReference ref = iter.next();
2540-
if (ref.getMessage().getMessageID() == messageID) {
2541-
try {
2542-
copy(null, toQueue, binding, ref);
2543-
} catch (Exception e) {
2544-
throw e;
2545-
}
2546-
return true;
2547-
}
2520+
2521+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
2522+
2523+
@Override
2524+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2525+
copy(tx, toQueue, binding, ref);
2526+
addTail(ref, false);
2527+
return true;
25482528
}
2549-
return false;
2550-
}
2529+
}) == 1;
25512530
}
25522531

25532532
public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
@@ -2620,39 +2599,28 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
26202599

26212600
@Override
26222601
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
2623-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2602+
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {
26242603

2625-
while (iter.hasNext()) {
2626-
MessageReference ref = iter.next();
2627-
if (ref.getMessage().getMessageID() == messageID) {
2628-
iter.remove();
2629-
refRemoved(ref);
2630-
ref.getMessage().setPriority(newPriority);
2631-
addTail(ref, false);
2632-
return true;
2633-
}
2604+
@Override
2605+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2606+
ref.getMessage().setPriority(newPriority);
2607+
addTail(ref, false);
2608+
return true;
26342609
}
2635-
2636-
return false;
2637-
}
2610+
}) == 1;
26382611
}
26392612

26402613
@Override
26412614
public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
2642-
try (LinkedListIterator<MessageReference> iter = iterator()) {
2643-
int count = 0;
2644-
while (iter.hasNext()) {
2645-
MessageReference ref = iter.next();
2646-
if (filter == null || filter.match(ref.getMessage())) {
2647-
count++;
2648-
iter.remove();
2649-
refRemoved(ref);
2650-
ref.getMessage().setPriority(newPriority);
2651-
addTail(ref, false);
2652-
}
2615+
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
2616+
2617+
@Override
2618+
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
2619+
ref.getMessage().setPriority(newPriority);
2620+
addTail(ref, false);
2621+
return true;
26532622
}
2654-
return count;
2655-
}
2623+
});
26562624
}
26572625

26582626
@Override
@@ -4188,13 +4156,23 @@ public void run() {
41884156
abstract class QueueIterateAction {
41894157

41904158
protected Integer expectedHits;
4159+
protected Long messageID;
4160+
protected Filter filter1 = null;
4161+
protected Predicate<MessageReference> match;
41914162

41924163
QueueIterateAction(Integer expectedHits) {
41934164
this.expectedHits = expectedHits;
4165+
this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage());
4166+
}
4167+
4168+
QueueIterateAction(Long messageID) {
4169+
this.expectedHits = 1;
4170+
this.match = ref -> ref.getMessage().getMessageID() == messageID;
41944171
}
41954172

41964173
QueueIterateAction() {
41974174
this.expectedHits = null;
4175+
this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage());
41984176
}
41994177

42004178
/**
@@ -4209,6 +4187,15 @@ abstract class QueueIterateAction {
42094187
public boolean expectedHitsReached(int currentHits) {
42104188
return expectedHits != null && currentHits >= expectedHits.intValue();
42114189
}
4190+
4191+
public void addFilter(Filter filter1) {
4192+
this.filter1 = filter1;
4193+
}
4194+
4195+
public boolean match(MessageReference ref) {
4196+
return match.test(ref);
4197+
}
4198+
42124199
}
42134200

42144201
// For external use we need to use a synchronized version since the list is not thread safe

0 commit comments

Comments
 (0)