Skip to content

Commit e3c8add

Browse files
authored
Merge pull request #451 from zendesk/dasch/fix-nil-timestamp
Don't assume there's a timestamp on messages
2 parents 07614b0 + ba8fa96 commit e3c8add

File tree

3 files changed

+33
-1
lines changed

3 files changed

+33
-1
lines changed

lib/kafka/protocol/message.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@ def self.decode(decoder)
8888
# attributes.
8989
codec_id = attributes & 0b111
9090

91-
new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: Time.at(timestamp / 1000.0))
91+
# The timestamp will be nil if the message was written in the Kafka 0.9 log format.
92+
create_time = timestamp && Time.at(timestamp / 1000.0)
93+
94+
new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: create_time)
9295
end
9396

9497
private

spec/fixtures/message-0.9-format

32 Bytes
Binary file not shown.

spec/protocol/message_spec.rb

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
describe Kafka::Protocol::Message do
2+
it "encodes and decodes messages" do
3+
message = Kafka::Protocol::Message.new(
4+
value: "yolo",
5+
key: "xx",
6+
)
7+
8+
io = StringIO.new
9+
encoder = Kafka::Protocol::Encoder.new(io)
10+
message.encode(encoder)
11+
data = StringIO.new(io.string)
12+
decoder = Kafka::Protocol::Decoder.new(data)
13+
14+
expect(Kafka::Protocol::Message.decode(decoder)).to eq message
15+
end
16+
17+
it "decodes messages written in the 0.9 format" do
18+
data = File.open("spec/fixtures/message-0.9-format")
19+
20+
decoder = Kafka::Protocol::Decoder.new(data)
21+
message = Kafka::Protocol::Message.decode(decoder)
22+
23+
expect(message.key).to eq "xx"
24+
expect(message.value).to eq "yolo"
25+
26+
# Messages didn't have timestamps back in the 0.9 days.
27+
expect(message.create_time).to eq nil
28+
end
29+
end

0 commit comments

Comments
 (0)