4343import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
4444import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
4545import java .util .concurrent .locks .ReentrantLock ;
46+ import java .util .function .Predicate ;
4647
4748import org .apache .activemq .artemis .api .config .ActiveMQDefaultConfiguration ;
4849import org .apache .activemq .artemis .api .core .ActiveMQException ;
@@ -2006,6 +2007,11 @@ private int iterQueue(final int flushLimit,
20062007 QueueIterateAction messageAction ) throws Exception {
20072008 int count = 0 ;
20082009 int txCount = 0 ;
2010+
2011+ if (filter1 != null ) {
2012+ messageAction .addFilter (filter1 );
2013+ }
2014+
20092015 // This is to avoid scheduling depaging while iterQueue is happening
20102016 // this should minimize the use of the paged executor.
20112017 depagePending = true ;
@@ -2024,7 +2030,7 @@ private int iterQueue(final int flushLimit,
20242030 while (iter .hasNext () && !messageAction .expectedHitsReached (count )) {
20252031 MessageReference ref = iter .next ();
20262032
2027- if (filter1 == null || filter1 .match (ref . getMessage () )) {
2033+ if (messageAction .match (ref )) {
20282034 if (messageAction .actMessage (tx , ref )) {
20292035 iter .remove ();
20302036 refRemoved (ref );
@@ -2046,7 +2052,7 @@ private int iterQueue(final int flushLimit,
20462052 return count ;
20472053 }
20482054
2049- List <MessageReference > cancelled = scheduledDeliveryHandler .cancel (ref -> filter1 == null ? true : filter1 .match (ref . getMessage () ));
2055+ List <MessageReference > cancelled = scheduledDeliveryHandler .cancel (ref -> messageAction .match (ref ));
20502056 for (MessageReference messageReference : cancelled ) {
20512057 messageAction .actMessage (tx , messageReference );
20522058 count ++;
@@ -2069,12 +2075,12 @@ private int iterQueue(final int flushLimit,
20692075 PagedReference reference = pageIterator .next ();
20702076 pageIterator .remove ();
20712077
2072- if (filter1 == null || filter1 .match (reference .getMessage ())) {
2073- count ++;
2074- txCount ++;
2078+ if (messageAction .match (reference )) {
20752079 if (!messageAction .actMessage (tx , reference )) {
20762080 addTail (reference , false );
20772081 }
2082+ txCount ++;
2083+ count ++;
20782084 } else {
20792085 addTail (reference , false );
20802086 }
@@ -2393,43 +2399,27 @@ public void run() {
23932399
23942400 @ Override
23952401 public synchronized boolean sendMessageToDeadLetterAddress (final long messageID ) throws Exception {
2396- try (LinkedListIterator <MessageReference > iter = iterator ()) {
2397- while (iter .hasNext ()) {
2398- MessageReference ref = iter .next ();
2399- if (ref .getMessage ().getMessageID () == messageID ) {
2400- incDelivering (ref );
2401- sendToDeadLetterAddress (null , ref );
2402- iter .remove ();
2403- refRemoved (ref );
2404- return true ;
2405- }
2406- }
2407- if (pageIterator != null && !queueDestroyed ) {
2408- while (pageIterator .hasNext ()) {
2409- PagedReference ref = pageIterator .next ();
2410- if (ref .getMessage ().getMessageID () == messageID ) {
2411- incDelivering (ref );
2412- sendToDeadLetterAddress (null , ref );
2413- pageIterator .remove ();
2414- refRemoved (ref );
2415- return true ;
2416- }
2417- }
2402+
2403+ return iterQueue (DEFAULT_FLUSH_LIMIT , null , new QueueIterateAction (messageID ) {
2404+
2405+ @ Override
2406+ public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
2407+ incDelivering (ref );
2408+ sendToDeadLetterAddress (tx , ref );
2409+ return true ;
24182410 }
2419- return false ;
2420- }
2411+ }) == 1 ;
24212412 }
24222413
24232414 @ Override
24242415 public synchronized int sendMessagesToDeadLetterAddress (Filter filter ) throws Exception {
2425-
24262416 return iterQueue (DEFAULT_FLUSH_LIMIT , filter , new QueueIterateAction () {
24272417
24282418 @ Override
24292419 public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
2430-
24312420 incDelivering (ref );
2432- return sendToDeadLetterAddress (tx , ref );
2421+ sendToDeadLetterAddress (tx , ref );
2422+ return true ;
24332423 }
24342424 });
24352425 }
@@ -2439,24 +2429,17 @@ public synchronized boolean moveReference(final long messageID,
24392429 final SimpleString toAddress ,
24402430 final Binding binding ,
24412431 final boolean rejectDuplicate ) throws Exception {
2442- try (LinkedListIterator <MessageReference > iter = iterator ()) {
2443- while (iter .hasNext ()) {
2444- MessageReference ref = iter .next ();
2445- if (ref .getMessage ().getMessageID () == messageID ) {
2446- iter .remove ();
2447- refRemoved (ref );
2448- incDelivering (ref );
2449- try {
2450- move (null , toAddress , binding , ref , rejectDuplicate , AckReason .NORMAL , null , null , true );
2451- } catch (Exception e ) {
2452- decDelivering (ref );
2453- throw e ;
2454- }
2455- return true ;
2456- }
2432+
2433+ return iterQueue (DEFAULT_FLUSH_LIMIT , null , new QueueIterateAction (messageID ) {
2434+
2435+ @ Override
2436+ public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
2437+ incDelivering (ref );
2438+ move (tx , toAddress , binding , ref , rejectDuplicate , AckReason .NORMAL , null , null , true );
2439+ return true ;
24572440 }
2458- return false ;
2459- }
2441+
2442+ }) == 1 ;
24602443 }
24612444
24622445 @ Override
@@ -2502,7 +2485,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25022485 }
25032486
25042487 if (!ignored ) {
2505- move (null , toAddress , binding , ref , rejectDuplicates , AckReason .NORMAL , null , null , true );
2488+ move (tx , toAddress , binding , ref , rejectDuplicates , AckReason .NORMAL , null , null , true );
25062489 }
25072490
25082491 return true ;
@@ -2523,20 +2506,16 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
25232506 public synchronized boolean copyReference (final long messageID ,
25242507 final SimpleString toQueue ,
25252508 final Binding binding ) throws Exception {
2526- try (LinkedListIterator <MessageReference > iter = iterator ()) {
2527- while (iter .hasNext ()) {
2528- MessageReference ref = iter .next ();
2529- if (ref .getMessage ().getMessageID () == messageID ) {
2530- try {
2531- copy (null , toQueue , binding , ref );
2532- } catch (Exception e ) {
2533- throw e ;
2534- }
2535- return true ;
2536- }
2509+
2510+ return iterQueue (DEFAULT_FLUSH_LIMIT , null , new QueueIterateAction (messageID ) {
2511+
2512+ @ Override
2513+ public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
2514+ copy (null , toQueue , binding , ref );
2515+ addTail (ref , false );
2516+ return true ;
25372517 }
2538- return false ;
2539- }
2518+ }) == 1 ;
25402519 }
25412520
25422521 public synchronized int rerouteMessages (final SimpleString queueName , final Filter filter ) throws Exception {
@@ -2609,39 +2588,28 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
26092588
26102589 @ Override
26112590 public synchronized boolean changeReferencePriority (final long messageID , final byte newPriority ) throws Exception {
2612- try ( LinkedListIterator < MessageReference > iter = iterator () ) {
2591+ return iterQueue ( DEFAULT_FLUSH_LIMIT , null , new QueueIterateAction ( messageID ) {
26132592
2614- while (iter .hasNext ()) {
2615- MessageReference ref = iter .next ();
2616- if (ref .getMessage ().getMessageID () == messageID ) {
2617- iter .remove ();
2618- refRemoved (ref );
2619- ref .getMessage ().setPriority (newPriority );
2620- addTail (ref , false );
2621- return true ;
2622- }
2593+ @ Override
2594+ public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
2595+ ref .getMessage ().setPriority (newPriority );
2596+ addTail (ref , false );
2597+ return true ;
26232598 }
2624-
2625- return false ;
2626- }
2599+ }) == 1 ;
26272600 }
26282601
26292602 @ Override
26302603 public synchronized int changeReferencesPriority (final Filter filter , final byte newPriority ) throws Exception {
2631- try (LinkedListIterator <MessageReference > iter = iterator ()) {
2632- int count = 0 ;
2633- while (iter .hasNext ()) {
2634- MessageReference ref = iter .next ();
2635- if (filter == null || filter .match (ref .getMessage ())) {
2636- count ++;
2637- iter .remove ();
2638- refRemoved (ref );
2639- ref .getMessage ().setPriority (newPriority );
2640- addTail (ref , false );
2641- }
2604+ return iterQueue (DEFAULT_FLUSH_LIMIT , filter , new QueueIterateAction () {
2605+
2606+ @ Override
2607+ public boolean actMessage (Transaction tx , MessageReference ref ) throws Exception {
2608+ ref .getMessage ().setPriority (newPriority );
2609+ addTail (ref , false );
2610+ return true ;
26422611 }
2643- return count ;
2644- }
2612+ });
26452613 }
26462614
26472615 @ Override
@@ -4177,13 +4145,23 @@ public void run() {
41774145 abstract class QueueIterateAction {
41784146
41794147 protected Integer expectedHits ;
4148+ protected Long messageID ;
4149+ protected Filter filter1 = null ;
4150+ protected Predicate <MessageReference > match ;
41804151
41814152 QueueIterateAction (Integer expectedHits ) {
41824153 this .expectedHits = expectedHits ;
4154+ this .match = ref -> filter1 == null ? true : filter1 .match (ref .getMessage ());
4155+ }
4156+
4157+ QueueIterateAction (Long messageID ) {
4158+ this .expectedHits = 1 ;
4159+ this .match = ref -> ref .getMessage ().getMessageID () == messageID ;
41834160 }
41844161
41854162 QueueIterateAction () {
41864163 this .expectedHits = null ;
4164+ this .match = ref -> filter1 == null ? true : filter1 .match (ref .getMessage ());
41874165 }
41884166
41894167 /**
@@ -4198,6 +4176,15 @@ abstract class QueueIterateAction {
41984176 public boolean expectedHitsReached (int currentHits ) {
41994177 return expectedHits != null && currentHits >= expectedHits .intValue ();
42004178 }
4179+
4180+ public void addFilter (Filter filter1 ) {
4181+ this .filter1 = filter1 ;
4182+ }
4183+
4184+ public boolean match (MessageReference ref ) {
4185+ return match .test (ref );
4186+ }
4187+
42014188 }
42024189
42034190 // For external use we need to use a synchronized version since the list is not thread safe
0 commit comments