Skip to content

Add an optional response callback to Producer #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/poseidon.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class ProducerShutdownError < StandardError; end
require "poseidon/producer"
require "poseidon/fetched_message"
require "poseidon/partition_consumer"
require "poseidon/produce_result"

# Poseidon!
require "poseidon/message"
Expand Down
6 changes: 5 additions & 1 deletion lib/poseidon/messages_for_broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ def build_protocol_objects(compression_config)
# We can always retry these errors because they mean none of the kafka brokers persisted the message
ALWAYS_RETRYABLE = [Poseidon::Errors::LeaderNotAvailable, Poseidon::Errors::NotLeaderForPartition]

def successfully_sent(producer_response)
def successfully_sent(producer_response, callback)
failed = []
producer_response.topic_response.each do |topic_response|
topic_response.partitions.each do |partition|
if ALWAYS_RETRYABLE.include?(partition.error_class)
failed.push(*@topics[topic_response.topic][partition.partition])
else
if callback
callback.call(ProduceResult.new(topic_response, partition, @topics[topic_response.topic][partition.partition]))
end
end
end
end
Expand Down
71 changes: 71 additions & 0 deletions lib/poseidon/produce_result.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
module Poseidon
# Represents the result of a produce attempt against a kafka topic.
class ProduceResult
def initialize(produce_topic_response, produce_partition_response, messages)
@produce_topic_response = produce_topic_response
@produce_partition_response = produce_partition_response
@messages = messages
end

# Was the produce request successful?
#
# NOTE: This will return false if required_acks is > 1 and the leader
# timedout waiting for a response from the replicas. In this case
# trying to resend the messages will likely lead to duplicate messages
# because the leader will have succesfully persisted the message.
#
# You can use the `timeout?` method to differentiate between this case
# and other failures.
#
# @return [Boolean]
def success?
@produce_partition_response.error == Poseidon::Errors::NO_ERROR_CODE
end

# Did we fail to receive the required number of acks?
#
# @return [Boolean]
def timeout?
@produce_partition_response.error_class == Poseidon::Errors::RequestTimedOut
end

# Return an error if we recieved one.
#
# @return [Poseidon::Errors::ProtocolError,Nil]
def error
if !success?
@produce_partition_response.error_class
else
nil
end
end

# The messages we the produce request sent.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in this sentence.

#
# @return [Array<MessageToSend>]
def messages
@messages
end

# The topic we sent the messages to.
#
# @return [String]
def topic
@produce_topic_response.topic
end

# The partition we sent the message to.
#
# @return [Fixnum]
def partition
@produce_partition_response.partition
end

# The offset of the first message the broker persisted from this batch.
#
# @return [Fixnum]
def offset
@produce_partition_response.offset
end
end
end
10 changes: 8 additions & 2 deletions lib/poseidon/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,22 @@ def initialize(brokers, client_id, options = {})
# @param [Enumerable<MessageToSend>] messages
# Messages must have a +topic+ set and may have a +key+ set.
#
# @yieldparam [ProduceResult] callback
# Optional callback which will be triggered _at least once_ for each
# (topic,partition) pair we attempt to send messages to.
#
# Will never be called if required_acks is 0.
#
# @return [Boolean]
#
# @api public
def send_messages(messages)
def send_messages(messages, &callback)
raise Errors::ProducerShutdownError if @shutdown
if !messages.respond_to?(:each)
raise ArgumentError, "messages must respond to #each"
end

@producer.send_messages(convert_to_messages_objects(messages))
@producer.send_messages(convert_to_messages_objects(messages), &callback)
end

# Closes all open connections to brokers
Expand Down
13 changes: 9 additions & 4 deletions lib/poseidon/sync_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def initialize(client_id, seed_brokers, options = {})
@broker_pool = BrokerPool.new(client_id, seed_brokers)
end

def send_messages(messages)
def send_messages(messages, &callback)
return if messages.empty?

messages_to_send = MessagesToSend.new(messages, @cluster_metadata)
Expand All @@ -50,7 +50,7 @@ def send_messages(messages)
end

messages_to_send.messages_for_brokers(@message_conductor).each do |messages_for_broker|
if sent = send_to_broker(messages_for_broker)
if sent = send_to_broker(messages_for_broker, callback)
messages_to_send.successfully_sent(sent)
end
end
Expand Down Expand Up @@ -106,13 +106,18 @@ def refresh_metadata(topics)
false
end

def send_to_broker(messages_for_broker)
def send_to_broker(messages_for_broker, callback)
return false if messages_for_broker.broker_id == -1
to_send = messages_for_broker.build_protocol_objects(@compression_config)
response = @broker_pool.execute_api_call(messages_for_broker.broker_id, :produce,
required_acks, ack_timeout_ms,
to_send)
return messages_for_broker.successfully_sent(response)
if required_acks != 0
messages_for_broker.successfully_sent(response, callback)
else
# Client requested 0 acks, assume all were successful
messages_for_broker.messages
end
rescue Connection::ConnectionFailedError
false
end
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/multiple_brokers/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ def start_first_broker
end

config.after(:suite) do
$tc.stop
$tc.stop if $tc
end
end
2 changes: 1 addition & 1 deletion spec/integration/simple/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
end

config.after(:suite) do
$tc.stop
$tc.stop if $tc
end
end
95 changes: 95 additions & 0 deletions spec/unit/produce_result_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
require 'spec_helper'
include Poseidon::Protocol

describe ProduceResult do
context "successful result" do
before(:each) do
@messages = [mock('message')]
partition_response = ProducePartitionResponse.new(1, 0, 10)
topic_response = ProduceTopicResponse.new("topic", [partition_response])
@pr = ProduceResult.new(
topic_response,
partition_response,
@messages
)
end

it "is success?" do
expect(@pr.success?).to eq(true)
end

it "has no error" do
expect(@pr.error).to eq(nil)
end

it "did not timeout" do
expect(@pr.timeout?).to eq(false)
end

it "provides topic" do
expect(@pr.topic).to eq("topic")
end

it "provides partition" do
expect(@pr.partition).to eq(1)
end

it "provides offset" do
expect(@pr.offset).to eq(10)
end

it "provides messages" do
expect(@pr.messages).to eq(@messages)
end
end

context "failed result" do
before(:each) do
@messages = [mock('message')]
partition_response = ProducePartitionResponse.new(1, 2, -1)
topic_response = ProduceTopicResponse.new("topic", [partition_response])
@pr = ProduceResult.new(
topic_response,
partition_response,
@messages
)
end

it "is success?" do
expect(@pr.success?).to eq(false)
end

it "has error" do
expect(@pr.error).to eq(Poseidon::Errors::InvalidMessage)
end

it "did not timeout" do
expect(@pr.timeout?).to eq(false)
end
end

context "timedout" do
before(:each) do
@messages = [mock('message')]
partition_response = ProducePartitionResponse.new(1, 7, -1)
topic_response = ProduceTopicResponse.new("topic", [partition_response])
@pr = ProduceResult.new(
topic_response,
partition_response,
@messages
)
end

it "is success?" do
expect(@pr.success?).to eq(false)
end

it "has error" do
expect(@pr.error).to eq(Poseidon::Errors::RequestTimedOut)
end

it "did timeout" do
expect(@pr.timeout?).to eq(true)
end
end
end
2 changes: 1 addition & 1 deletion spec/unit/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
end

it "turns MessagesToSend into Message objects" do
@sync_producer.should_receive(:send_messages).with([an_instance_of(Message)])
@sync_producer.should_receive(:send_messages).with([an_instance_of(Message)], anything)

m = MessageToSend.new("topic", "value")
@producer.send_messages([m])
Expand Down