Skip to content

Commit 16e5cb4

Browse files
authored
Merge pull request #476 from zendesk/dasch/refactor-fdadfsafd
Refactor Broker and Connection
2 parents 8b68a48 + 04955b3 commit 16e5cb4

File tree

9 files changed

+90
-124
lines changed

9 files changed

+90
-124
lines changed

lib/kafka.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ class NoPartitionsToFetchFrom < Error
2525
class MessageTooLargeToRead < Error
2626
end
2727

28+
# A connection has been unused for too long, we assume the server has killed it.
29+
class IdleConnection < Error
30+
end
31+
2832
# Subclasses of this exception class map to an error code described in the
2933
# Kafka protocol specification.
3034
#

lib/kafka/broker.rb

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,27 @@
44

55
module Kafka
66
class Broker
7-
def initialize(connection:, node_id: nil, logger:)
8-
@connection = connection
7+
def initialize(connection_builder:, host:, port:, node_id: nil, logger:)
8+
@connection_builder = connection_builder
9+
@connection = nil
10+
@host = host
11+
@port = port
912
@node_id = node_id
1013
@logger = logger
1114
end
1215

1316
def address_match?(host, port)
14-
@connection.address_match?(host, port)
17+
host == @host && port == @port
1518
end
1619

1720
# @return [String]
1821
def to_s
19-
"#{@connection} (node_id=#{@node_id.inspect})"
22+
"#{connection} (node_id=#{@node_id.inspect})"
2023
end
2124

2225
# @return [nil]
2326
def disconnect
24-
@connection.close
27+
connection.close
2528
end
2629

2730
# Fetches cluster metadata from the broker.
@@ -31,7 +34,7 @@ def disconnect
3134
def fetch_metadata(**options)
3235
request = Protocol::TopicMetadataRequest.new(**options)
3336

34-
@connection.send_request(request)
37+
send_request(request)
3538
end
3639

3740
# Fetches messages from a specified topic and partition.
@@ -41,7 +44,7 @@ def fetch_metadata(**options)
4144
def fetch_messages(**options)
4245
request = Protocol::FetchRequest.new(**options)
4346

44-
@connection.send_request(request)
47+
send_request(request)
4548
end
4649

4750
# Lists the offset of the specified topics and partitions.
@@ -51,7 +54,7 @@ def fetch_messages(**options)
5154
def list_offsets(**options)
5255
request = Protocol::ListOffsetRequest.new(**options)
5356

54-
@connection.send_request(request)
57+
send_request(request)
5558
end
5659

5760
# Produces a set of messages to the broker.
@@ -61,61 +64,81 @@ def list_offsets(**options)
6164
def produce(**options)
6265
request = Protocol::ProduceRequest.new(**options)
6366

64-
@connection.send_request(request)
67+
send_request(request)
6568
end
6669

6770
def fetch_offsets(**options)
6871
request = Protocol::OffsetFetchRequest.new(**options)
6972

70-
@connection.send_request(request)
73+
send_request(request)
7174
end
7275

7376
def commit_offsets(**options)
7477
request = Protocol::OffsetCommitRequest.new(**options)
7578

76-
@connection.send_request(request)
79+
send_request(request)
7780
end
7881

7982
def join_group(**options)
8083
request = Protocol::JoinGroupRequest.new(**options)
8184

82-
@connection.send_request(request)
85+
send_request(request)
8386
end
8487

8588
def sync_group(**options)
8689
request = Protocol::SyncGroupRequest.new(**options)
8790

88-
@connection.send_request(request)
91+
send_request(request)
8992
end
9093

9194
def leave_group(**options)
9295
request = Protocol::LeaveGroupRequest.new(**options)
9396

94-
@connection.send_request(request)
97+
send_request(request)
9598
end
9699

97100
def find_group_coordinator(**options)
98101
request = Protocol::GroupCoordinatorRequest.new(**options)
99102

100-
@connection.send_request(request)
103+
send_request(request)
101104
end
102105

103106
def heartbeat(**options)
104107
request = Protocol::HeartbeatRequest.new(**options)
105108

106-
@connection.send_request(request)
109+
send_request(request)
107110
end
108111

109112
def create_topics(**options)
110113
request = Protocol::CreateTopicsRequest.new(**options)
111114

112-
@connection.send_request(request)
115+
send_request(request)
113116
end
114117

115118
def api_versions
116119
request = Protocol::ApiVersionsRequest.new
117120

118-
@connection.send_request(request)
121+
send_request(request)
122+
end
123+
124+
private
125+
126+
def send_request(request)
127+
connection.send_request(request)
128+
rescue IdleConnection
129+
@logger.warn "Connection has been unused for too long, re-connecting..."
130+
@connection.close rescue nil
131+
@connection = nil
132+
retry
133+
rescue ConnectionError
134+
@connection.close rescue nil
135+
@connection = nil
136+
137+
raise
138+
end
139+
140+
def connection
141+
@connection ||= @connection_builder.build_connection(@host, @port)
119142
end
120143
end
121144
end

lib/kafka/broker_pool.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ def connect(host, port, node_id: nil)
1717
end
1818

1919
broker = Broker.new(
20-
connection: @connection_builder.build_connection(host, port),
20+
connection_builder: @connection_builder,
21+
host: host,
22+
port: port,
2123
node_id: node_id,
2224
logger: @logger,
2325
)

lib/kafka/connection.rb

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,14 @@ class Connection
4848
# broker. Default is 10 seconds.
4949
#
5050
# @return [Connection] a new connection.
51-
def initialize(host:, port:, client_id:, logger:, instrumenter:, sasl_authenticator:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil)
51+
def initialize(host:, port:, client_id:, logger:, instrumenter:, connect_timeout: nil, socket_timeout: nil, ssl_context: nil)
5252
@host, @port, @client_id = host, port, client_id
5353
@logger = logger
5454
@instrumenter = instrumenter
5555

5656
@connect_timeout = connect_timeout || CONNECT_TIMEOUT
5757
@socket_timeout = socket_timeout || SOCKET_TIMEOUT
5858
@ssl_context = ssl_context
59-
@sasl_authenticator = sasl_authenticator
60-
end
61-
62-
def address_match?(host, port)
63-
@host == host && @port == port
6459
end
6560

6661
def to_s
@@ -75,8 +70,6 @@ def close
7570
@logger.debug "Closing socket to #{to_s}"
7671

7772
@socket.close if @socket
78-
79-
@socket = nil
8073
end
8174

8275
# Sends a request over the connection.
@@ -98,7 +91,8 @@ def send_request(request)
9891

9992
@instrumenter.instrument("request.connection", notification) do
10093
open unless open?
101-
reopen if idle?
94+
95+
raise IdleConnection if idle?
10296

10397
@correlation_id += 1
10498

@@ -137,7 +131,6 @@ def open
137131
@correlation_id = 0
138132

139133
@last_request = nil
140-
@sasl_authenticator.authenticate!(self)
141134
rescue Errno::ETIMEDOUT => e
142135
@logger.error "Timed out while trying to connect to #{self}: #{e}"
143136
raise ConnectionError, e
@@ -146,11 +139,6 @@ def open
146139
raise ConnectionError, e
147140
end
148141

149-
def reopen
150-
close
151-
open
152-
end
153-
154142
def idle?
155143
@last_request && @last_request < Time.now - IDLE_TIMEOUT
156144
end

lib/kafka/connection_builder.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ def build_connection(host, port)
2020
logger: @logger,
2121
instrumenter: @instrumenter,
2222
ssl_context: @ssl_context,
23-
sasl_authenticator: @sasl_authenticator
2423
)
2524

25+
@sasl_authenticator.authenticate!(connection)
26+
2627
connection
2728
end
2829

spec/broker_pool_spec.rb

Lines changed: 10 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
let(:connection_builder) { double('connection_builder') }
33
let(:connection) { double('connection') }
44
let(:logger) { LOGGER }
5+
56
let(:broker_pool) do
67
described_class.new(
78
connection_builder: connection_builder,
89
logger: logger,
910
)
1011
end
12+
1113
let(:broker) { double('broker') }
1214
let(:host) { "localhost" }
1315
let(:port) { 9092 }
@@ -21,40 +23,24 @@
2123
describe "#connect" do
2224
it "creates a new broker everytime it is called with node_id nil" do
2325
# Call without node_id the first time.
24-
allow(Kafka::Broker).to receive(:new).once.with({
25-
connection: connection,
26-
node_id: nil,
27-
logger: logger
28-
}) { broker }
26+
allow(Kafka::Broker).to receive(:new).once { broker }
2927
expect(broker_pool.connect(host, port)).to eq(broker)
3028

3129
# Call without node_id the second time returns new broker.
3230
second_broker = double()
33-
allow(Kafka::Broker).to receive(:new).once.with({
34-
connection: connection,
35-
node_id: nil,
36-
logger: logger
37-
}) { second_broker }
31+
allow(Kafka::Broker).to receive(:new).once { second_broker }
3832
expect(broker_pool.connect(host, port)).to eq(second_broker)
3933
end
4034

4135
it "creates a new broker the first time it is called with a particular node_id" do
42-
allow(Kafka::Broker).to receive(:new).once.with({
43-
connection: connection,
44-
node_id: node_id,
45-
logger: logger
46-
}) { broker }
36+
allow(Kafka::Broker).to receive(:new).once { broker }
4737

4838
expect(broker_pool.connect(host, port, node_id: node_id)).to eq(broker)
4939
end
5040

5141
it "does not create a new broker if address & node_id match" do
5242
allow(broker).to receive(:address_match?).with(host, port) { true }
53-
allow(Kafka::Broker).to receive(:new).once.with({
54-
connection: connection,
55-
node_id: node_id,
56-
logger: logger
57-
}) { broker }
43+
allow(Kafka::Broker).to receive(:new).once { broker }
5844

5945
expect(broker_pool.connect(host, port, node_id: node_id)).to eq(broker)
6046
expect(broker_pool.connect(host, port, node_id: node_id)).to eq(broker)
@@ -63,21 +49,13 @@
6349
it "disconnects existing broker if new broker address does not match pre-existing broker address for a given node_id" do
6450
allow(broker).to receive(:address_match?).with("#{host}1", "#{port}1") { false }
6551
allow(broker).to receive(:disconnect).once
66-
allow(Kafka::Broker).to receive(:new).once.with({
67-
connection: connection,
68-
node_id: node_id,
69-
logger: logger
70-
}) { broker }
52+
allow(Kafka::Broker).to receive(:new).once { broker }
7153
expect(broker_pool.connect(host, port, node_id: node_id)).to eq(broker)
7254

7355
new_host = "#{host}1"
7456
new_port = "#{port}1"
7557
allow(connection_builder).to receive(:build_connection).with(new_host, new_port) { connection }
76-
allow(Kafka::Broker).to receive(:new).once.with({
77-
connection: connection,
78-
node_id: node_id,
79-
logger: logger
80-
})
58+
allow(Kafka::Broker).to receive(:new).once
8159

8260
broker_pool.connect(new_host, new_port, node_id: node_id)
8361

@@ -87,21 +65,13 @@
8765
it "creates a new broker if address does not match pre-existing broker address for a given node_id" do
8866
allow(broker).to receive(:address_match?).with("#{host}1", "#{port}1") { false }
8967
allow(broker).to receive(:disconnect).once
90-
allow(Kafka::Broker).to receive(:new).once.with({
91-
connection: connection,
92-
node_id: node_id,
93-
logger: logger
94-
}) { broker }
68+
allow(Kafka::Broker).to receive(:new).once { broker }
9569

9670
new_host = "#{host}1"
9771
new_port = "#{port}1"
9872
allow(connection_builder).to receive(:build_connection).with(new_host, new_port) { connection }
9973
second_broker = double()
100-
allow(Kafka::Broker).to receive(:new).once.with({
101-
connection: connection,
102-
node_id: node_id,
103-
logger: logger
104-
}) { second_broker }
74+
allow(Kafka::Broker).to receive(:new).once { second_broker }
10575

10676
actual_broker = broker_pool.connect(new_host, new_port, node_id: node_id)
10777

spec/broker_spec.rb

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,21 @@
33
describe Kafka::Broker do
44
let(:logger) { LOGGER }
55
let(:connection) { FakeConnection.new }
6-
let(:broker) { Kafka::Broker.new(connection: connection, logger: logger) }
6+
let(:connection_builder) { double(:connection_builder) }
7+
8+
before do
9+
allow(connection_builder).to receive(:build_connection) { connection }
10+
end
11+
12+
let(:broker) {
13+
Kafka::Broker.new(
14+
connection_builder: connection_builder,
15+
host: "x.com",
16+
port: 9092,
17+
node_id: 1,
18+
logger: logger,
19+
)
20+
}
721

822
class FakeConnection
923
def initialize
@@ -23,10 +37,14 @@ def send_request(request)
2337
it "delegates to @connection" do
2438
host = "test_host"
2539
port = 333
26-
connection = instance_double(Kafka::Connection)
27-
allow(connection).to receive(:address_match?).with(host, port) { true }
2840

29-
broker = Kafka::Broker.new(connection: connection, logger: logger)
41+
broker = Kafka::Broker.new(
42+
connection_builder: connection_builder,
43+
host: host,
44+
port: port,
45+
node_id: 1,
46+
logger: logger,
47+
)
3048

3149
expect(broker.address_match?(host, port)).to be_truthy
3250
end

0 commit comments

Comments
 (0)