From eb4aaffc1c0787370c913af6a87d62e4f858e1f1 Mon Sep 17 00:00:00 2001 From: kallebysantos Date: Tue, 21 Jan 2025 22:52:14 +0000 Subject: [PATCH] stamp: using `handle_cast` on worker to avoid timeouts Using async communication between `Extractor` and `Worker` --- Dockerfile | 2 +- config/config.exs | 2 +- lib/ocrlot/extractor.ex | 51 +++++++++++++++----------- lib/ocrlot/extractor/worker.ex | 57 ++++++++++++++++++----------- lib/ocrlot/extractor/worker_pool.ex | 11 ++---- 5 files changed, 71 insertions(+), 52 deletions(-) diff --git a/Dockerfile b/Dockerfile index 7a50de5..ec4fa06 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM elixir:1.14.2 +FROM elixir:1.17-otp-27-slim # Update registry and install tesseract and dependencies RUN apt-get update -qq \ diff --git a/config/config.exs b/config/config.exs index 724056e..2b37702 100644 --- a/config/config.exs +++ b/config/config.exs @@ -10,7 +10,7 @@ import Config config :ocrlot, generators: [timestamp_type: :utc_datetime], # Each worker can take ~120Mib - extractor_max_workers: String.to_integer(System.get_env("EXTRACTOR_MAX_WORKERS") || "1") + extractor_max_workers: String.to_integer(System.get_env("EXTRACTOR_MAX_WORKERS") || "3") # Configures the endpoint config :ocrlot, OcrlotWeb.Endpoint, diff --git a/lib/ocrlot/extractor.ex b/lib/ocrlot/extractor.ex index e86b6b2..fe8f351 100644 --- a/lib/ocrlot/extractor.ex +++ b/lib/ocrlot/extractor.ex @@ -18,8 +18,8 @@ defmodule Ocrlot.Extractor do end defmodule Result do - @enforce_keys [:filepath, :content] - defstruct [:filepath, :content] + @enforce_keys [:content] + defstruct [:content] end ## Public API @@ -31,37 +31,46 @@ defmodule Ocrlot.Extractor do def init(%Args{} = args) do PubSub.subscribe(Ocrlot.PubSub, args.in) - {:ok, args} + process_refs = %{} + + {:ok, {args, process_refs}} end @impl true - def handle_info({:process, %Payload{} = payload, metadata}, %Args{} = state) do + def handle_info({:process, %Payload{} = payload, metadata}, {%Args{} = args, process_refs}) do case WorkerPool.start_child() do {:ok, worker} -> - Task.Supervisor.start_child(Ocrlot.Converter.TaskSupervisor, fn -> - {:ok, content} = Worker.process(worker, payload) - - result = %Result{ - filepath: payload.filepath, - content: content - } + {:ok, process_id} = Worker.process(worker, payload, self()) - message = state.mapper.({result, metadata}) + process_refs = Map.put(process_refs, process_id, metadata) - PubSub.broadcast(Ocrlot.PubSub, state.out, message) - end) + {:noreply, {args, process_refs}} {:error, :max_children} -> - requeue(payload) + requeue(payload, metadata) + + {:noreply, {args, process_refs}} end + end - {:noreply, state} + @impl true + def handle_info( + {:extractor_worker_complete, process_id, %Result{} = result}, + {%Args{} = args, process_refs} + ) do + {metadata, process_refs} = Map.pop(process_refs, process_id) + + message = args.mapper.({result, metadata}) + + PubSub.broadcast(Ocrlot.PubSub, args.out, message) + + {:noreply, {args, process_refs}} end @impl true - def handle_info({:requeue, %Payload{} = payload}, state) do - PubSub.local_broadcast(Ocrlot.PubSub, state.in, {:process, payload}) - {:noreply, state} + def handle_info({:requeue, %Payload{} = payload, metadata}, {%Args{} = args, process_refs}) do + PubSub.local_broadcast(Ocrlot.PubSub, args.in, {:process, payload, metadata}) + {:noreply, {args, process_refs}} end @impl true @@ -72,7 +81,7 @@ defmodule Ocrlot.Extractor do {:noreply, state} end - defp requeue(%Payload{} = payload) do - Process.send_after(self(), {:requeue, payload}, 3_000) + defp requeue(%Payload{} = payload, metadata) do + Process.send_after(self(), {:requeue, payload, metadata}, 3_000) end end diff --git a/lib/ocrlot/extractor/worker.ex b/lib/ocrlot/extractor/worker.ex index 86fa0eb..3cd7eea 100644 --- a/lib/ocrlot/extractor/worker.ex +++ b/lib/ocrlot/extractor/worker.ex @@ -1,4 +1,5 @@ defmodule Ocrlot.Extractor.Worker do + alias Ocrlot.Extractor.Result alias Ocrlot.Extractor.Payload alias Ocrlot.Extractor @@ -8,19 +9,24 @@ defmodule Ocrlot.Extractor.Worker do # Client APIs def start_link(args), do: GenServer.start_link(__MODULE__, args) - def try_lock(pid), do: GenServer.call(pid, :try_lock, 10_000) + # def try_lock(pid), do: GenServer.call(pid, :try_lock, 10_000) - def process(pid, %Payload{} = params), do: GenServer.call(pid, {:process, params}, 10_000) + def process(pid, %Payload{} = params, receiver) do + process_id = System.unique_integer() + + GenServer.cast(pid, {:process, process_id, params, receiver}) + + {:ok, process_id} + end # Callbacks @impl true - def init(state), do: {:ok, {state, nil}} + def init(_), do: {:ok, {}} @impl true - def handle_call( - {:process, %Payload{filepath: filepath, languages: langs}}, - _, - {:lock, idle_ref} + def handle_cast( + {:process, process_id, %Payload{filepath: filepath, languages: langs}, receiver}, + idle_ref ) do if is_reference(idle_ref) do Process.cancel_timer(idle_ref) @@ -36,28 +42,35 @@ defmodule Ocrlot.Extractor.Worker do "6" ] - {result, 0} = System.cmd("tesseract", opts) + {ocr_output, 0} = System.cmd("tesseract", opts) + + result = %Result{ + content: ocr_output + } + + send(receiver, {:extractor_worker_complete, process_id, result}) idle_ref = Process.send_after(self(), :terminate, terminate_worker_after()) - {:reply, {:ok, result}, {:waiting, idle_ref}} + + {:noreply, idle_ref} end - @impl true - def handle_call({:process, _}, _, {:waiting, idle_ref}), - do: {:reply, {:error, :not_locked}, {:waiting, idle_ref}} + # @impl true + # def handle_call({:process, _}, _, {:waiting, idle_ref}), + # do: {:reply, {:error, :not_locked}, {:waiting, idle_ref}} - @impl true - def handle_call(:try_lock, _, {:waiting, idle_ref}) do - if is_reference(idle_ref) do - Process.cancel_timer(idle_ref) - end + # @impl true + # def handle_call(:try_lock, _, {:waiting, idle_ref}) do + # if is_reference(idle_ref) do + # Process.cancel_timer(idle_ref) + # end - {:reply, :ok, {:lock, idle_ref}} - end + # {:reply, :ok, {:lock, idle_ref}} + # end - @impl true - def handle_call(:try_lock, _, {:lock, idle_ref}), - do: {:reply, :error, {:lock, idle_ref}} + # @impl true + # def handle_call(:try_lock, _, {:lock, idle_ref}), + # do: {:reply, :error, {:lock, idle_ref}} @impl true def handle_info(:terminate, state) do diff --git a/lib/ocrlot/extractor/worker_pool.ex b/lib/ocrlot/extractor/worker_pool.ex index e9d8a27..068e923 100644 --- a/lib/ocrlot/extractor/worker_pool.ex +++ b/lib/ocrlot/extractor/worker_pool.ex @@ -17,7 +17,7 @@ defmodule Ocrlot.Extractor.WorkerPool do do: DynamicSupervisor.init(strategy: :one_for_one, max_children: max_children()) def start_child do - case DynamicSupervisor.start_child(__MODULE__, {Worker, :lock}) do + case DynamicSupervisor.start_child(__MODULE__, Worker) do {:ok, pid} -> {:ok, pid} {:error, :max_children} -> round_robin(max_children()) error -> error @@ -36,11 +36,8 @@ defmodule Ocrlot.Extractor.WorkerPool do end) case Enum.at(workers, rem(counter, length(workers))) do - {:undefined, pid, _, _} -> - case Worker.try_lock(pid) do - :ok -> {:ok, pid} - _ -> round_robin(retries - 1) - end + {:undefined, worker_pid, _, _} -> + {:ok, worker_pid} _ -> round_robin(retries - 1) @@ -50,5 +47,5 @@ defmodule Ocrlot.Extractor.WorkerPool do defp round_robin(0), do: {:error, :no_workers_available} - defp max_children(), do: Application.fetch_env!(:ocrlot, :extractor_max_workers) |> dbg() + defp max_children(), do: Application.fetch_env!(:ocrlot, :extractor_max_workers) end