Commit 547acfe

Refactor: reduce locking while appending events
1 parent bdae84e commit 547acfe

File tree

1 file changed (+9, -12 lines)

lib/logstash/outputs/kafka.rb

Lines changed: 9 additions & 12 deletions
@@ -1,6 +1,7 @@
 require 'logstash/namespace'
 require 'logstash/outputs/base'
 require 'java'
+require 'concurrent/map'
 require 'logstash-integration-kafka_jars.rb'
 require 'logstash/plugin_mixins/kafka_support'
 
@@ -190,7 +191,7 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
 
   public
   def register
-    @thread_batch_map = Concurrent::Hash.new
+    @thread_batch_map = Concurrent::Map.new
 
     if !@retries.nil?
       if @retries < 0
@@ -204,33 +205,29 @@ def register
     @producer = create_producer
     if value_serializer == 'org.apache.kafka.common.serialization.StringSerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data)
+        push_event_data(event, data)
       end
     elsif value_serializer == 'org.apache.kafka.common.serialization.ByteArraySerializer'
       @codec.on_event do |event, data|
-        write_to_kafka(event, data.to_java_bytes)
+        push_event_data(event, data.to_java_bytes)
       end
     else
       raise ConfigurationError, "'value_serializer' only supports org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer"
     end
   end
 
-  def prepare(record)
+  def append_record(record)
     # This output is threadsafe, so we need to keep a batch per thread.
-    @thread_batch_map[Thread.current].add(record)
+    @thread_batch_map.get(Thread.current) << record
   end
 
   def multi_receive(events)
-    t = Thread.current
-    if !@thread_batch_map.include?(t)
-      @thread_batch_map[t] = java.util.ArrayList.new(events.size)
-    end
+    batch = @thread_batch_map.fetch_or_store(Thread.current) { Array.new(events.size).clear }
 
     events.each do |event|
       @codec.encode(event)
     end
 
-    batch = @thread_batch_map[t]
     if batch.any?
       retrying_send(batch)
       batch.clear
@@ -314,13 +311,13 @@ def handle_kafka_error(e, record)
     end
   end
 
-  def write_to_kafka(event, serialized_data)
+  def push_event_data(event, serialized_data)
     if @message_key.nil?
       record = ProducerRecord.new(event.sprintf(@topic_id), serialized_data)
     else
       record = ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), serialized_data)
     end
-    prepare(record)
+    append_record(record)
   rescue LogStash::ShutdownSignal
     logger.debug('producer received shutdown signal')
   rescue => e
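
Why this reduces locking: in concurrent-ruby, Concurrent::Hash on JRuby (Logstash's runtime) is a fully synchronized Hash, so the old include? / []= / [] sequence took a lock on every call and was not atomic as a whole. Concurrent::Map, by contrast, is backed on JRuby by a ConcurrentHashMap-style store with cheap reads, and fetch_or_store collapses the check-then-insert into a single call. A minimal sketch of the before/after pattern, with illustrative names (batches, batch, thread) that are not taken from the plugin:

# Sketch only, assuming concurrent-ruby is installed; not code from the commit.
require 'concurrent/hash'
require 'concurrent/map'

# Before: Concurrent::Hash synchronizes every operation, and the membership
# check plus insert are separate calls, so seeding a thread's batch costs
# three lock acquisitions (include?, []=, []).
batches = Concurrent::Hash.new
thread  = Thread.current
batches[thread] = [] unless batches.include?(thread)
batch = batches[thread]

# After: Concurrent::Map reads avoid the coarse lock, and fetch_or_store
# performs the check-then-insert in one call; the block runs only when
# the key is absent.
batches = Concurrent::Map.new
batch = batches.fetch_or_store(Thread.current) { [] }
batch << 'event'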

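A smaller detail in the same hunk: the per-thread batch is now a plain Ruby Array appended to with << instead of a java.util.ArrayList with .add, and it is seeded as Array.new(events.size).clear. Building an events.size-element array and immediately clearing it looks odd, but it is presumably a capacity hint: on JRuby the cleared array keeps its allocated backing store, so the first batch can grow without reallocating, and later multi_receive calls reuse the same cleared array via fetch_or_store. An illustration of the idiom (the size 128 is arbitrary):

# Illustration only; not code from the commit.
batch = Array.new(128).clear   # length 0, but backing storage already sized
128.times { |i| batch << i }   # appends fill the preallocated store
batch.clear                    # empty again; capacity retained for reuse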