Skip to content

Commit b6cda35

Browse files
committed
Do nonblocking socket read before select for packet receive
1 parent 3895873 commit b6cda35

File tree

4 files changed

+51
-7
lines changed

4 files changed

+51
-7
lines changed

lib/mqtt/client.rb

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -441,11 +441,24 @@ def unsubscribe(*topics)
441441
# Try to read a packet from the server
442442
# Also sends keep-alive ping packets.
443443
def receive_packet
444-
# Poll socket - is there data waiting?
445-
result = IO.select([@socket], [], [], SELECT_TIMEOUT)
446-
unless result.nil?
444+
first_byte_in_packet = nil
445+
data_available_to_read = false
446+
begin
447+
# Poll socket - is there data waiting?
448+
result = @socket.read_nonblock(1)
449+
if result && result.length == 1
450+
first_byte_in_packet = result.unpack('C').first
451+
data_available_to_read = true
452+
end
453+
rescue IO::WaitReadable
454+
# Wait for data to be available
455+
data_available_to_read = !IO.select(
456+
[@socket], [], [], SELECT_TIMEOUT
457+
).nil?
458+
end
459+
if data_available_to_read
447460
# Yes - read in the packet
448-
packet = MQTT::Packet.read(@socket)
461+
packet = MQTT::Packet.read(@socket, first_byte_in_packet)
449462
handle_packet packet
450463
end
451464
keep_alive!

lib/mqtt/packet.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ class Packet
2424
}
2525

2626
# Read in a packet from a socket
27-
def self.read(socket)
27+
def self.read(socket, first_byte_in_packet = nil)
2828
# Read in the packet header and create a new packet object
2929
packet = create_from_header(
30-
read_byte(socket)
30+
first_byte_in_packet || read_byte(socket)
3131
)
3232
packet.validate_flags
3333

spec/mqtt_client_spec.rb

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -838,7 +838,18 @@
838838
allow(@parent_thread).to receive(:raise)
839839
end
840840

841-
it "should put PUBLISH messages on to the read queue" do
841+
it "should put PUBLISH messages on to the read queue when data can be immediately read" do
842+
socket.write("\x30\x0e\x00\x05topicpayload")
843+
socket.rewind
844+
client.send(:receive_packet)
845+
expect(@read_queue.size).to eq(1)
846+
end
847+
848+
it "should put PUBLISH messages on to the read queue following a wait readable exception" do
849+
wait_readable_exception = Class.new(StandardError) do
850+
include IO::WaitReadable
851+
end
852+
allow(socket).to receive(:read_nonblock).and_raise(wait_readable_exception)
842853
socket.write("\x30\x0e\x00\x05topicpayload")
843854
socket.rewind
844855
client.send(:receive_packet)
@@ -853,18 +864,24 @@
853864
end
854865

855866
it "should close the socket if there is an exception" do
867+
socket.write("\x20")
868+
socket.rewind
856869
expect(socket).to receive(:close).once
857870
allow(MQTT::Packet).to receive(:read).and_raise(MQTT::Exception)
858871
client.send(:receive_packet)
859872
end
860873

861874
it "should pass exceptions up to parent thread" do
875+
socket.write("\x20")
876+
socket.rewind
862877
expect(@parent_thread).to receive(:raise).once
863878
allow(MQTT::Packet).to receive(:read).and_raise(MQTT::Exception)
864879
client.send(:receive_packet)
865880
end
866881

867882
it "should update last_ping_response when receiving a Pingresp" do
883+
socket.write("\x20")
884+
socket.rewind
868885
allow(MQTT::Packet).to receive(:read).and_return MQTT::Packet::Pingresp.new
869886
client.instance_variable_set '@last_ping_response', Time.at(0)
870887
client.send :receive_packet

spec/mqtt_packet_spec.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,20 @@
434434
end
435435
end
436436

437+
describe "reading a packet from a socket with a separate first byte parameter" do
438+
let(:socket) { StringIO.new("\x11\x00\x04testhello world") }
439+
let(:packet) { MQTT::Packet.read(socket, 48) }
440+
441+
it "should correctly create the right type of packet object" do
442+
expect(packet.class).to eq(MQTT::Packet::Publish)
443+
end
444+
445+
it "should set the payload correctly" do
446+
expect(packet.payload).to eq('hello world')
447+
expect(packet.payload.encoding.to_s).to eq('ASCII-8BIT')
448+
end
449+
end
450+
437451
describe "when calling the inspect method" do
438452
it "should output the payload, if it is less than 16 bytes" do
439453
packet = MQTT::Packet::Publish.new( :topic => "topic", :payload => "payload" )

0 commit comments

Comments
 (0)