diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb
index 98521510..c9e7ad83 100644
--- a/lib/logstash/inputs/kafka.rb
+++ b/lib/logstash/inputs/kafka.rb
@@ -5,6 +5,8 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
   config_name 'kafka'
   milestone 1
 
+  attr_accessor :exception_repeats
+
   default :codec, 'json'
 
   config :zk_connect, :validate => :string, :default => 'localhost:2181'
@@ -21,6 +23,7 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
   config :decorate_events, :validate => :boolean, :default => true
   config :consumer_id, :validate => :string, :default => nil
   config :fetch_message_max_bytes, :validate => :number, :default => 1048576
+  config :logstash_stop_on_exception_repeat, :validate => :number, :default => 0 # stop Logstash once the same exception has repeated this many times; 0 disables the check
 
   public
   def register
@@ -53,6 +56,9 @@ def register
   def run(logstash_queue)
     java_import 'kafka.common.ConsumerRebalanceFailedException'
     @logger.info('Running kafka', :group_id => @group_id, :topic_id => @topic_id, :zk_connect => @zk_connect)
+
+    @exception_repeats = {}
+
     begin
       @consumer_group.run(@consumer_threads,@kafka_client_queue)
       begin
@@ -74,6 +80,23 @@ def run(logstash_queue)
       if @consumer_group.running?
         @consumer_group.shutdown()
       end
+
+      if @logstash_stop_on_exception_repeat > 0
+        case @exception_repeats[e.to_s]
+        when nil
+          @exception_repeats[e.to_s] = @logstash_stop_on_exception_repeat
+        when 1
+          @logger.error("Exception repeated over #{@logstash_stop_on_exception_repeat} times: ",
+                        :exception => e)
+          finished
+          raise LogStash::ShutdownSignal
+        when 2..@logstash_stop_on_exception_repeat
+          @exception_repeats[e.to_s] -= 1
+        else
+          @logger.warn('Exception repeater error')
+        end
+      end
+
       sleep(Float(@consumer_restart_sleep_ms) * 1 / 1000)
       retry
     end
diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index f780f984..7082b86d 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -4,7 +4,9 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
   config_name 'kafka'
   milestone 1
-  
+
+  attr_accessor :exception_repeats
+
   default :codec, 'json'
 
   config :broker_list, :validate => :string, :default => 'localhost:9092'
 
@@ -27,6 +29,9 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
   config :send_buffer_bytes, :validate => :number, :default => 100 * 1024
   config :client_id, :validate => :string, :default => ""
 
+  config :logstash_stop_on_exception_repeat, :validate => :number, :default => 0 # stop Logstash once the same exception has repeated this many times; 0 disables the check
+
+
   public
   def register
     jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/kafka*/libs/*.jar")
@@ -55,11 +60,16 @@ def register
       :send_buffer_bytes => @send_buffer_bytes,
       :client_id => @client_id
     }
+
+
+
 
     @producer = Kafka::Producer.new(options)
     @producer.connect()
     @logger.info('Registering kafka producer', :topic_id => @topic_id, :broker_list => @broker_list)
 
+    @exception_repeats = {}
+
     @codec.on_event do |event|
       begin
         @producer.sendMsg(@topic_id,nil,event)
@@ -68,8 +78,25 @@ def register
       rescue => e
         @logger.warn('kafka producer threw exception, restarting',
                      :exception => e)
+
+        if @logstash_stop_on_exception_repeat > 0
+          case @exception_repeats[e.to_s]
+          when nil
+            @exception_repeats[e.to_s] = @logstash_stop_on_exception_repeat
+          when 1
+            @logger.error("Exception repeated over #{@logstash_stop_on_exception_repeat} times: ",
+                          :exception => e)
+            finished
+            raise LogStash::ShutdownSignal
+          when 2..@logstash_stop_on_exception_repeat
+            @exception_repeats[e.to_s] -= 1
+          else
+            @logger.warn('Exception repeater error')
+          end
+        end
       end
     end
+
   end # def register
 
   def receive(event)
@@ -82,3 +109,4 @@ def receive(event)
   end
 
 end #class LogStash::Outputs::Kafka
+