diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl index cc7527cc991..a1867860cd0 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl @@ -362,16 +362,16 @@ status(_) -> running. pending_count(#{dest := Dest}) -> - Pending = maps:get(pending, Dest, queue:new()), - queue:len(Pending). + Pending = maps:get(pending, Dest, lqueue:new()), + lqueue:len(Pending). add_pending(Elem, State = #{dest := Dest}) -> - Pending = maps:get(pending, Dest, queue:new()), - State#{dest => Dest#{pending => queue:in(Elem, Pending)}}. + Pending = maps:get(pending, Dest, lqueue:new()), + State#{dest => Dest#{pending => lqueue:in(Elem, Pending)}}. pop_pending(State = #{dest := Dest}) -> - Pending = maps:get(pending, Dest, queue:new()), - case queue:out(Pending) of + Pending = maps:get(pending, Dest, lqueue:new()), + case lqueue:out(Pending) of {empty, _} -> empty; {{value, Elem}, Pending2} -> diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index b957ea1712e..72ff5bf6a8a 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -464,7 +464,7 @@ status(State) -> end. pending_count(#{dest := #{pending_delivery := Pending}}) -> - queue:len(Pending); + lqueue:len(Pending); pending_count(_) -> 0. @@ -935,12 +935,12 @@ is_blocked(_) -> false. add_pending_delivery(Elem, State = #{dest := Dest}) -> - Pending = maps:get(pending_delivery, Dest, queue:new()), - State#{dest => Dest#{pending_delivery => queue:in(Elem, Pending)}}. + Pending = maps:get(pending_delivery, Dest, lqueue:new()), + State#{dest => Dest#{pending_delivery => lqueue:in(Elem, Pending)}}. pop_pending_delivery(State = #{dest := Dest}) -> - Pending = maps:get(pending_delivery, Dest, queue:new()), - case queue:out(Pending) of + Pending = maps:get(pending_delivery, Dest, lqueue:new()), + case lqueue:out(Pending) of {empty, _} -> empty; {{value, Elem}, Pending2} -> diff --git a/deps/rabbitmq_shovel/test/amqp091_local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp091_local_dynamic_SUITE.erl index 244d3d13829..7b8f171f7ae 100644 --- a/deps/rabbitmq_shovel/test/amqp091_local_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp091_local_dynamic_SUITE.erl @@ -108,7 +108,8 @@ init_per_suite(Config0) -> "source_queue_down", "dest_queue_down", "inbound_link_detached", - "not_found" + "not_found", + "dependent process" ]} ]), rabbit_ct_helpers:run_setup_steps( @@ -318,7 +319,7 @@ missing_src_queue_with_src_predeclared(Config) -> Ch, #'queue.bind'{queue = Dest, exchange = <<"dest-ex">>, routing_key = <<"dest-key">>}), - + set_param_nowait(Config, ?PARAM, ?config(shovel_args, Config) ++ [{<<"src-queue">>, Src}, @@ -327,7 +328,7 @@ missing_src_queue_with_src_predeclared(Config) -> {<<"dest-exchange-key">>, <<"dest-key">>}]), await_shovel(Config, 0, ?PARAM, terminated), expect_missing_queue(Ch, Src), - + with_amqp091_ch( Config, fun(Ch2) -> @@ -368,7 +369,7 @@ missing_dest_queue_with_dest_predeclared(Config) -> fun(Ch2) -> amqp_channel:call( Ch2, #'queue.declare'{queue = Dest, - durable = true}), + durable = true}), await_shovel(Config, 0, ?PARAM, running), amqp091_publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, Dest, <<"hello!">>) end) @@ -392,7 +393,7 @@ missing_src_queue_without_src_predeclared(Config) -> Ch, #'queue.bind'{queue = Dest, exchange = <<"dest-ex">>, routing_key = <<"dest-key">>}), - + set_param_nowait(Config, ?PARAM, ?config(shovel_args, Config) ++ [{<<"src-queue">>, Src}, @@ -400,7 +401,7 @@ missing_src_queue_without_src_predeclared(Config) -> {<<"dest-exchange-key">>, <<"dest-key">>}]), await_shovel(Config, 0, ?PARAM, terminated), expect_missing_queue(Ch, Src), - + with_amqp091_ch( Config, fun(Ch2) -> diff --git a/deps/rabbitmq_shovel/test/pending_count_SUITE.erl b/deps/rabbitmq_shovel/test/pending_count_SUITE.erl index d84222d2545..6349cce2152 100644 --- a/deps/rabbitmq_shovel/test/pending_count_SUITE.erl +++ b/deps/rabbitmq_shovel/test/pending_count_SUITE.erl @@ -69,13 +69,13 @@ amqp091_pending_count_empty_queue(_Config) -> amqp091_pending_count_with_messages(_Config) -> %% Test that pending_count returns correct count when messages are pending - PendingQueue = queue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]), + PendingQueue = lqueue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]), State = #{dest => #{pending => PendingQueue}}, ?assertEqual(3, rabbit_amqp091_shovel:pending_count(State)). amqp091_pending_count_after_drain(_Config) -> %% Test that pending_count returns 0 after messages are drained - EmptyQueue = queue:new(), + EmptyQueue = lqueue:new(), State = #{dest => #{pending => EmptyQueue}}, ?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).