Skip to content

Commit

Permalink
stamp: using handle_cast on worker to avoid timeouts
Browse files Browse the repository at this point in the history
Using async communication between `Extractor` and `Worker`
kallebysantos committed Jan 21, 2025

Verified

This commit was signed with the committer’s verified signature.
kallebysantos Kalleby Santos
1 parent d063e7a commit eb4aaff
Showing 5 changed files with 71 additions and 52 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM elixir:1.14.2
FROM elixir:1.17-otp-27-slim

# Update registry and install tesseract and dependencies
RUN apt-get update -qq \
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ import Config
config :ocrlot,
generators: [timestamp_type: :utc_datetime],
# Each worker can take ~120Mib
extractor_max_workers: String.to_integer(System.get_env("EXTRACTOR_MAX_WORKERS") || "1")
extractor_max_workers: String.to_integer(System.get_env("EXTRACTOR_MAX_WORKERS") || "3")

# Configures the endpoint
config :ocrlot, OcrlotWeb.Endpoint,
51 changes: 30 additions & 21 deletions lib/ocrlot/extractor.ex
Original file line number Diff line number Diff line change
@@ -18,8 +18,8 @@ defmodule Ocrlot.Extractor do
end

defmodule Result do
@enforce_keys [:filepath, :content]
defstruct [:filepath, :content]
@enforce_keys [:content]
defstruct [:content]
end

## Public API
@@ -31,37 +31,46 @@ defmodule Ocrlot.Extractor do
def init(%Args{} = args) do
PubSub.subscribe(Ocrlot.PubSub, args.in)

{:ok, args}
process_refs = %{}

{:ok, {args, process_refs}}
end

@impl true
def handle_info({:process, %Payload{} = payload, metadata}, %Args{} = state) do
def handle_info({:process, %Payload{} = payload, metadata}, {%Args{} = args, process_refs}) do
case WorkerPool.start_child() do
{:ok, worker} ->
Task.Supervisor.start_child(Ocrlot.Converter.TaskSupervisor, fn ->
{:ok, content} = Worker.process(worker, payload)

result = %Result{
filepath: payload.filepath,
content: content
}
{:ok, process_id} = Worker.process(worker, payload, self())

message = state.mapper.({result, metadata})
process_refs = Map.put(process_refs, process_id, metadata)

PubSub.broadcast(Ocrlot.PubSub, state.out, message)
end)
{:noreply, {args, process_refs}}

{:error, :max_children} ->
requeue(payload)
requeue(payload, metadata)

{:noreply, {args, process_refs}}
end
end

{:noreply, state}
@impl true
def handle_info(
{:extractor_worker_complete, process_id, %Result{} = result},
{%Args{} = args, process_refs}
) do
{metadata, process_refs} = Map.pop(process_refs, process_id)

message = args.mapper.({result, metadata})

PubSub.broadcast(Ocrlot.PubSub, args.out, message)

{:noreply, {args, process_refs}}
end

@impl true
def handle_info({:requeue, %Payload{} = payload}, state) do
PubSub.local_broadcast(Ocrlot.PubSub, state.in, {:process, payload})
{:noreply, state}
def handle_info({:requeue, %Payload{} = payload, metadata}, {%Args{} = args, process_refs}) do
PubSub.local_broadcast(Ocrlot.PubSub, args.in, {:process, payload, metadata})
{:noreply, {args, process_refs}}
end

@impl true
@@ -72,7 +81,7 @@ defmodule Ocrlot.Extractor do
{:noreply, state}
end

defp requeue(%Payload{} = payload) do
Process.send_after(self(), {:requeue, payload}, 3_000)
defp requeue(%Payload{} = payload, metadata) do
Process.send_after(self(), {:requeue, payload, metadata}, 3_000)
end
end
57 changes: 35 additions & 22 deletions lib/ocrlot/extractor/worker.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
defmodule Ocrlot.Extractor.Worker do
alias Ocrlot.Extractor.Result
alias Ocrlot.Extractor.Payload
alias Ocrlot.Extractor

@@ -8,19 +9,24 @@ defmodule Ocrlot.Extractor.Worker do
# Client APIs
def start_link(args), do: GenServer.start_link(__MODULE__, args)

def try_lock(pid), do: GenServer.call(pid, :try_lock, 10_000)
# def try_lock(pid), do: GenServer.call(pid, :try_lock, 10_000)

def process(pid, %Payload{} = params), do: GenServer.call(pid, {:process, params}, 10_000)
def process(pid, %Payload{} = params, receiver) do
process_id = System.unique_integer()

GenServer.cast(pid, {:process, process_id, params, receiver})

{:ok, process_id}
end

# Callbacks
@impl true
def init(state), do: {:ok, {state, nil}}
def init(_), do: {:ok, {}}

@impl true
def handle_call(
{:process, %Payload{filepath: filepath, languages: langs}},
_,
{:lock, idle_ref}
def handle_cast(
{:process, process_id, %Payload{filepath: filepath, languages: langs}, receiver},
idle_ref
) do
if is_reference(idle_ref) do
Process.cancel_timer(idle_ref)
@@ -36,28 +42,35 @@ defmodule Ocrlot.Extractor.Worker do
"6"
]

{result, 0} = System.cmd("tesseract", opts)
{ocr_output, 0} = System.cmd("tesseract", opts)

result = %Result{
content: ocr_output
}

send(receiver, {:extractor_worker_complete, process_id, result})

idle_ref = Process.send_after(self(), :terminate, terminate_worker_after())
{:reply, {:ok, result}, {:waiting, idle_ref}}

{:noreply, idle_ref}
end

@impl true
def handle_call({:process, _}, _, {:waiting, idle_ref}),
do: {:reply, {:error, :not_locked}, {:waiting, idle_ref}}
# @impl true
# def handle_call({:process, _}, _, {:waiting, idle_ref}),
# do: {:reply, {:error, :not_locked}, {:waiting, idle_ref}}

@impl true
def handle_call(:try_lock, _, {:waiting, idle_ref}) do
if is_reference(idle_ref) do
Process.cancel_timer(idle_ref)
end
# @impl true
# def handle_call(:try_lock, _, {:waiting, idle_ref}) do
# if is_reference(idle_ref) do
# Process.cancel_timer(idle_ref)
# end

{:reply, :ok, {:lock, idle_ref}}
end
# {:reply, :ok, {:lock, idle_ref}}
# end

@impl true
def handle_call(:try_lock, _, {:lock, idle_ref}),
do: {:reply, :error, {:lock, idle_ref}}
# @impl true
# def handle_call(:try_lock, _, {:lock, idle_ref}),
# do: {:reply, :error, {:lock, idle_ref}}

@impl true
def handle_info(:terminate, state) do
11 changes: 4 additions & 7 deletions lib/ocrlot/extractor/worker_pool.ex
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ defmodule Ocrlot.Extractor.WorkerPool do
do: DynamicSupervisor.init(strategy: :one_for_one, max_children: max_children())

def start_child do
case DynamicSupervisor.start_child(__MODULE__, {Worker, :lock}) do
case DynamicSupervisor.start_child(__MODULE__, Worker) do
{:ok, pid} -> {:ok, pid}
{:error, :max_children} -> round_robin(max_children())
error -> error
@@ -36,11 +36,8 @@ defmodule Ocrlot.Extractor.WorkerPool do
end)

case Enum.at(workers, rem(counter, length(workers))) do
{:undefined, pid, _, _} ->
case Worker.try_lock(pid) do
:ok -> {:ok, pid}
_ -> round_robin(retries - 1)
end
{:undefined, worker_pid, _, _} ->
{:ok, worker_pid}

_ ->
round_robin(retries - 1)
@@ -50,5 +47,5 @@ defmodule Ocrlot.Extractor.WorkerPool do

defp round_robin(0), do: {:error, :no_workers_available}

defp max_children(), do: Application.fetch_env!(:ocrlot, :extractor_max_workers) |> dbg()
defp max_children(), do: Application.fetch_env!(:ocrlot, :extractor_max_workers)
end

0 comments on commit eb4aaff

Please sign in to comment.