Skip to content

Commit

Permalink
local_queue_path config parameter to defined disk message queue path (#…
Browse files Browse the repository at this point in the history
…77)

* New optional parameter local_queue_path in global_client_options section

* fix config processing

* Update CONFIGURATION.md
  • Loading branch information
mijkenator authored Jan 29, 2025
1 parent 8ad5bfc commit cd9805a
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 1 deletion.
1 change: 1 addition & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ batch_size | P | 1 .. 2147483647 | 1000000
delivery_report_only_error | P | true, false | false | Only provide delivery reports for failed messages
delivery_report_callback | P | module or fun/2 | undefined| A callback where delivery reports are sent (`erlkaf_producer_callbacks` behaviour)
sticky_partitioning_linger_ms | P | 0 .. 900000 | 10 | Delay in milliseconds to wait to assign new sticky partitions for each topic. By default, set to double the time of linger.ms. To disable sticky behavior, set to 0. This behavior affects messages with the key NULL in all cases, and messages with key lengths of zero when the consistent_random partitioner is in use. These messages would otherwise be assigned randomly. A higher value allows for more effective batching of these messages.
local_queue_path | P | | | Path to directory where local disk queue files will be paced if queue_buffering_overflow_strategy for producer set to local_disk_queue. Default value is priv directory of erlkaf application.

## Topic configuration properties

Expand Down
6 changes: 6 additions & 0 deletions src/erlkaf_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ is_erlkaf_config(stats_callback = K, V) ->
check_callback(K, V, 2);
is_erlkaf_config(oauthbearer_token_refresh_callback = K, V) ->
check_callback(K, V, 1);
is_erlkaf_config(local_queue_path = K, []) -> throw({error, {options, {K, []}}});
is_erlkaf_config(local_queue_path = K, V) ->
case io_lib:latin1_char_list(V) of
true -> true;
_ -> throw({error, {options, {K, V}}})
end;
is_erlkaf_config(queue_buffering_overflow_strategy = K, V) ->
case V of
local_disk_queue ->
Expand Down
2 changes: 1 addition & 1 deletion src/erlkaf_local_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
]).

new(ClientId) ->
Path = erlkaf_utils:get_priv_path(ClientId),
Path = erlkaf_utils:get_local_queue_path(ClientId),
?LOG_INFO("persistent queue path: ~p", [Path]),
esq:new(Path).

Expand Down
15 changes: 15 additions & 0 deletions src/erlkaf_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

-export([
get_priv_path/1,
get_local_queue_path/1,
get_env/1,
get_env/2,
lookup/2,
Expand All @@ -15,6 +16,20 @@
parralel_exec/2
]).

-spec get_local_queue_path(string() | atom()) -> string() | undefined.
get_local_queue_path(File) ->
case application:get_env(erlkaf, global_client_options) of
undefined ->
get_priv_path(File);
{ok, Val} ->
case lookup(local_queue_path, Val) of
undefined ->
get_priv_path(File);
Path ->
filename:join(Path, File)
end
end.

get_priv_path(File) ->
case code:priv_dir(erlkaf) of
{error, bad_name} ->
Expand Down

0 comments on commit cd9805a

Please sign in to comment.