Skip to content

Commit ba8fa96

Browse files
committed
Don't assume there's a timestamp on messages
Old messages may not have one.
1 parent e37817f commit ba8fa96

File tree

3 files changed

+17
-1
lines changed

3 files changed

+17
-1
lines changed

lib/kafka/protocol/message.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@ def self.decode(decoder)
8888
# attributes.
8989
codec_id = attributes & 0b111
9090

91-
new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: Time.at(timestamp / 1000.0))
91+
# The timestamp will be nil if the message was written in the Kafka 0.9 log format.
92+
create_time = timestamp && Time.at(timestamp / 1000.0)
93+
94+
new(key: key, value: value, codec_id: codec_id, offset: offset, create_time: create_time)
9295
end
9396

9497
private

spec/fixtures/message-0.9-format

32 Bytes
Binary file not shown.

spec/protocol/message_spec.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,17 @@
1313

1414
expect(Kafka::Protocol::Message.decode(decoder)).to eq message
1515
end
16+
17+
it "decodes messages written in the 0.9 format" do
18+
data = File.open("spec/fixtures/message-0.9-format")
19+
20+
decoder = Kafka::Protocol::Decoder.new(data)
21+
message = Kafka::Protocol::Message.decode(decoder)
22+
23+
expect(message.key).to eq "xx"
24+
expect(message.value).to eq "yolo"
25+
26+
# Messages didn't have timestamps back in the 0.9 days.
27+
expect(message.create_time).to eq nil
28+
end
1629
end

0 commit comments

Comments
 (0)