diff --git a/lib/poseidon.rb b/lib/poseidon.rb index 276209c..2ddd3b7 100644 --- a/lib/poseidon.rb +++ b/lib/poseidon.rb @@ -82,6 +82,7 @@ class ProducerShutdownError < StandardError; end require "poseidon/producer" require "poseidon/fetched_message" require "poseidon/partition_consumer" +require "poseidon/produce_result" # Poseidon! require "poseidon/message" diff --git a/lib/poseidon/messages_for_broker.rb b/lib/poseidon/messages_for_broker.rb index e2ba079..a3c8435 100644 --- a/lib/poseidon/messages_for_broker.rb +++ b/lib/poseidon/messages_for_broker.rb @@ -39,12 +39,16 @@ def build_protocol_objects(compression_config) # We can always retry these errors because they mean none of the kafka brokers persisted the message ALWAYS_RETRYABLE = [Poseidon::Errors::LeaderNotAvailable, Poseidon::Errors::NotLeaderForPartition] - def successfully_sent(producer_response) + def successfully_sent(producer_response, callback) failed = [] producer_response.topic_response.each do |topic_response| topic_response.partitions.each do |partition| if ALWAYS_RETRYABLE.include?(partition.error_class) failed.push(*@topics[topic_response.topic][partition.partition]) + else + if callback + callback.call(ProduceResult.new(topic_response, partition, @topics[topic_response.topic][partition.partition])) + end end end end diff --git a/lib/poseidon/produce_result.rb b/lib/poseidon/produce_result.rb new file mode 100644 index 0000000..557cf60 --- /dev/null +++ b/lib/poseidon/produce_result.rb @@ -0,0 +1,71 @@ +module Poseidon + # Represents the result of a produce attempt against a kafka topic. + class ProduceResult + def initialize(produce_topic_response, produce_partition_response, messages) + @produce_topic_response = produce_topic_response + @produce_partition_response = produce_partition_response + @messages = messages + end + + # Was the produce request successful? + # + # NOTE: This will return false if required_acks is > 1 and the leader + # timedout waiting for a response from the replicas. In this case + # trying to resend the messages will likely lead to duplicate messages + # because the leader will have succesfully persisted the message. + # + # You can use the `timeout?` method to differentiate between this case + # and other failures. + # + # @return [Boolean] + def success? + @produce_partition_response.error == Poseidon::Errors::NO_ERROR_CODE + end + + # Did we fail to receive the required number of acks? + # + # @return [Boolean] + def timeout? + @produce_partition_response.error_class == Poseidon::Errors::RequestTimedOut + end + + # Return an error if we recieved one. + # + # @return [Poseidon::Errors::ProtocolError,Nil] + def error + if !success? + @produce_partition_response.error_class + else + nil + end + end + + # The messages we the produce request sent. + # + # @return [Array] + def messages + @messages + end + + # The topic we sent the messages to. + # + # @return [String] + def topic + @produce_topic_response.topic + end + + # The partition we sent the message to. + # + # @return [Fixnum] + def partition + @produce_partition_response.partition + end + + # The offset of the first message the broker persisted from this batch. + # + # @return [Fixnum] + def offset + @produce_partition_response.offset + end + end +end diff --git a/lib/poseidon/producer.rb b/lib/poseidon/producer.rb index 8241c02..3bfd89c 100644 --- a/lib/poseidon/producer.rb +++ b/lib/poseidon/producer.rb @@ -147,16 +147,22 @@ def initialize(brokers, client_id, options = {}) # @param [Enumerable] messages # Messages must have a +topic+ set and may have a +key+ set. # + # @yieldparam [ProduceResult] callback + # Optional callback which will be triggered _at least once_ for each + # (topic,partition) pair we attempt to send messages to. + # + # Will never be called if required_acks is 0. + # # @return [Boolean] # # @api public - def send_messages(messages) + def send_messages(messages, &callback) raise Errors::ProducerShutdownError if @shutdown if !messages.respond_to?(:each) raise ArgumentError, "messages must respond to #each" end - @producer.send_messages(convert_to_messages_objects(messages)) + @producer.send_messages(convert_to_messages_objects(messages), &callback) end # Closes all open connections to brokers diff --git a/lib/poseidon/sync_producer.rb b/lib/poseidon/sync_producer.rb index e6ac006..c80bfb3 100644 --- a/lib/poseidon/sync_producer.rb +++ b/lib/poseidon/sync_producer.rb @@ -35,7 +35,7 @@ def initialize(client_id, seed_brokers, options = {}) @broker_pool = BrokerPool.new(client_id, seed_brokers) end - def send_messages(messages) + def send_messages(messages, &callback) return if messages.empty? messages_to_send = MessagesToSend.new(messages, @cluster_metadata) @@ -50,7 +50,7 @@ def send_messages(messages) end messages_to_send.messages_for_brokers(@message_conductor).each do |messages_for_broker| - if sent = send_to_broker(messages_for_broker) + if sent = send_to_broker(messages_for_broker, callback) messages_to_send.successfully_sent(sent) end end @@ -106,13 +106,18 @@ def refresh_metadata(topics) false end - def send_to_broker(messages_for_broker) + def send_to_broker(messages_for_broker, callback) return false if messages_for_broker.broker_id == -1 to_send = messages_for_broker.build_protocol_objects(@compression_config) response = @broker_pool.execute_api_call(messages_for_broker.broker_id, :produce, required_acks, ack_timeout_ms, to_send) - return messages_for_broker.successfully_sent(response) + if required_acks != 0 + messages_for_broker.successfully_sent(response, callback) + else + # Client requested 0 acks, assume all were successful + messages_for_broker.messages + end rescue Connection::ConnectionFailedError false end diff --git a/spec/integration/multiple_brokers/spec_helper.rb b/spec/integration/multiple_brokers/spec_helper.rb index b4c1116..1431a7f 100644 --- a/spec/integration/multiple_brokers/spec_helper.rb +++ b/spec/integration/multiple_brokers/spec_helper.rb @@ -38,6 +38,6 @@ def start_first_broker end config.after(:suite) do - $tc.stop + $tc.stop if $tc end end diff --git a/spec/integration/simple/spec_helper.rb b/spec/integration/simple/spec_helper.rb index bda8ccf..b9dc997 100644 --- a/spec/integration/simple/spec_helper.rb +++ b/spec/integration/simple/spec_helper.rb @@ -12,6 +12,6 @@ end config.after(:suite) do - $tc.stop + $tc.stop if $tc end end diff --git a/spec/unit/produce_result_spec.rb b/spec/unit/produce_result_spec.rb new file mode 100644 index 0000000..e613afa --- /dev/null +++ b/spec/unit/produce_result_spec.rb @@ -0,0 +1,95 @@ +require 'spec_helper' +include Poseidon::Protocol + +describe ProduceResult do + context "successful result" do + before(:each) do + @messages = [mock('message')] + partition_response = ProducePartitionResponse.new(1, 0, 10) + topic_response = ProduceTopicResponse.new("topic", [partition_response]) + @pr = ProduceResult.new( + topic_response, + partition_response, + @messages + ) + end + + it "is success?" do + expect(@pr.success?).to eq(true) + end + + it "has no error" do + expect(@pr.error).to eq(nil) + end + + it "did not timeout" do + expect(@pr.timeout?).to eq(false) + end + + it "provides topic" do + expect(@pr.topic).to eq("topic") + end + + it "provides partition" do + expect(@pr.partition).to eq(1) + end + + it "provides offset" do + expect(@pr.offset).to eq(10) + end + + it "provides messages" do + expect(@pr.messages).to eq(@messages) + end + end + + context "failed result" do + before(:each) do + @messages = [mock('message')] + partition_response = ProducePartitionResponse.new(1, 2, -1) + topic_response = ProduceTopicResponse.new("topic", [partition_response]) + @pr = ProduceResult.new( + topic_response, + partition_response, + @messages + ) + end + + it "is success?" do + expect(@pr.success?).to eq(false) + end + + it "has error" do + expect(@pr.error).to eq(Poseidon::Errors::InvalidMessage) + end + + it "did not timeout" do + expect(@pr.timeout?).to eq(false) + end + end + + context "timedout" do + before(:each) do + @messages = [mock('message')] + partition_response = ProducePartitionResponse.new(1, 7, -1) + topic_response = ProduceTopicResponse.new("topic", [partition_response]) + @pr = ProduceResult.new( + topic_response, + partition_response, + @messages + ) + end + + it "is success?" do + expect(@pr.success?).to eq(false) + end + + it "has error" do + expect(@pr.error).to eq(Poseidon::Errors::RequestTimedOut) + end + + it "did timeout" do + expect(@pr.timeout?).to eq(true) + end + end +end diff --git a/spec/unit/producer_spec.rb b/spec/unit/producer_spec.rb index c5680f0..a166f17 100644 --- a/spec/unit/producer_spec.rb +++ b/spec/unit/producer_spec.rb @@ -32,7 +32,7 @@ end it "turns MessagesToSend into Message objects" do - @sync_producer.should_receive(:send_messages).with([an_instance_of(Message)]) + @sync_producer.should_receive(:send_messages).with([an_instance_of(Message)], anything) m = MessageToSend.new("topic", "value") @producer.send_messages([m])