Skip to content

Commit e05d8cf

Browse files
the-mikedavismergify[bot]
authored andcommitted
Handle multiple alarms consistently in protocol readers
The MQTT, WebMQTT and STOMP reader modules tracked whether they were blocked by any alarm, but they should instead keep a set of active alarms. Only tracking the `Conserve` part of the alarm notification from `rabbit_alarm` doesn't work correctly if a cluster enters and exits multiple alarms. For example if a cluster had both memory and disk alarms active and then cleared memory, these readers would unblock while they should remain blocked until both alarms are cleared. Keeping a set of `rabbit_alarm:resource_alarm_source()`s matches the AMQP 0-9-1 and 1.0 readers. This change also updates the 0-9-1 reader to use map-based `sets` for the sake of consistency. (cherry picked from commit 971d32e) (cherry picked from commit af6d29d)
1 parent d0d769e commit e05d8cf

File tree

4 files changed

+55
-30
lines changed

4 files changed

+55
-30
lines changed

deps/rabbit/src/rabbit_reader.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
335335
throttle = #throttle{
336336
last_blocked_at = never,
337337
should_block = false,
338-
blocked_by = sets:new(),
338+
blocked_by = sets:new([{version, 2}]),
339339
connection_blocked_message_sent = false
340340
},
341341
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)},
@@ -1310,7 +1310,7 @@ handle_method0(#'connection.open'{virtual_host = VHost},
13101310
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
13111311

13121312
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
1313-
BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms]),
1313+
BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms], [{version, 2}]),
13141314
Throttle1 = Throttle#throttle{blocked_by = BlockedBy},
13151315

13161316
{ok, ChannelSupSupPid} =

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
proc_state = connect_packet_unprocessed :: connect_packet_unprocessed |
3737
rabbit_mqtt_processor:state(),
3838
connection_state :: running | blocked,
39-
conserve :: boolean(),
39+
blocked_by :: sets:set(rabbit_alarm:resource_alarm_source()),
4040
stats_timer :: rabbit_event:state(),
4141
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
4242
conn_name :: binary()
@@ -53,8 +53,8 @@ start_link(Ref, _Transport, []) ->
5353
-spec conserve_resources(pid(),
5454
rabbit_alarm:resource_alarm_source(),
5555
rabbit_alarm:resource_alert()) -> ok.
56-
conserve_resources(Pid, _, {_, Conserve, _}) ->
57-
Pid ! {conserve_resources, Conserve},
56+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
57+
Pid ! {conserve_resources, Source, Conserve},
5858
ok.
5959

6060
-spec info(pid(), rabbit_types:info_keys()) ->
@@ -78,15 +78,15 @@ init(Ref) ->
7878
{ok, ConnStr} ->
7979
ConnName = rabbit_data_coercion:to_binary(ConnStr),
8080
?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]),
81-
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
81+
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
8282
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
8383
erlang:send_after(LoginTimeout, self(), login_timeout),
8484
State0 = #state{socket = RealSocket,
8585
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
8686
conn_name = ConnName,
8787
await_recv = false,
8888
connection_state = running,
89-
conserve = false,
89+
blocked_by = sets:from_list(Alarms, [{version, 2}]),
9090
parse_state = rabbit_mqtt_packet:init_state(),
9191
stats_timer = rabbit_event:init_stats_timer()},
9292
State = control_throttle(State0),
@@ -185,9 +185,16 @@ handle_info({Tag, Sock, Reason}, State = #state{socket = Sock})
185185
when Tag =:= tcp_error; Tag =:= ssl_error ->
186186
network_error(Reason, State);
187187

188-
handle_info({conserve_resources, Conserve}, State) ->
188+
handle_info({conserve_resources, Source, Conserve},
189+
#state{blocked_by = BlockedBy0} = State) ->
190+
BlockedBy = case Conserve of
191+
true ->
192+
sets:add_element(Source, BlockedBy0);
193+
false ->
194+
sets:del_element(Source, BlockedBy0)
195+
end,
189196
maybe_process_deferred_recv(
190-
control_throttle(State #state{ conserve = Conserve }));
197+
control_throttle(State #state{ blocked_by = BlockedBy }));
191198

192199
handle_info({bump_credit, Msg}, State) ->
193200
credit_flow:handle_bump_msg(Msg),
@@ -417,10 +424,11 @@ run_socket(State = #state{ socket = Sock }) ->
417424
State#state{ await_recv = true }.
418425

419426
control_throttle(State = #state{connection_state = ConnState,
420-
conserve = Conserve,
427+
blocked_by = BlockedBy,
421428
proc_state = PState,
422429
keepalive = KState
423430
}) ->
431+
Conserve = not sets:is_empty(BlockedBy),
424432
Throttle = case PState of
425433
connect_packet_unprocessed -> Conserve;
426434
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)
@@ -537,7 +545,7 @@ format_state(#state{socket = Socket,
537545
parse_state = _,
538546
proc_state = PState,
539547
connection_state = ConnectionState,
540-
conserve = Conserve,
548+
blocked_by = BlockedBy,
541549
stats_timer = StatsTimer,
542550
keepalive = Keepalive,
543551
conn_name = ConnName
@@ -552,7 +560,7 @@ format_state(#state{socket = Socket,
552560
rabbit_mqtt_processor:format_status(PState)
553561
end,
554562
connection_state => ConnectionState,
555-
conserve => Conserve,
563+
blocked_by => lists:sort(sets:to_list(BlockedBy)),
556564
stats_timer => StatsTimer,
557565
keepalive => Keepalive,
558566
conn_name => ConnName}.

deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
parse_state,
3434
processor_state,
3535
state,
36-
conserve_resources,
36+
blocked_by :: sets:set(rabbit_alarm:resource_alarm_source()),
3737
recv_outstanding,
3838
max_frame_size,
3939
current_frame_size,
@@ -82,7 +82,7 @@ init([SupHelperPid, Ref, Configuration]) ->
8282
[self(), ConnName]),
8383

8484
ParseState = rabbit_stomp_frame:initial_state(),
85-
_ = register_resource_alarm(),
85+
Alarms = register_resource_alarm(),
8686

8787
LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
8888
MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
@@ -100,7 +100,7 @@ init([SupHelperPid, Ref, Configuration]) ->
100100
max_frame_size = MaxFrameSize,
101101
current_frame_size = 0,
102102
state = running,
103-
conserve_resources = false,
103+
blocked_by = sets:from_list(Alarms, [{version, 2}]),
104104
recv_outstanding = false})), #reader_state.stats_timer),
105105
{backoff, 1000, 1000, 10000});
106106
{error, enotconn} ->
@@ -146,8 +146,15 @@ handle_info({Tag, Sock, Reason}, State=#reader_state{socket=Sock})
146146
{stop, {inet_error, Reason}, State};
147147
handle_info(emit_stats, State) ->
148148
{noreply, emit_stats(State), hibernate};
149-
handle_info({conserve_resources, Conserve}, State) ->
150-
NewState = State#reader_state{conserve_resources = Conserve},
149+
handle_info({conserve_resources, Source, Conserve},
150+
#reader_state{blocked_by = BlockedBy0} = State) ->
151+
BlockedBy = case Conserve of
152+
true ->
153+
sets:add_element(Source, BlockedBy0);
154+
false ->
155+
sets:del_element(Source, BlockedBy0)
156+
end,
157+
NewState = State#reader_state{blocked_by = BlockedBy},
151158
{noreply, run_socket(control_throttle(NewState)), hibernate};
152159
handle_info({bump_credit, Msg}, State) ->
153160
credit_flow:handle_bump_msg(Msg),
@@ -288,18 +295,19 @@ process_received_bytes(Bytes,
288295
-spec conserve_resources(pid(),
289296
rabbit_alarm:resource_alarm_source(),
290297
rabbit_alarm:resource_alert()) -> ok.
291-
conserve_resources(Pid, _Source, {_, Conserve, _}) ->
292-
Pid ! {conserve_resources, Conserve},
298+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
299+
Pid ! {conserve_resources, Source, Conserve},
293300
ok.
294301

295302
register_resource_alarm() ->
296303
rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}).
297304

298305

299-
control_throttle(State = #reader_state{state = CS,
300-
conserve_resources = Mem,
306+
control_throttle(State = #reader_state{state = CS,
307+
blocked_by = BlockedBy,
301308
heartbeat = Heartbeat}) ->
302-
case {CS, Mem orelse credit_flow:blocked()} of
309+
Conserve = not sets:is_empty(BlockedBy),
310+
case {CS, Conserve orelse credit_flow:blocked()} of
303311
{running, true} -> State#reader_state{state = blocking};
304312
{blocking, false} -> rabbit_heartbeat:resume_monitor(Heartbeat),
305313
State#reader_state{state = running};

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
proc_state = connect_packet_unprocessed :: connect_packet_unprocessed |
3939
rabbit_mqtt_processor:state(),
4040
connection_state = running :: running | blocked,
41-
conserve = false :: boolean(),
41+
blocked_by = sets:new([{version, 2}]) :: sets:set(rabbit_alarm:resource_alarm_source()),
4242
stats_timer :: option(rabbit_event:state()),
4343
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
4444
conn_name :: option(binary())
@@ -110,8 +110,9 @@ websocket_init(State0 = #state{socket = Sock}) ->
110110
{ok, ConnStr} ->
111111
ConnName = rabbit_data_coercion:to_binary(ConnStr),
112112
?LOG_INFO("Accepting Web MQTT connection ~s", [ConnName]),
113-
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
114-
State = State0#state{conn_name = ConnName},
113+
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
114+
State = State0#state{conn_name = ConnName,
115+
blocked_by = sets:from_list(Alarms, [{version, 2}])},
115116
process_flag(trap_exit, true),
116117
{[], State, hibernate};
117118
{error, Reason} ->
@@ -121,8 +122,8 @@ websocket_init(State0 = #state{socket = Sock}) ->
121122
-spec conserve_resources(pid(),
122123
rabbit_alarm:resource_alarm_source(),
123124
rabbit_alarm:resource_alert()) -> ok.
124-
conserve_resources(Pid, _, {_, Conserve, _}) ->
125-
Pid ! {conserve_resources, Conserve},
125+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
126+
Pid ! {conserve_resources, Source, Conserve},
126127
ok.
127128

128129
-spec websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State) ->
@@ -145,8 +146,15 @@ websocket_handle(Frame, State) ->
145146
-spec websocket_info(any(), State) ->
146147
{cowboy_websocket:commands(), State} |
147148
{cowboy_websocket:commands(), State, hibernate}.
148-
websocket_info({conserve_resources, Conserve}, State) ->
149-
handle_credits(State#state{conserve = Conserve});
149+
websocket_info({conserve_resources, Source, Conserve},
150+
#state{blocked_by = BlockedBy0} = State) ->
151+
BlockedBy = case Conserve of
152+
true ->
153+
sets:add_element(Source, BlockedBy0);
154+
false ->
155+
sets:del_element(Source, BlockedBy0)
156+
end,
157+
handle_credits(State#state{blocked_by = BlockedBy});
150158
websocket_info({bump_credit, Msg}, State) ->
151159
credit_flow:handle_bump_msg(Msg),
152160
handle_credits(State);
@@ -371,10 +379,11 @@ handle_credits(State0) ->
371379
{[{active, Active}], State, hibernate}.
372380

373381
control_throttle(State = #state{connection_state = ConnState,
374-
conserve = Conserve,
382+
blocked_by = BlockedBy,
375383
proc_state = PState,
376384
keepalive = KState
377385
}) ->
386+
Conserve = not sets:is_empty(BlockedBy),
378387
Throttle = case PState of
379388
connect_packet_unprocessed -> Conserve;
380389
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)

0 commit comments

Comments
 (0)