diff --git a/installer/templates/phx_web/telemetry.ex b/installer/templates/phx_web/telemetry.ex index 4edb792901..5662c7f7c9 100644 --- a/installer/templates/phx_web/telemetry.ex +++ b/installer/templates/phx_web/telemetry.ex @@ -43,6 +43,7 @@ defmodule <%= @web_namespace %>.Telemetry do summary("phoenix.socket_connected.duration", unit: {:native, :millisecond} ), + sum("phoenix.socket_drain.count"), summary("phoenix.channel_joined.duration", unit: {:native, :millisecond} ), diff --git a/lib/phoenix/endpoint.ex b/lib/phoenix/endpoint.ex index 0fceb6f4d9..5a6cde1999 100644 --- a/lib/phoenix/endpoint.ex +++ b/lib/phoenix/endpoint.ex @@ -784,6 +784,8 @@ defmodule Phoenix.Endpoint do batch to terminate. Defaults to 2000ms. * `:shutdown` - The maximum amount of time in milliseconds allowed to drain all batches. Defaults to 30000ms. + * `:log` - the log level for drain actions. Defaults the `:log` option + passed to `use Phoenix.Socket` or `:info`. Set it to `false` to disable logging. For example, if you have 150k connections, the default values will split them into 15 batches of 10k connections. Each batch takes diff --git a/lib/phoenix/logger.ex b/lib/phoenix/logger.ex index 6a19b6a532..f467887362 100644 --- a/lib/phoenix/logger.ex +++ b/lib/phoenix/logger.ex @@ -58,6 +58,11 @@ defmodule Phoenix.Logger do * Metadata: `%{endpoint: atom, transport: atom, params: term, connect_info: map, vsn: binary, user_socket: atom, result: :ok | :error, serializer: atom, log: Logger.level | false}` * Disable logging: `use Phoenix.Socket, log: false` or `socket "/foo", MySocket, websocket: [log: false]` in your endpoint + * `[:phoenix, :socket_drain]` - dispatched by `Phoenix.Socket` when using the `:drainer` option + * Measurement: `%{count: integer, total: integer, index: integer, rounds: integer}` + * Metadata: `%{endpoint: atom, socket: atom, intervasl: integer, log: Logger.level | false}` + * Disable logging: `use Phoenix.Socket, log: false` in your endpoint or pass `:log` option in the `:drainer` option + * `[:phoenix, :channel_joined]` - dispatched at the end of a channel join * Measurement: `%{duration: native_time}` * Metadata: `%{result: :ok | :error, params: term, socket: Phoenix.Socket.t}` @@ -134,6 +139,7 @@ defmodule Phoenix.Logger do [:phoenix, :router_dispatch, :start] => &__MODULE__.phoenix_router_dispatch_start/4, [:phoenix, :error_rendered] => &__MODULE__.phoenix_error_rendered/4, [:phoenix, :socket_connected] => &__MODULE__.phoenix_socket_connected/4, + [:phoenix, :socket_drain] => &__MODULE__.phoenix_socket_drain/4, [:phoenix, :channel_joined] => &__MODULE__.phoenix_channel_joined/4, [:phoenix, :channel_handled_in] => &__MODULE__.phoenix_channel_handled_in/4 } @@ -339,6 +345,22 @@ defmodule Phoenix.Logger do defp connect_result(:ok), do: "CONNECTED TO " defp connect_result(:error), do: "REFUSED CONNECTION TO " + @doc false + def phoenix_socket_drain(_, _, %{log: false}, _), do: :ok + + def phoenix_socket_drain(_, %{count: count, total: total, index: index, rounds: rounds}, %{log: level} = meta, _) do + Logger.log(level, fn -> + %{socket: socket, interval: interval} = meta + + [ + "DRAINING #{count} of #{total} total connection(s) for socket ", + inspect(socket), + " every #{interval}ms - ", + "round #{index} of #{rounds}" + ] + end) + end + ## Event: [:phoenix, :channel_joined] @doc false diff --git a/lib/phoenix/socket.ex b/lib/phoenix/socket.ex index 7963ef31e8..00c4885c60 100644 --- a/lib/phoenix/socket.ex +++ b/lib/phoenix/socket.ex @@ -464,10 +464,14 @@ defmodule Phoenix.Socket do case drainer do {module, function, arguments} -> apply(module, function, arguments) + _ -> drainer end - {Phoenix.Socket.PoolDrainer, {endpoint, handler, drainer}} + + opts = Keyword.merge(opts, drainer: drainer) + + {Phoenix.Socket.PoolDrainer, {endpoint, handler, opts}} else :ignore end diff --git a/lib/phoenix/socket/pool_supervisor.ex b/lib/phoenix/socket/pool_supervisor.ex index 80cb8b510f..f019d0939e 100644 --- a/lib/phoenix/socket/pool_supervisor.ex +++ b/lib/phoenix/socket/pool_supervisor.ex @@ -75,7 +75,7 @@ defmodule Phoenix.Socket.PoolDrainer do %{ id: {:terminator, name}, start: {__MODULE__, :start_link, [tuple]}, - shutdown: Keyword.get(opts, :shutdown, 30_000) + shutdown: Keyword.get(opts[:drainer], :shutdown, 30_000) } end @@ -86,13 +86,14 @@ defmodule Phoenix.Socket.PoolDrainer do @impl true def init({endpoint, name, opts}) do Process.flag(:trap_exit, true) - size = Keyword.get(opts, :batch_size, 10_000) - interval = Keyword.get(opts, :batch_interval, 2_000) - {:ok, {endpoint, name, size, interval}} + size = Keyword.get(opts[:drainer], :batch_size, 10_000) + interval = Keyword.get(opts[:drainer], :batch_interval, 2_000) + log_level = Keyword.get(opts[:drainer], :log, opts[:log] || :info) + {:ok, {endpoint, name, size, interval, log_level}} end @impl true - def terminate(_reason, {endpoint, name, size, interval}) do + def terminate(_reason, {endpoint, name, size, interval, log_level}) do ets = endpoint.config({:socket, name}) partitions = :ets.lookup_element(ets, :partitions, 2) @@ -109,12 +110,21 @@ defmodule Phoenix.Socket.PoolDrainer do rounds = div(total, size) + 1 - if total != 0 do - Logger.info("Shutting down #{total} sockets in #{rounds} rounds of #{interval}ms") - end - for {pids, index} <- collection |> Stream.concat() |> Stream.chunk_every(size) |> Stream.with_index(1) do + count = if index == rounds, do: length(pids), else: size + + :telemetry.execute( + [:phoenix, :socket_drain], + %{count: count, total: total, index: index, rounds: rounds}, + %{ + endpoint: endpoint, + socket: name, + interval: interval, + log: log_level + } + ) + spawn(fn -> for pid <- pids do send(pid, %Phoenix.Socket.Broadcast{event: "phx_drain"}) diff --git a/test/phoenix/socket/socket_test.exs b/test/phoenix/socket/socket_test.exs index d089a8de29..9ac57b4259 100644 --- a/test/phoenix/socket/socket_test.exs +++ b/test/phoenix/socket/socket_test.exs @@ -76,7 +76,7 @@ defmodule Phoenix.SocketTest do test "merges keyword lists" do socket = %Phoenix.Socket{} socket = assign(socket, %{foo: :bar, abc: :def}) - socket = assign(socket, [foo: :baz]) + socket = assign(socket, foo: :baz) assert socket.assigns[:foo] == :baz assert socket.assigns[:abc] == :def end @@ -109,7 +109,8 @@ defmodule Phoenix.SocketTest do ] assert DrainerSpecSocket.drainer_spec(drainer: drainer_spec, endpoint: Endpoint) == - {Phoenix.Socket.PoolDrainer, {Endpoint, DrainerSpecSocket, drainer_spec}} + {Phoenix.Socket.PoolDrainer, + {Endpoint, DrainerSpecSocket, [endpoint: Endpoint, drainer: drainer_spec]}} end test "loads dynamic drainer config" do @@ -119,7 +120,8 @@ defmodule Phoenix.SocketTest do drainer: {DrainerSpecSocket, :dynamic_drainer_config, []}, endpoint: Endpoint ) == - {Phoenix.Socket.PoolDrainer, {Endpoint, DrainerSpecSocket, drainer_spec}} + {Phoenix.Socket.PoolDrainer, + {Endpoint, DrainerSpecSocket, [endpoint: Endpoint, drainer: drainer_spec]}} end test "returns ignore if drainer is set to false" do