Skip to content

Commit 2b0dcd4

Browse files
committed
fix: consumer thread lifetime.
1 parent e01a32d commit 2b0dcd4

File tree

3 files changed

+64
-32
lines changed

3 files changed

+64
-32
lines changed

include/kafkalib/kafka.hpp

+11-7
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ namespace Kafka
2121
{
2222
class Packet {
2323
public:
24-
Packet(int8_t *data, uint32_t size) :
24+
Packet(int8_t *data, size_t size) :
2525
data(data), size(size) {}
2626

2727
~Packet();
@@ -32,7 +32,7 @@ class Packet {
3232
uint32_t get_size() const;
3333
private:
3434
int8_t *data;
35-
uint32_t size;
35+
size_t size;
3636
};
3737

3838
class PacketNode {
@@ -105,7 +105,7 @@ class KafkaController {
105105
/// <param name="topic"></param>
106106
/// <param name="group_id"></param>
107107
/// <param name="channel"></param>
108-
void add_consumer(const std::string &brokers, const std::string &topic, const std::string &group_id, const uint32_t channel);
108+
void add_consumer(const std::string &brokers, const std::vector<std::string> &topics, const std::string &group_id, const uint32_t channel);
109109

110110
/// <summary>
111111
/// Clears all producers of a channel.
@@ -210,11 +210,14 @@ class KafkaController {
210210
{
211211
public:
212212
ConsumerContext(
213-
std::shared_ptr<std::thread> thread,
213+
const uint32_t channel,
214214
std::vector<std::shared_ptr<RdKafka::KafkaConsumer>> consumers,
215215
std::shared_ptr<InternalLogger> logger,
216216
std::shared_ptr<InternalRebalanceCb> rebalance_cb
217-
) : thread(thread), consumers(consumers), logger(logger), rebalance_cb(rebalance_cb) {}
217+
) : consumers(consumers), logger(logger), rebalance_cb(rebalance_cb), running(true)
218+
{
219+
thread = std::make_shared<std::thread>(&ConsumerContext::_consumer_thread, this, channel);
220+
}
218221

219222
~ConsumerContext();
220223

@@ -235,7 +238,10 @@ class KafkaController {
235238
std::shared_ptr<InternalRebalanceCb> get_rebalance_cb() const;
236239

237240
std::vector<std::shared_ptr<RdKafka::KafkaConsumer>> get_consumers() const;
241+
void add_consumer(std::shared_ptr<RdKafka::KafkaConsumer> consumer);
238242
private:
243+
void _consumer_thread(const uint32_t channel);
244+
239245
std::atomic<PacketNode *> next_packet_head;
240246
std::atomic<PacketNode *> next_packet_tail;
241247
std::atomic<uint32_t> packet_count;
@@ -257,7 +263,5 @@ class KafkaController {
257263

258264
std::map<uint32_t, std::vector<std::shared_ptr<ProducerContext>>> m_producers;
259265
std::map<uint32_t, std::shared_ptr<ConsumerContext>> m_consumers;
260-
261-
void _consumer_thread(std::shared_ptr<ConsumerContext> &consumer, const uint32_t channel);
262266
};
263267
}

src/kafkalib/kafka.cpp

+49-21
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class Kafka::KafkaController::InternalDeliveryReportCb : public RdKafka::Deliver
2121
public:
2222
~InternalDeliveryReportCb()
2323
{
24-
printf("DeliveryReportCb destroyed\n");
24+
//printf("DeliveryReportCb destroyed\n");
2525
}
2626

2727
void set_callback(std::function<void(RdKafka::Message &)> cb) {
@@ -49,7 +49,7 @@ class Kafka::KafkaController::InternalRebalanceCb : public RdKafka::RebalanceCb
4949

5050
~InternalRebalanceCb()
5151
{
52-
printf("RebalanceCb destroyed\n");
52+
//printf("RebalanceCb destroyed\n");
5353
}
5454

5555
void set_callback(std::function<void(RdKafka::KafkaConsumer *, RdKafka::ErrorCode, std::vector<RdKafka::TopicPartition *> &)> cb) {
@@ -68,7 +68,7 @@ class Kafka::KafkaController::InternalLogger : public RdKafka::EventCb {
6868

6969
~InternalLogger()
7070
{
71-
printf("Logger destroyed\n");
71+
//printf("Logger destroyed\n");
7272
}
7373

7474
void set_callback(std::function<void(const LogLevel logLevel, const std::string &message)> cb) {
@@ -134,6 +134,11 @@ static RdKafka::KafkaConsumer *create_consumer(const std::string &brokers, const
134134
return nullptr;
135135
}
136136

137+
// Set the auto offset reset to earliest.
138+
if (conf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK) {
139+
return nullptr;
140+
}
141+
137142
// Create a Rdkafka Consumer
138143
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
139144
if (!consumer) {
@@ -204,12 +209,15 @@ void Kafka::KafkaController::add_producer(const std::string &brokers, const std:
204209

205210
// Set the delivery report callback
206211
deliveryReportCb->set_callback([this](RdKafka::Message &message) {
207-
printf("Message delivered to partition %d at offset %" PRId64 "\n", message.partition(), message.offset());
212+
//printf("Message delivered to partition %d at offset %" PRId64 "\n", message.partition(), message.offset());
208213

209214
if (message.err())
210215
{
211216
if (m_error_callback)
212217
m_error_callback("Message delivery failed: " + RdKafka::err2str(message.err()));
218+
} else {
219+
if (m_log_callback && m_log_level >= LogLevel::DEBUG)
220+
m_log_callback(LogLevel::DEBUG, "Message delivered to partition " + std::to_string(message.partition()) + " at offset " + std::to_string(message.offset()));
213221
}
214222
});
215223
loggerCb->set_callback([this](const LogLevel logLevel, const std::string &message) {
@@ -241,8 +249,7 @@ void Kafka::KafkaController::add_producer(const std::string &brokers, const std:
241249
m_producers[channel].push_back(producerContext);
242250
}
243251

244-
void Kafka::KafkaController::add_consumer(const std::string &brokers, const std::string &topic, const std::string &group_id, const uint32_t channel)
245-
{
252+
void Kafka::KafkaController::add_consumer(const std::string &brokers, const std::vector<std::string> &topics, const std::string &group_id, const uint32_t channel) {
246253
std::string errstr;
247254
std::shared_ptr<InternalRebalanceCb> rebalance_cb;
248255
std::shared_ptr<InternalLogger> loggerCb;
@@ -286,24 +293,19 @@ void Kafka::KafkaController::add_consumer(const std::string &brokers, const std:
286293
m_log_callback(logLevel, message);
287294
});
288295

289-
// Create topic handle
290-
RdKafka::Topic *rd_topic = RdKafka::Topic::create(consumer, topic, nullptr, errstr);
296+
// Subscribe.
297+
consumer->subscribe(topics);
291298

292299
// Create shared objects.
293300
std::shared_ptr<RdKafka::KafkaConsumer> consumer_shared(consumer);
294-
std::shared_ptr<RdKafka::Topic> topic_shared(rd_topic);
295301
std::shared_ptr<InternalLogger> loggerCb_shared(loggerCb);
296302
std::shared_ptr<InternalRebalanceCb> rebalance_cb_shared(rebalance_cb);
297303

298304
// Create Context, if it doesn't exist.
299305
if (m_consumers.find(channel) == m_consumers.end())
300306
{
301-
// Create the thread.
302-
std::thread *consumer_thread = new std::thread(&KafkaController::_consumer_thread, this, m_consumers[channel], channel);
303-
std::shared_ptr<std::thread> thread_shared(consumer_thread);
304-
305307
std::shared_ptr<ConsumerContext> consumerContext = std::make_shared<ConsumerContext>(
306-
thread_shared,
308+
channel,
307309
std::vector<std::shared_ptr<RdKafka::KafkaConsumer>>(),
308310
loggerCb_shared,
309311
rebalance_cb_shared
@@ -313,7 +315,7 @@ void Kafka::KafkaController::add_consumer(const std::string &brokers, const std:
313315
}
314316

315317
// Add the consumer to the vector.
316-
m_consumers[channel]->get_consumers().push_back(consumer_shared);
318+
m_consumers[channel]->add_consumer(consumer_shared);
317319

318320
// Wait until the consumer is connected.
319321
// Timeout after the MAX_TIMEOUT_MS.
@@ -375,6 +377,8 @@ bool Kafka::KafkaController::send(const uint32_t channel, const void *data, cons
375377
nullptr);
376378
}
377379

380+
producer->get_producer()->flush(MAX_TIMEOUT_MS);
381+
378382
if (err) {
379383
if (m_error_callback)
380384
m_error_callback("Failed to produce message: " + RdKafka::err2str(err));
@@ -451,11 +455,6 @@ void Kafka::KafkaController::set_error_callback(const std::function<void(const s
451455
m_error_callback = callback;
452456
}
453457

454-
void Kafka::KafkaController::_consumer_thread(std::shared_ptr<ConsumerContext> &consumer, const uint32_t channel)
455-
{
456-
std::printf("Consumer thread started : %s\n", std::to_string(channel).c_str());
457-
}
458-
459458
/// ------------------------- Packet ------------------------- ///
460459

461460
Kafka::Packet::~Packet() {
@@ -604,5 +603,34 @@ std::shared_ptr<Kafka::KafkaController::InternalRebalanceCb> Kafka::KafkaControl
604603
}
605604

606605
std::vector<std::shared_ptr<RdKafka::KafkaConsumer>> Kafka::KafkaController::ConsumerContext::get_consumers() const {
607-
return std::vector<std::shared_ptr<RdKafka::KafkaConsumer>>();
606+
return consumers;
607+
}
608+
609+
void Kafka::KafkaController::ConsumerContext::add_consumer(std::shared_ptr<RdKafka::KafkaConsumer> consumer) {
610+
consumers.emplace_back(std::move(consumer));
611+
}
612+
613+
void Kafka::KafkaController::ConsumerContext::_consumer_thread(const uint32_t channel)
614+
{
615+
616+
while (is_running())
617+
{
618+
std::vector<std::shared_ptr<Packet>> packets;
619+
for (auto &consumer : consumers)
620+
{
621+
RdKafka::Message *message = consumer->consume(1000);
622+
if (message->err() == RdKafka::ERR_NO_ERROR)
623+
{
624+
std::shared_ptr<Packet> packet = std::make_shared<Packet>(new int8_t[message->len()], message->len());
625+
memcpy(packet->get_data(), message->payload(), packet->get_size());
626+
packets.push_back(packet);
627+
}
628+
delete message;
629+
}
630+
631+
for (auto &packet : packets) {
632+
PacketNode *node = new PacketNode(packet);
633+
push_next_packet(node);
634+
}
635+
}
608636
}

src/test/main.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ int main()
4141
std::string brokers = "localhost:19092";
4242

4343
// Settings.
44-
controller.set_log_level(Kafka::LogLevel::DEBUG);
44+
controller.set_log_level(Kafka::LogLevel::INFO);
4545
printf("Log level set.\n");
4646

4747
controller.set_log_callback(log_callback);
@@ -56,7 +56,7 @@ int main()
5656
controller.add_producer(brokers, "test", 1);
5757
printf("Kafka Producer added.\n");
5858

59-
controller.add_consumer(brokers, "test", "test_group", 1);
59+
controller.add_consumer(brokers, { "test" }, "test_group", 1);
6060
printf("Kafka Consumer added.\n");
6161

6262
// Logic.
@@ -77,8 +77,8 @@ int main()
7777

7878
//while (true)
7979
{
80-
// Sleep for 5 seconds.
81-
std::this_thread::sleep_for(std::chrono::seconds(5));
80+
// Sleep.
81+
std::this_thread::sleep_for(std::chrono::seconds(1));
8282

8383
// Receive a message from the Kafka Consumer.
8484
std::shared_ptr<Kafka::Packet> message = controller.receive();

0 commit comments

Comments
 (0)