Skip to content

Commit

Permalink
Fix for #41
Browse files Browse the repository at this point in the history
  • Loading branch information
silviucpp committed Oct 10, 2022
1 parent 0c03fce commit f8c47c7
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 24 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#### v2.1.0

- Fix for kafka tombstone messages (https://github.com/silviucpp/erlkaf/issues/51)

- Add support for message timestamp (https://github.com/silviucpp/erlkaf/issues/41)

#### v2.0.9

Expand Down
23 changes: 21 additions & 2 deletions c_src/erlkaf_message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace {

ERL_NIF_TERM get_headers(ErlNifEnv* env, const rd_kafka_message_t* msg)
inline ERL_NIF_TERM get_headers(ErlNifEnv* env, const rd_kafka_message_t* msg)
{
rd_kafka_headers_t* headers = NULL;

Expand All @@ -30,15 +30,34 @@ ERL_NIF_TERM get_headers(ErlNifEnv* env, const rd_kafka_message_t* msg)
return ATOMS.atomUndefined;
}

// Maps librdkafka's timestamp-type enum onto the matching Erlang atom
// (not_available | create_time | log_append_time).
inline ERL_NIF_TERM rd_kafka_timestamp_type_to_nif(rd_kafka_timestamp_type_t t)
{
    switch (t)
    {
        case RD_KAFKA_TIMESTAMP_NOT_AVAILABLE:
            return ATOMS.atomNotAvailable;

        case RD_KAFKA_TIMESTAMP_CREATE_TIME:
            return ATOMS.atomCreateTime;

        case RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME:
            return ATOMS.atomLogAppendTime;
    }

    // Defensive fallback: without this, control could fall off the end of a
    // non-void function (undefined behavior) if librdkafka ever adds a new
    // rd_kafka_timestamp_type_t value.
    return ATOMS.atomNotAvailable;
}

}

// Builds the {message, Topic, Partition, Offset, Key, Value, Headers, {Ts, TsType}}
// tuple delivered to Erlang consumers for a single kafka message.
ERL_NIF_TERM make_message_nif(ErlNifEnv* env, ERL_NIF_TERM topic, ERL_NIF_TERM partition, const rd_kafka_message_t* msg)
{
    // The timestamp is reported together with its type (create time vs
    // broker log-append time, or not available).
    rd_kafka_timestamp_type_t ts_type;
    int64_t ts = rd_kafka_message_timestamp(msg, &ts_type);

    // NULL key/payload map to 'undefined' (kafka tombstone messages carry a NULL payload).
    ERL_NIF_TERM key = msg->key == NULL ? ATOMS.atomUndefined : make_binary(env, reinterpret_cast<const char*>(msg->key), msg->key_len);
    ERL_NIF_TERM offset = enif_make_int64(env, msg->offset);
    ERL_NIF_TERM value = msg->payload == NULL ? ATOMS.atomUndefined : make_binary(env, reinterpret_cast<const char*>(msg->payload), msg->len);
    ERL_NIF_TERM headers = get_headers(env, msg);
    // NOTE: the superseded enif_make_tuple7 return (leftover from the previous
    // revision) was removed — it made the timestamp code below it unreachable.
    ERL_NIF_TERM ts_tuple = enif_make_tuple2(env, enif_make_int64(env, ts), rd_kafka_timestamp_type_to_nif(ts_type));
    return enif_make_tuple8(env, ATOMS.atomMessage, topic, partition, offset, key, value, headers, ts_tuple);
}

ERL_NIF_TERM make_message_nif(ErlNifEnv* env, const rd_kafka_message_t* msg)
Expand Down
8 changes: 7 additions & 1 deletion c_src/erlkaf_nif.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const char kAtomAssignPartition[] = "assign_partitions";
const char kAtomRevokePartition[] = "revoke_partitions";
const char kAtomStats[] = "stats";
const char kAtomClientStopped[] = "client_stopped";
const char kAtomNotAvailable[] = "not_available";
const char kAtomCreateTime[] = "create_time";
const char kAtomLogAppendTime[] = "log_append_time";

atoms ATOMS;

Expand Down Expand Up @@ -69,6 +72,9 @@ int on_nif_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
ATOMS.atomRevokePartition = make_atom(env, kAtomRevokePartition);
ATOMS.atomStats = make_atom(env, kAtomStats);
ATOMS.atomClientStopped = make_atom(env, kAtomClientStopped);
ATOMS.atomNotAvailable = make_atom(env, kAtomNotAvailable);
ATOMS.atomCreateTime = make_atom(env, kAtomCreateTime);
ATOMS.atomLogAppendTime = make_atom(env, kAtomLogAppendTime);

erlkaf_data* data = static_cast<erlkaf_data*>(enif_alloc(sizeof(erlkaf_data)));
open_resources(env, data);
Expand Down Expand Up @@ -114,7 +120,7 @@ static ErlNifFunc nif_funcs[] =
{"producer_set_owner", 2, enif_producer_set_owner},
{"producer_topic_new", 3, enif_producer_topic_new},
{"producer_cleanup", 1, enif_producer_cleanup},
{"produce", 6, enif_produce},
{"produce", 7, enif_produce},
{"get_metadata", 1, enif_get_metadata, ERL_NIF_DIRTY_JOB_IO_BOUND},

{"consumer_new", 4, enif_consumer_new},
Expand Down
3 changes: 3 additions & 0 deletions c_src/erlkaf_nif.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ struct atoms
ERL_NIF_TERM atomRevokePartition;
ERL_NIF_TERM atomStats;
ERL_NIF_TERM atomClientStopped;
ERL_NIF_TERM atomNotAvailable;
ERL_NIF_TERM atomCreateTime;
ERL_NIF_TERM atomLogAppendTime;
};

struct erlkaf_data
Expand Down
7 changes: 6 additions & 1 deletion c_src/erlkaf_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ ERL_NIF_TERM enif_produce(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
int32_t partition;
ErlNifBinary key;
ErlNifBinary value;
long timestamp;

scoped_ptr(headers, rd_kafka_headers_t, NULL, rd_kafka_headers_destroy);

Expand Down Expand Up @@ -383,7 +384,10 @@ ERL_NIF_TERM enif_produce(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
}
}

if(!headers.get())
if(!enif_get_int64(env, argv[6], &timestamp))
return make_badarg(env);

if(!headers.get() && timestamp == 0)
{
rd_kafka_topic_t* topic = producer->topics->GetOrCreateTopic(topic_name);

Expand All @@ -402,6 +406,7 @@ ERL_NIF_TERM enif_produce(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
RD_KAFKA_V_VALUE(value.data, value.size),
RD_KAFKA_V_KEY(key.data, key.size),
RD_KAFKA_V_HEADERS(headers.get()),
RD_KAFKA_V_TIMESTAMP(timestamp),
RD_KAFKA_V_END);

if(result != RD_KAFKA_RESP_ERR_NO_ERROR)
Expand Down
4 changes: 3 additions & 1 deletion include/erlkaf.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
-type callback_dispatch_mode() :: {dispatch_mode, dispatch_mode()}.
-type callback_options() :: callback_module() | callback_args() | callback_dispatch_mode().
-type topic() :: {binary(), [callback_options()]}.
-type timestamp_type() :: not_available | create_time | log_append_time.

-type topic_option() ::
{request_required_acks, integer()} |
Expand Down Expand Up @@ -151,6 +152,7 @@
offset ::integer(),
key :: key(),
value ::binary(),
headers :: headers()
headers :: headers(),
timestamp :: {integer(), timestamp_type()}
}).

13 changes: 10 additions & 3 deletions src/erlkaf.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
produce/4,
produce/5,
produce/6,
produce/7,

get_metadata/1,
get_readable_error/1
Expand Down Expand Up @@ -124,22 +125,28 @@ get_metadata(ClientId) ->
ok | {error, reason()}.

%% produce/4: default partitioner, no headers, timestamp 0
%% (0 = let librdkafka assign the current time — see enif_produce).
%% The stale 6-ary delegate line left over from the previous revision was
%% removed: it sat after the terminating '.' and was a syntax error.
produce(ClientId, TopicName, Key, Value) ->
    produce(ClientId, TopicName, ?DEFAULT_PARTITIONER, Key, Value, undefined, 0).

-spec produce(client_id(), binary(), key(), value(), headers()) ->
ok | {error, reason()}.

%% produce/5: default partitioner, explicit headers, timestamp 0
%% (0 = let librdkafka assign the current time).
%% The stale 6-ary delegate line from the previous revision was removed:
%% it followed the terminating '.' and was a syntax error.
produce(ClientId, TopicName, Key, Value, Headers) ->
    produce(ClientId, TopicName, ?DEFAULT_PARTITIONER, Key, Value, Headers, 0).

-spec produce(client_id(), binary(), partition(), key(), value(), headers()) ->
ok | {error, reason()}.

%% produce/6: explicit partition and headers; defaults the timestamp to 0
%% (presumably 0 means "librdkafka assigns the current time" — confirm
%% against enif_produce in c_src/erlkaf_producer.cc).
produce(ClientId, TopicName, Partition, Key, Value, Headers0) ->
produce(ClientId, TopicName, Partition, Key, Value, Headers0, 0).

-spec produce(client_id(), binary(), partition(), key(), value(), headers(), non_neg_integer()) ->
ok | {error, reason()}.

produce(ClientId, TopicName, Partition, Key, Value, Headers0, Timestamp) ->
case erlkaf_cache_client:get(ClientId) of
{ok, ClientRef, ClientPid} ->
Headers = to_headers(Headers0),
case erlkaf_nif:produce(ClientRef, TopicName, Partition, Key, Value, Headers) of
case erlkaf_nif:produce(ClientRef, TopicName, Partition, Key, Value, Headers, Timestamp) of
ok ->
ok;
{error, ?RD_KAFKA_RESP_ERR_QUEUE_FULL} ->
Expand Down
8 changes: 4 additions & 4 deletions src/erlkaf_local_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
-export([
new/1,
free/1,
enq/6,
enq/7,
deq/1,
head/1
]).
Expand All @@ -20,11 +20,11 @@ free(undefined) ->
free(Queue) ->
esq:free(Queue).

%% Enqueue a pending produce request into the on-disk overflow queue.
%% enq/6 is kept for backward compatibility and forwards with timestamp 0 so
%% the queue always stores the canonical 6-element payload (the previous
%% enq/6 persisted a 5-tuple, inconsistent with what the consumer decodes).
enq(Queue, TopicName, Partition, Key, Value, Headers) ->
    enq(Queue, TopicName, Partition, Key, Value, Headers, 0).

enq(Queue, TopicName, Partition, Key, Value, Headers, Timestamp) ->
    esq:enq({TopicName, Partition, Key, Value, Headers, Timestamp}, Queue).

%% Dequeue the next message from the disk-backed queue (delegates to esq).
deq(Queue) ->
esq:deq(Queue).

%% Peek at the head of the disk-backed queue without removing it.
%% The duplicated trailing `esq:head(Queue).` line (diff residue from the
%% missing-final-newline change) was removed — a bare expression after the
%% terminating '.' is a syntax error.
head(Queue) ->
    esq:head(Queue).
4 changes: 2 additions & 2 deletions src/erlkaf_nif.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
producer_cleanup/1,
producer_set_owner/2,
producer_topic_new/3,
produce/6,
produce/7,
get_metadata/1,

consumer_new/4,
Expand Down Expand Up @@ -48,7 +48,7 @@ producer_set_owner(_ClientRef, _Pid) ->
producer_topic_new(_ClientRef, _TopicName, _TopicConfig) ->
?NOT_LOADED.

%% NIF stub: replaced by the native implementation when the NIF library loads.
%% The stale 6-ary head left over from the previous revision was removed —
%% two consecutive clause heads with a single body is a syntax error, and the
%% native dispatch table registers {"produce", 7}.
produce(_ClientRef, _TopicRef, _Partition, _Key, _Value, _Headers, _Timestamp) ->
    ?NOT_LOADED.

get_metadata(_ClientRef) ->
Expand Down
18 changes: 9 additions & 9 deletions src/erlkaf_producer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
% api

start_link/4,
queue_event/6,
queue_event/7,

% gen_server

Expand All @@ -37,8 +37,8 @@
start_link(ClientId, DrCallback, ErlkafConfig, ProducerRef) ->
gen_server:start_link(?MODULE, [ClientId, DrCallback, ErlkafConfig, ProducerRef], []).

%% Hand a produce request to the producer gen_server (used when the native
%% memory queue overflows). queue_event/6 is kept for backward compatibility
%% and forwards with timestamp 0: handle_call now only matches the 7-element
%% {queue_event, ...} tuple, so a 6-element event would crash the server.
queue_event(Pid, TopicName, Partition, Key, Value, Headers) ->
    queue_event(Pid, TopicName, Partition, Key, Value, Headers, 0).

queue_event(Pid, TopicName, Partition, Key, Value, Headers, Timestamp) ->
    erlkaf_utils:safe_call(Pid, {queue_event, TopicName, Partition, Key, Value, Headers, Timestamp}).

init([ClientId, DrCallback, ErlkafConfig, ProducerRef]) ->
Pid = self(),
Expand All @@ -64,15 +64,15 @@ init([ClientId, DrCallback, ErlkafConfig, ProducerRef]) ->
overflow_method = OverflowStrategy,
pqueue = Queue}}.

handle_call({queue_event, TopicName, Partition, Key, Value, Headers}, _From, #state{
handle_call({queue_event, TopicName, Partition, Key, Value, Headers, Timestamp}, _From, #state{
pqueue = Queue,
pqueue_sch = QueueScheduled,
overflow_method = OverflowMethod} = State) ->

case OverflowMethod of
local_disk_queue ->
schedule_consume_queue(QueueScheduled, 1000),
ok = erlkaf_local_queue:enq(Queue, TopicName, Partition, Key, Value, Headers),
ok = erlkaf_local_queue:enq(Queue, TopicName, Partition, Key, Value, Headers, Timestamp),
{reply, ok, State#state{pqueue_sch = true}};
_ ->
{reply, OverflowMethod, State}
Expand Down Expand Up @@ -169,8 +169,8 @@ consume_queue(ClientRef, Q, N) ->
log_completed(N),
completed;
#{payload := Msg} ->
{TopicName, Partition, Key, Value, Headers} = decode_queued_message(Msg),
case erlkaf_nif:produce(ClientRef, TopicName, Partition, Key, Value, Headers) of
{TopicName, Partition, Key, Value, Headers, Timestamp} = decode_queued_message(Msg),
case erlkaf_nif:produce(ClientRef, TopicName, Partition, Key, Value, Headers, Timestamp) of
ok ->
[#{payload := Msg}] = erlkaf_local_queue:deq(Q),
consume_queue(ClientRef, Q, N-1);
Expand All @@ -195,7 +195,7 @@ log_completed(N) ->
% todo: remove this in the future. it's here just to make sure nobody will update from a
% older version having messages in the local queue. Without this hack old messages won't be decoded

% Upgrade-compatibility shim: messages persisted on disk by older releases
% may lack the headers and/or timestamp fields. Normalize every stored
% payload to the canonical {Topic, Partition, Key, Value, Headers, Timestamp}
% 6-tuple, since consume_queue destructures exactly that shape.
% Fix: the legacy 4-tuple clause previously produced a 5-tuple, which would
% crash the 6-tuple match in the caller.
decode_queued_message({TopicName, Partition, Key, Value}) ->
    {TopicName, Partition, Key, Value, undefined, 0};
decode_queued_message({TopicName, Partition, Key, Value, Headers}) ->
    {TopicName, Partition, Key, Value, Headers, 0};
decode_queued_message(R) ->
    R.
4 changes: 4 additions & 0 deletions test/test_consume_produce.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
create/1,
produce/2,
produce/3,
produce/4,
produce_multiple/4,

% consumer callbacks
Expand Down Expand Up @@ -66,6 +67,9 @@ produce(Key, Value) ->
produce(Key, Value, Headers) ->
ok = erlkaf:produce(?PRODUCER_CLIENT, ?TOPIC, Key, Value, Headers).

%% Test helper: produce with explicit headers and timestamp Ts.
%% NOTE(review): Ts is passed through to erlkaf:produce/7 — presumably
%% milliseconds since epoch, with 0 meaning "assign current time"; confirm
%% against the NIF layer.
produce(Key, Value, Headers, Ts) ->
ok = erlkaf:produce(?PRODUCER_CLIENT, ?TOPIC, ?DEFAULT_PARTITIONER, Key, Value, Headers, Ts).

produce_multiple(0, _Key, _Value, _Headers) ->
ok;
produce_multiple(Count, Key, Value, Headers) ->
Expand Down

0 comments on commit f8c47c7

Please sign in to comment.