diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index 3a0d72863245..748b2617a4ef 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -1,5 +1,6 @@ -module(rabbit_classic_queue). -behaviour(rabbit_queue_type). +-behaviour(rabbit_queue_commands). -behaviour(rabbit_policy_validator). -include("amqqueue.hrl"). @@ -74,6 +75,15 @@ -export([validate_policy/1]). +%% commands +-export([add_member/5, + list_with_local_promotable/0, + delete_member/3, + peek/2, + status/1, + reclaim_memory/1, + shrink_all/1]). + -rabbit_boot_step( {rabbit_classic_queue_type, [{description, "Classic queue: queue type"}, @@ -730,3 +740,24 @@ queue_vm_stats_sups() -> %% Other as usual by substraction. queue_vm_ets() -> {[], []}. + +add_member(_VHost, _Name, _Node, _Membership, _Timeout) -> + {error, classic_queue_not_supported}. + +list_with_local_promotable() -> + {error, classic_queue_not_supported}. + +delete_member(_VHost, _Name, _Node) -> + {error, classic_queue_not_supported}. + +peek(_Pos, _QName) -> + {error, classic_queue_not_supported}. + +status(_QName) -> + {error, classic_queue_not_supported}. + +reclaim_memory(_QName) -> + {error, classic_queue_not_supported}. + +shrink_all(_Node) -> + {error, classic_queue_not_supported}. diff --git a/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl b/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl index 1c9b72a0cea1..ac543ce8b8f5 100644 --- a/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl +++ b/deps/rabbit/src/rabbit_observer_cli_quorum_queues.erl @@ -138,7 +138,7 @@ sheet_body(PrevState) -> empty_row(Name); _ -> QQCounters = maps:get({QName, node()}, ra_counters:overview()), - {ok, InternalName} = rabbit_queue_type_util:qname_to_internal_name(#resource{virtual_host = Vhost, name= Name}), + {ok, InternalName} = rabbit_queue_type_util:qname_to_internal_name(rabbit_misc:queue_resource(Vhost, Name)), [{_, CT, SnapIdx, LA, CI, LW, CL}] = ets:lookup(ra_metrics, R), [ Pid, diff --git a/deps/rabbit/src/rabbit_queue_commands.erl b/deps/rabbit/src/rabbit_queue_commands.erl new file mode 100644 index 000000000000..086a98cbc7da --- /dev/null +++ b/deps/rabbit/src/rabbit_queue_commands.erl @@ -0,0 +1,146 @@ +-module(rabbit_queue_commands). + +-export([add_member/5, + list_with_local_promotable/1, + delete_member/3, + grow/5, + peek/2, + status/1, + reclaim_memory/2, + shrink_all/2]). + +-include_lib("rabbit_common/include/resource.hrl"). + +%% TODO: membership is a subset of ra_membership() with 'unknown' removed. +-type membership() :: voter | non_voter | promotable. + +-callback add_member(VHost :: rabbit_types:vhost(), Name :: resource_name(), Node :: node(), Membership :: membership(), Timeout :: timeout()) -> ok | {error, term()}. + +-callback list_with_local_promotable() -> [amqqueue:amqqueue()] | {error, term()}. + +-callback delete_member(VHost :: rabbit_types:vhost(), Name :: resource_name(), Node :: node()) -> ok | {error, term()}. + +-callback peek(Pos :: non_neg_integer(), QName :: rabbit_amqqueue:name()) -> {ok, [{binary(), term()}]} | {error, term()}. + +-callback status(QName :: rabbit_amqqueue:name()) -> [[{binary(), term()}]] | {error, term()}. + +-callback reclaim_memory(QName :: rabbit_amqqueue:name()) -> ok | {error, term()}. + +-callback shrink_all(Node :: node()) -> [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}] | {error, term()}. + +-spec add_member(rabbit_types:vhost(), resource_name(), node(), membership(), timeout()) -> ok | {error, term()}. +add_member(VHost, Name, Node, Membership, Timeout) -> + case rabbit_amqqueue:lookup(rabbit_misc:queue_resource(VHost, Name)) of + {ok, Q} -> + Type = amqqueue:get_type(Q), + Type:add_member(VHost, Name, Node, Membership, Timeout); + {error, not_found} = E -> + E + end. + +-spec list_with_local_promotable(atom() | binary()) -> [amqqueue:amqqueue()] | {error, term()}. +list_with_local_promotable(TypeDescriptor) -> + case rabbit_queue_type:lookup(TypeDescriptor) of + {ok, Type} -> + {ok, Type:list_with_local_promotable()}; + {error, not_found} -> + {error, {unknown_queue_type, TypeDescriptor}} + end. + +-spec delete_member(rabbit_types:vhost(), resource_name(), node()) -> ok | {error, term()}. +delete_member(VHost, Name, Node) -> + case rabbit_amqqueue:lookup(rabbit_misc:queue_resource(VHost, Name)) of + {ok, Q} -> + Type = amqqueue:get_type(Q), + Type:delete_member(VHost, Name, Node); + {error, not_found} = E -> + E + end. + +-spec grow(node(), binary(), binary(), all | even, membership()) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. +grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> + Running = rabbit_nodes:list_running(), + [begin + Size = length(amqqueue:get_nodes(Q)), + QName = amqqueue:get_name(Q), + rabbit_log:info("~ts: adding a new member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + Type = amqqueue:get_type(Q), + case Type:add_member(Q, Node, Membership) of + ok -> + {QName, {ok, Size + 1}}; + {error, Err} -> + rabbit_log:warning( + "~ts: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end + end + || Q <- rabbit_amqqueue:list(), + rabbit_queue_type:is_replicable(Q), + %% don't add a member if there is already one on the node + not lists:member(Node, amqqueue:get_nodes(Q)), + %% node needs to be running + lists:member(Node, Running), + matches_strategy(Strategy, amqqueue:get_nodes(Q)), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. + +-spec peek(Pos :: non_neg_integer(), QName :: rabbit_amqqueue:name()) -> {ok, [{binary(), term()}]} | {error, term()}. +peek(Pos, QName) -> + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + Type = amqqueue:get_type(Q), + Type:peek(Pos, QName); + {error, not_found} = E -> + E + end. + +-spec status(QName :: rabbit_amqqueue:name()) -> [[{binary(), term()}]] | {error, term()}. +status(QName) -> + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + Type = amqqueue:get_type(Q), + Type:status(QName); + {error, not_found} = E -> + E + end. + +-spec reclaim_memory(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> ok | {error, term()}. +reclaim_memory(VHost, QueueName) -> + QName = #resource{virtual_host = VHost, name = QueueName, kind = queue}, + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + Type = amqqueue:get_type(Q), + Type:reclaim_memory(QName); + {error, not_found} = E -> + E + end. + +-spec shrink_all(Node :: node(), TypeDescriptor :: rabbit_registry:type_descriptor()) -> + {ok, pos_integer()} | + {error, pos_integer(), term()} | + {error, {unknown_queue_type, rabbit_registry:type_descriptor()}} | + {error, atom()}. %% to cover not_supported +shrink_all(Node, <<"all">>) -> + lists:flatten([Type:shrink_all(Node) || Type <- rabbit_queue_type:list_replicable()]); +shrink_all(Node, TypeDescriptor) -> + case rabbit_queue_type:lookup(TypeDescriptor) of + {ok, Type} -> + {ok, Type:shrink_all(Node)}; + {error, not_found} -> + {error, {unknown_queue_type, TypeDescriptor}} + end. + +matches_strategy(all, _) -> true; +matches_strategy(even, Members) -> + length(Members) rem 2 == 0. + +is_match(Subj, E) -> + nomatch /= re:run(Subj, E). + +get_resource_name(#resource{name = Name}) -> + Name. diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index d11b1ec14fa8..bf9bc061a27d 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -19,6 +19,7 @@ init/0, close/1, discover/1, + lookup/1, short_alias_of/1, default/0, default_alias/0, @@ -64,6 +65,7 @@ can_redeliver/2, rebalance_module/1, is_replicable/1, + list_replicable/0, stop/1, list_with_minimum_quorum/0, drain/1, @@ -313,6 +315,14 @@ discover(TypeDescriptor) -> {ok, TypeModule} = rabbit_registry:lookup_type_module(queue, TypeDescriptor), TypeModule. +-spec lookup(binary() | atom()) -> {ok, queue_type()} | {error, not_found}. +lookup(<<"undefined">>) -> + fallback(); +lookup(undefined) -> + fallback(); +lookup(TypeDescriptor) -> + rabbit_registry:lookup_type_module(queue, TypeDescriptor). + -spec short_alias_of(TypeDescriptor) -> Ret when TypeDescriptor :: {utf8, binary()} | atom() | binary(), Ret :: binary() | undefined. @@ -906,12 +916,18 @@ rebalance_module(Q) -> Capabilities = TypeModule:capabilities(), maps:get(rebalance_module, Capabilities, undefined). --spec is_replicable(amqqueue:amqqueue()) -> undefine | module(). -is_replicable(Q) -> +-spec is_replicable(amqqueue:amqqueue() | queue_type()) -> false | module(). +is_replicable(Q) when ?is_amqqueue(Q) -> TypeModule = amqqueue:get_type(Q), + is_replicable(TypeModule); +is_replicable(TypeModule) when is_atom(TypeModule) -> Capabilities = TypeModule:capabilities(), maps:get(is_replicable, Capabilities, false). +list_replicable() -> + _ = [TypeModule || + {_Type, TypeModule} <- rabbit_registry:lookup_all(queue), is_replicable(TypeModule)]. + -spec stop(rabbit_types:vhost()) -> ok. stop(VHost) -> %% original rabbit_amqqueue:stop doesn't do any catches or try after diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 9c0e7fd9ca3e..ad5426500768 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -9,6 +9,7 @@ -feature(maybe_expr, enable). -behaviour(rabbit_queue_type). +-behaviour(rabbit_queue_commands). -behaviour(rabbit_policy_validator). -behaviour(rabbit_policy_merge_strategy). @@ -31,7 +32,7 @@ -export([purge/1]). -export([deliver/3]). -export([dead_letter_publish/5]). --export([cluster_state/1, status/2]). +-export([cluster_state/1, status/1]). -export([update_consumer_handler/8, update_consumer/9]). -export([cancel_consumer_handler/2, cancel_consumer/3]). -export([become_leader/2, handle_tick/3, spawn_deleter/1]). @@ -55,14 +56,13 @@ -export([transfer_leadership/2, get_replicas/1, queue_length/1]). -export([list_with_minimum_quorum/0, list_with_local_promotable/0, - list_with_local_promotable_for_cli/0, filter_quorum_critical/3, all_replica_states/0]). -export([capabilities/0]). -export([repair_amqqueue_nodes/1, repair_amqqueue_nodes/2 ]). --export([reclaim_memory/2, +-export([reclaim_memory/1, wal_force_roll_over/1]). -export([notify_decorators/1, notify_decorators/3, @@ -501,11 +501,6 @@ list_with_local_promotable() -> #{node() := ReplicaStates} = get_replica_states([node()]), filter_promotable(Queues, ReplicaStates). --spec list_with_local_promotable_for_cli() -> [#{binary() => any()}]. -list_with_local_promotable_for_cli() -> - Qs = list_with_local_promotable(), - lists:map(fun amqqueue:to_printable/1, Qs). - -spec get_replica_states([node()]) -> #{node() => replica_states()}. get_replica_states(Nodes) -> maps:from_list( @@ -1246,11 +1241,10 @@ key_metrics_rpc(ServerId) -> Metrics = ra:key_metrics(ServerId), Metrics#{machine_version => rabbit_fifo:version()}. --spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> +-spec status(rabbit_types:r(queue)) -> [[{binary(), term()}]] | {error, term()}. -status(Vhost, QueueName) -> +status(QName) -> %% Handle not found queues - QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue}, case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; @@ -1618,9 +1612,8 @@ matches_strategy(even, Members) -> is_match(Subj, E) -> nomatch /= re:run(Subj, E). --spec reclaim_memory(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> ok | {error, term()}. -reclaim_memory(Vhost, QueueName) -> - QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue}, +-spec reclaim_memory(rabbit_amqqueue:name()) -> ok | {error, term()}. +reclaim_memory(QName) -> case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index dc240e04eee1..2d82c89ddfe8 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -11,6 +11,7 @@ -include("mc.hrl"). -behaviour(rabbit_queue_type). +-behaviour(rabbit_queue_commands). -export([is_enabled/0, is_compatible/3, @@ -68,6 +69,15 @@ queue_vm_stats_sups/0, queue_vm_ets/0]). +%% commands +-export([add_member/5, + list_with_local_promotable/0, + delete_member/3, + peek/2, + status/1, + reclaim_memory/1, + shrink_all/1]). + -include_lib("rabbit_common/include/rabbit.hrl"). -include("amqqueue.hrl"). @@ -1471,3 +1481,24 @@ queue_vm_stats_sups() -> queue_vm_ets() -> {[], []}. + +add_member(_VHost, _Name, _Node, _Membership, _Timeout) -> + {error, not_quorum_queue}. + +list_with_local_promotable() -> + {error, not_quorum_queue}. + +delete_member(_VHost, _Name, _Node) -> + {error, not_quorum_queue}. + +peek(_Pos, _QName) -> + {error, not_quorum_queue}. + +status(_QName) -> + {error, not_quorum_queue}. + +reclaim_memory(_QName) -> + {error, not_quorum_queue}. + +shrink_all(_Node) -> + {error, not_quorum_queue}. diff --git a/deps/rabbit/src/rabbit_upgrade_preparation.erl b/deps/rabbit/src/rabbit_upgrade_preparation.erl index a6df3572d8de..b859947e12e3 100644 --- a/deps/rabbit/src/rabbit_upgrade_preparation.erl +++ b/deps/rabbit/src/rabbit_upgrade_preparation.erl @@ -8,7 +8,7 @@ -module(rabbit_upgrade_preparation). -export([await_online_quorum_plus_one/1, - list_with_minimum_quorum_for_cli/0]). + with_minimum_quorum/0]). %% %% API @@ -56,8 +56,8 @@ endangered_critical_components() -> do_await_safe_online_quorum(0) -> false; do_await_safe_online_quorum(IterationsLeft) -> - EndangeredQueues = rabbit_queue_type:list_with_minimum_quorum(), - case EndangeredQueues =:= [] andalso endangered_critical_components() =:= [] of + {EndangeredQueues, EndangeredCriticalComponents} = with_minimum_quorum(), + case EndangeredQueues =:= [] andalso EndangeredCriticalComponents =:= [] of true -> true; false -> case IterationsLeft rem ?LOGGING_FREQUENCY of @@ -67,7 +67,7 @@ do_await_safe_online_quorum(IterationsLeft) -> N -> rabbit_log:info("Waiting for ~p queues and streams to have quorum+1 replicas online. " "You can list them with `rabbitmq-diagnostics check_if_node_is_quorum_critical`", [N]) end, - case endangered_critical_components() of + case EndangeredCriticalComponents of [] -> ok; _ -> rabbit_log:info("Waiting for the following critical components to have quorum+1 replicas online: ~p.", [endangered_critical_components()]) @@ -79,13 +79,8 @@ do_await_safe_online_quorum(IterationsLeft) -> do_await_safe_online_quorum(IterationsLeft - 1) end. --spec list_with_minimum_quorum_for_cli() -> [#{binary() => term()}]. -list_with_minimum_quorum_for_cli() -> +-spec with_minimum_quorum() -> {[amqqueue:amqqueue()], [atom()]}. +with_minimum_quorum() -> EndangeredQueues = rabbit_queue_type:list_with_minimum_quorum(), - [amqqueue:to_printable(Q) || Q <- EndangeredQueues] ++ - [#{ - <<"readable_name">> => C, - <<"name">> => C, - <<"virtual_host">> => <<"(not applicable)">>, - <<"type">> => process - } || C <- endangered_critical_components()]. + EndangeredCriticalComponents = endangered_critical_components(), + {EndangeredQueues, EndangeredCriticalComponents}. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 8937ff074cae..674605b2322a 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -3476,14 +3476,14 @@ reclaim_memory_with_wrong_queue_type(Config) -> %% legacy, special case for classic queues ?assertMatch({error, classic_queue_not_supported}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, - reclaim_memory, [<<"/">>, CQ])), + reclaim_memory, [rabbit_misc:queue_resource(<<"/">>, CQ)])), SQ = ?config(alt_queue_name, Config), ?assertEqual({'queue.declare_ok', SQ, 0, 0}, declare(Ch, SQ, [{<<"x-queue-type">>, longstr, <<"stream">>}])), %% all other (future) queue types get the same error ?assertMatch({error, not_quorum_queue}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, - reclaim_memory, [<<"/">>, SQ])), + reclaim_memory, [rabbit_misc:queue_resource(<<"/">>, SQ)])), ok. queue_length_limit_drop_head(Config) -> @@ -3703,7 +3703,7 @@ status(Config) -> T2 /= <<>> andalso T3 /= <<>>, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, - status, [<<"/">>, QQ])), + status, [rabbit_misc:queue_resource(<<"/">>, QQ)])), wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), ok. diff --git a/deps/rabbit_common/include/resource.hrl b/deps/rabbit_common/include/resource.hrl index ffff1ba7f37e..73b4426d489f 100644 --- a/deps/rabbit_common/include/resource.hrl +++ b/deps/rabbit_common/include/resource.hrl @@ -5,10 +5,15 @@ %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% +-type resource_kind() :: queue | exchange | topic. +-type resource_name() :: binary(). + -record(resource, { - virtual_host, + %% '_'s come from rabbit_table:resource_match + %% 'undefined' is expected by import definitions module + virtual_host, %% :: rabbit_types:vhost() | undefined | '_', %% exchange, queue, topic - kind, + kind, %% :: resource_kind() | '_', %% name as a binary - name + name %% :: resource_name() | '_' }). diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index 09ff33a8cab2..5f7a91b59392 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -100,7 +100,6 @@ -type ok_or_error() :: rabbit_types:ok_or_error(any()). -type thunk(T) :: fun(() -> T). --type resource_name() :: binary(). -type channel_or_connection_exit() :: rabbit_types:channel_exit() | rabbit_types:connection_exit(). -type digraph_label() :: term(). diff --git a/deps/rabbit_common/src/rabbit_registry.erl b/deps/rabbit_common/src/rabbit_registry.erl index 59d36a2921b5..a1580fb9ad48 100644 --- a/deps/rabbit_common/src/rabbit_registry.erl +++ b/deps/rabbit_common/src/rabbit_registry.erl @@ -17,9 +17,14 @@ -export([register/3, unregister/2, binary_to_type/1, lookup_module/2, lookup_type_module/2, lookup_type_name/2, lookup_all/1]). +-export_type([type_descriptor/0]). + -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). +-type type_descriptor() :: atom() | %% either full typemodule or atomized typename + binary(). %% typename or typemodule in binary + -spec start_link() -> rabbit_types:ok_pid_or_error(). -spec register(atom(), binary(), atom()) -> 'ok'. -spec unregister(atom(), binary()) -> 'ok'. @@ -65,8 +70,7 @@ lookup_module(Class, T) when is_atom(T) -> -spec lookup_type_module(Class, TypeDescriptor) -> Ret when Class :: atom(), - TypeDescriptor :: atom() | %% can be TypeModule or Type - binary(), %% or what is currently called "alias" - a TypeName + TypeDescriptor :: type_descriptor(), Ret :: {ok, TypeModule} | {error, not_found}, TypeModule :: atom(). lookup_type_module(Class, TypeDescriptor) -> @@ -80,8 +84,7 @@ lookup_type_module(Class, TypeDescriptor) -> -spec lookup_type_name(Class, TypeDescriptor) -> Ret when Class :: atom(), - TypeDescriptor :: atom() | %% either full typemodule or atomized typename - binary(), %% typename or typemodule in binary + TypeDescriptor :: type_descriptor(), Ret :: {ok, binary()} | {error, not_found}. lookup_type_name(Class, TypeDescriptor) -> case lookup_type(Class, TypeDescriptor) of @@ -113,7 +116,10 @@ lookup_type(Class, TypeDescriptor) %% when we register a type we convert %% typename to atom so we can lookup %% only existing atoms. - lookup_type(Class, binary_to_existing_atom(TypeDescriptor)). + case binary_to_existing_atom_or_not_found(TypeDescriptor) of + {error, _} = E -> E; + Atom -> lookup_type(Class, Atom) + end. lookup_all(Class) -> @@ -121,6 +127,14 @@ lookup_all(Class) -> %%--------------------------------------------------------------------------- +binary_to_existing_atom_or_not_found(TypeDescriptor) -> + try binary_to_existing_atom(TypeDescriptor) + catch + error:badarg -> + {error, not_found} + end. + + internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> binary_to_atom(TypeBin). diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex index d7f9440315c6..cbcafbb90f22 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/add_member_command.ex @@ -73,15 +73,10 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do [name, node] = _args, %{vhost: vhost, node: node_name, timeout: timeout, membership: membership} ) do - args = [vhost, name, to_atom(node)] - args = - case to_atom(membership) do - :promotable -> args ++ [timeout] - other -> args ++ [other, timeout] - end + args = [vhost, name, to_atom(node)] ++ [to_atom(membership), timeout] - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :add_member, args) do + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_commands, :add_member, args) do {:error, :classic_queue_not_supported} -> {:error, "Cannot add members to a classic queue"} @@ -99,7 +94,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do def usage_additional do [ - ["", "quorum queue name"], + ["", "queue name"], ["", "node to add a new replica on"], ["--membership ", "add a promotable non-voter (default) or full voter"] ] @@ -113,7 +108,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do def help_section, do: :replication - def description, do: "Adds a quorum queue member (replica) on the given node." + def description, do: "Adds a queue member (replica) on the given node." def banner([name, node], _) do [ diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/check_if_new_quorum_queue_replicas_have_finished_initial_sync.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/check_if_new_quorum_queue_replicas_have_finished_initial_sync.ex index 823e652e820b..c9cd6f97cba7 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/check_if_new_quorum_queue_replicas_have_finished_initial_sync.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/check_if_new_quorum_queue_replicas_have_finished_initial_sync.ex @@ -6,7 +6,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.CheckIfNewQuorumQueueReplicasHaveFinishedInitialSyncCommand do @moduledoc """ - Exits with a non-zero code if there are quorum queues + Exits with a non-zero code if there are queues that run "non-voter" (not yet done with their initial sync, promotable to voters) replicas on the current node. @@ -16,29 +16,51 @@ defmodule RabbitMQ.CLI.Queues.Commands.CheckIfNewQuorumQueueReplicasHaveFinished @behaviour RabbitMQ.CLI.CommandBehaviour + defp default_opts, do: %{type: "quorum"} + import RabbitMQ.CLI.Core.Platform, only: [line_separator: 0] def scopes(), do: [:diagnostics, :queues] - use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout - use RabbitMQ.CLI.Core.MergesNoDefaults +# use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout +# use RabbitMQ.CLI.Core.MergesNoDefaults use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments use RabbitMQ.CLI.Core.RequiresRabbitAppRunning - def run([], %{node: node_name, timeout: timeout}) do + def merge_defaults(args, opts) do + default = default_opts() + + {args, Map.merge(default, opts)} + end + + def run([], %{node: node_name, timeout: timeout, type: type}) do case :rabbit_misc.rpc_call( node_name, - :rabbit_quorum_queue, - :list_with_local_promotable_for_cli, - [], + :rabbit_queue_commands, + :list_with_local_promotable, + [type], timeout ) do - [] -> {:ok, []} - qs when is_list(qs) -> {:ok, qs} + {:ok, []} -> {:ok, []} + {:ok, qs} when is_list(qs) -> {:ok, Enum.map(qs, &:amqqueue.to_printable/1)} other -> other end end + def switches(), + do: [ + timeout: :integer, + type: :string + ] + + def output({:error, {:unknown_queue_type, type}}, _args) do + {:error, + %{ + "result" => "error", + "message" => "Unknown queue type #{type}" + }} + end + def output({:ok, []}, %{formatter: "json"}) do {:ok, %{"result" => "ok"}} end @@ -79,7 +101,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.CheckIfNewQuorumQueueReplicasHaveFinished "that run promotable replicas on the current node." end - def usage, do: "check_if_new_quorum_queue_replicas_have_finished_initial_sync" + def usage, do: "check_if_new_quorum_queue_replicas_have_finished_initial_sync [--type type]" def banner([], %{node: node_name}) do "Checking if node #{node_name} runs promotable replicas of any queues ..." diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/check_if_node_is_quorum_critical_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/check_if_node_is_quorum_critical_command.ex index f558f19ec28d..0978ccf59150 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/check_if_node_is_quorum_critical_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/check_if_node_is_quorum_critical_command.ex @@ -43,12 +43,18 @@ defmodule RabbitMQ.CLI.Queues.Commands.CheckIfNodeIsQuorumCriticalCommand do case :rabbit_misc.rpc_call( node_name, :rabbit_upgrade_preparation, - :list_with_minimum_quorum_for_cli, + :list_with_minimum_quorum, [], timeout ) do [] -> {:ok, []} - qs when is_list(qs) -> {:ok, qs} + {qs, ps} when is_list(qs) -> {:ok, Enum.map(qs, &:amqqueue.to_printable/1) + ++ Enum.map(ps, fn p -> %{ + "readable_name" => p, + "name" => p, + "virtual_host" => "(not applicable)", + "type" => "process" + } end)} other -> other end end diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/delete_member_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/delete_member_command.ex index 11538005a82f..e740e064ea78 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/delete_member_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/delete_member_command.ex @@ -18,7 +18,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.DeleteMemberCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppRunning def run([name, node] = _args, %{vhost: vhost, node: node_name}) do - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :delete_member, [ + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_commands, :delete_member, [ vhost, name, to_atom(node) @@ -40,7 +40,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.DeleteMemberCommand do def usage_additional do [ - ["", "quorum queue name"], + ["", "queue name"], ["", "node to remove a new replica on"] ] end @@ -53,7 +53,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.DeleteMemberCommand do def help_section, do: :replication - def description, do: "Removes a quorum queue member (replica) on the given node." + def description, do: "Removes a queue member (replica) on the given node." def banner([name, node], _) do "Removing a replica of queue #{name} on node #{node}..." diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex index f1ada3a383bb..3cbe3bf49b87 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/grow_command.ex @@ -67,15 +67,9 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do membership: membership, errors_only: errors_only }) do - args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)] + args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy), to_atom(membership)] - args = - case to_atom(membership) do - :promotable -> args - other -> args ++ [other] - end - - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, args) do + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_commands, :grow, args) do {:error, _} = error -> error diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/peek_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/peek_command.ex index e0eef8703c4b..f19a0da3a881 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/peek_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/peek_command.ex @@ -43,7 +43,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.PeekCommand do def run([name, pos] = _args, %{node: node_name, vhost: vhost}) do {pos, _} = Integer.parse(pos) - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :peek, [vhost, name, pos]) do + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_commands, :peek, [pos, :rabbit_misc.queue_resource(vhost, name)]) do {:error, :classic_queue_not_supported} -> {:error, "Cannot peek into a classic queue"} diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/quorum_status_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/quorum_status_command.ex index a68f2171b0c7..d1bbe8a72deb 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/quorum_status_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/quorum_status_command.ex @@ -16,7 +16,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.QuorumStatusCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppRunning def run([name] = _args, %{node: node_name, vhost: vhost}) do - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :status, [vhost, name]) do + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_commands, :status, [:rabbit_misc.queue_resource(vhost, name)]) do {:error, :classic_queue_not_supported} -> {:error, "Cannot get quorum status of a classic queue"} diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/reclaim_quorum_memory_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/reclaim_quorum_memory_command.ex index cf70e9f32fe3..ca7e110d38d4 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/reclaim_quorum_memory_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/reclaim_quorum_memory_command.ex @@ -15,7 +15,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.ReclaimQuorumMemoryCommand do use RabbitMQ.CLI.Core.RequiresRabbitAppRunning def run([name] = _args, %{node: node_name, vhost: vhost}) do - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :reclaim_memory, [vhost, name]) do + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_commands, :reclaim_memory, [vhost, name]) do {:error, :classic_queue_not_supported} -> {:error, "This operation is not applicable to classic queues"} diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/shrink_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/shrink_command.ex index d172ccbbba22..6a885888c9ea 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/shrink_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/shrink_command.ex @@ -10,26 +10,29 @@ defmodule RabbitMQ.CLI.Queues.Commands.ShrinkCommand do @behaviour RabbitMQ.CLI.CommandBehaviour + defp default_opts, do: %{type: "quorum", errors_only: false} + def switches() do - [errors_only: :boolean] + [errors_only: :boolean, + type: :string] end def merge_defaults(args, opts) do - {args, Map.merge(%{errors_only: false}, opts)} + {args, Map.merge(default_opts(), opts)} end use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument use RabbitMQ.CLI.Core.RequiresRabbitAppRunning - def run([node], %{node: node_name, errors_only: errs}) do - case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :shrink_all, [to_atom(node)]) do + def run([node], %{node: node_name, errors_only: errs, type: type}) do + case :rabbit_misc.rpc_call(node_name, :rabbit_queue_commands, :shrink_all, [to_atom(node), type]) do {:error, _} = error -> error {:badrpc, _} = error -> error - results when errs -> + {:ok, results} when errs -> for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results, do: [ {:vhost, vhost}, @@ -38,7 +41,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.ShrinkCommand do {:result, format_result(res)} ] - results -> + {:ok, results} -> for {{:resource, vhost, _kind, name}, res} <- results, do: [ {:vhost, vhost}, @@ -54,15 +57,16 @@ defmodule RabbitMQ.CLI.Queues.Commands.ShrinkCommand do def formatter(), do: RabbitMQ.CLI.Formatters.Table def banner([node], _) do - "Shrinking quorum queues on #{node}..." + "Shrinking queues on #{node}..." end - def usage, do: "shrink [--errors-only]" + def usage, do: "shrink [--errors-only] --type[ quorum]" def usage_additional() do [ ["", "node name to remove replicas from"], - ["--errors-only", "only list queues which reported an error"] + ["--errors-only", "only list queues which reported an error"], + ["--type", "only shrink queues of specific type. Default is 'quorum'. Accept 'all' to shrink all types"] ] end @@ -75,12 +79,20 @@ defmodule RabbitMQ.CLI.Queues.Commands.ShrinkCommand do def help_section, do: :cluster_management def description, - do: "Shrinks quorum queue clusters by removing any members (replicas) on the given node." + do: "Shrinks --type queue clusters by removing any members (replicas) on the given node." # # Implementation # + def format_output({:error, {:unknown_queue_type, type}}, _args) do + {:error, + %{ + "result" => "error", + "message" => "Unknown queue type #{type}" + }} + end + defp format_size({:ok, size}) do size end diff --git a/deps/rabbitmq_cli/test/queues/check_if_new_quorum_queue_replicas_have_finished_initial_sync_test.exs b/deps/rabbitmq_cli/test/queues/check_if_new_quorum_queue_replicas_have_finished_initial_sync_test.exs index abeb4d161cfc..369b025e9a47 100644 --- a/deps/rabbitmq_cli/test/queues/check_if_new_quorum_queue_replicas_have_finished_initial_sync_test.exs +++ b/deps/rabbitmq_cli/test/queues/check_if_new_quorum_queue_replicas_have_finished_initial_sync_test.exs @@ -40,6 +40,6 @@ defmodule RabbitMQ.CLI.Queues.Commands.CheckIfNewQuorumQueueReplicasHaveFinished @tag test_timeout: 3000 test "run: targeting an unreachable node throws a badrpc" do - assert match?({:badrpc, _}, @command.run([], %{node: :jake@thedog, vhost: "/", timeout: 200})) + assert match?({:badrpc, _}, @command.run([], %{node: :jake@thedog, vhost: "/", timeout: 200, type: "quorum"})) end end diff --git a/deps/rabbitmq_cli/test/queues/check_if_node_is_quorum_critical_command_test.exs b/deps/rabbitmq_cli/test/queues/check_if_node_is_quorum_critical_command_test.exs index 26cb00f0816f..f24562963d29 100644 --- a/deps/rabbitmq_cli/test/queues/check_if_node_is_quorum_critical_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/check_if_node_is_quorum_critical_command_test.exs @@ -40,6 +40,6 @@ defmodule RabbitMQ.CLI.Queues.Commands.CheckIfNodeIsQuorumCriticalCommandTest do @tag test_timeout: 3000 test "run: targeting an unreachable node throws a badrpc" do - assert match?({:badrpc, _}, @command.run([], %{node: :jake@thedog, vhost: "/", timeout: 200})) + assert match?({:badrpc, _}, @command.run([], %{node: :jake@thedog, vhost: "/", timeout: 200, type: "quorum"})) end end diff --git a/deps/rabbitmq_cli/test/queues/shrink_command_test.exs b/deps/rabbitmq_cli/test/queues/shrink_command_test.exs index b11abbd517e5..11a6e9f0bf0b 100644 --- a/deps/rabbitmq_cli/test/queues/shrink_command_test.exs +++ b/deps/rabbitmq_cli/test/queues/shrink_command_test.exs @@ -26,7 +26,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.ShrinkCommandTest do end test "merge_defaults: defaults to reporting complete results" do - assert @command.merge_defaults([], %{}) == {[], %{errors_only: false}} + assert @command.merge_defaults([], %{}) == {[], %{errors_only: false, type: "quorum"}} end test "validate: when no arguments are provided, returns a failure" do @@ -55,7 +55,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.ShrinkCommandTest do {:badrpc, _}, @command.run( ["quorum-queue-a"], - Map.merge(context[:opts], %{node: :jake@thedog, vhost: "/", timeout: 200}) + Map.merge(context[:opts], %{node: :jake@thedog, vhost: "/", timeout: 200, type: "quorum"}) ) ) end diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_health_check_node_is_quorum_critical.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_health_check_node_is_quorum_critical.erl index 727df49321e2..5629f4478eb4 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_health_check_node_is_quorum_critical.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_health_check_node_is_quorum_critical.erl @@ -34,12 +34,19 @@ to_json(ReqData, Context) -> rabbit_mgmt_util:reply(#{status => ok, reason => <<"single node cluster">>}, ReqData, Context); false -> - case rabbit_upgrade_preparation:list_with_minimum_quorum_for_cli() of - [] -> + case rabbit_upgrade_preparation:with_minimum_quorum() of + {[], []} -> rabbit_mgmt_util:reply(#{status => ok}, ReqData, Context); - Qs when length(Qs) > 0 -> - Msg = <<"There are quorum queues that would lose their quorum if the target node is shut down">>, - failure(Msg, Qs, ReqData, Context) + {Queues, Components} when length(Queues) > 0 orelse length(Components)> 0 -> + Msg = <<"There are queues and/or critical components that would lose their quorum if the target node is shut down">>, + QueuesAndComponents = [amqqueue:to_printable(Q) || Q <- Queues] ++ + [#{ + <<"readable_name">> => C, + <<"name">> => C, + <<"virtual_host">> => <<"(not applicable)">>, + <<"type">> => process + } || C <- Components], + failure(Msg, QueuesAndComponents, ReqData, Context) end end. diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl index 232a60f62609..9798e1d4b4b5 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_add_member.erl @@ -41,7 +41,7 @@ accept_content(ReqData, Context) -> rabbit_amqqueue:with( rabbit_misc:r(VHost, queue, QName), fun(_Q) -> - rabbit_quorum_queue:add_member( + rabbit_queue_commands:add_member( VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode), diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl index c336d5497d77..ac2d51bd5a81 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_delete_member.erl @@ -38,7 +38,7 @@ delete_resource(ReqData, Context) -> rabbit_amqqueue:with( rabbit_misc:r(VHost, queue, QName), fun(_Q) -> - rabbit_quorum_queue:delete_member(VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode)) + rabbit_queue_commands:delete_member(VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode)) end) end), case Res of diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl index d4cbb165aac5..eb6b4754e379 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_replicas_grow.erl @@ -39,7 +39,7 @@ accept_content(ReqData, Context) -> [vhost_pattern, queue_pattern, strategy], ReqData, Context, fun([VHPattern, QPattern, Strategy], Body, _ReqData) -> Membership = maps:get(<<"membership">>, Body, promotable), - rabbit_quorum_queue:grow( + rabbit_queue_commands:grow( rabbit_data_coercion:to_atom(NewReplicaNode), VHPattern, QPattern, diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_status.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_status.erl index d416e327c79a..2a794e8e5043 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_status.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_quorum_queue_status.erl @@ -51,7 +51,7 @@ queue(ReqData) -> queue(VHost, QName) -> Name = rabbit_misc:r(VHost, queue, QName), case rabbit_amqqueue:lookup(Name) of - {ok, _Q} -> rabbit_quorum_queue:status(VHost, QName); + {ok, _Q} -> rabbit_quorum_queue:status(Name); {error, not_found} -> not_found end.