Skip to content

Commit 59edb0f

Browse files
committed
Support nonblocking socket read before select for MRI 1.9.3+ only
1 parent b6cda35 commit 59edb0f

File tree

2 files changed

+22
-8
lines changed

2 files changed

+22
-8
lines changed

lib/mqtt/client.rb

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,12 @@ def initialize(*args)
167167
@read_thread = nil
168168
@write_semaphore = Mutex.new
169169
@pubacks_semaphore = Mutex.new
170+
171+
if OpenSSL::SSL::SSLSocket.method_defined?(:read_nonblock)
172+
@wait_for_read_func = method(:nonblocking_read_before_select)
173+
else
174+
@wait_for_read_func = method(:select_without_nonblocking_read)
175+
end
170176
end
171177

172178
# Get the OpenSSL context, that is used if SSL/TLS is enabled
@@ -438,9 +444,12 @@ def unsubscribe(*topics)
438444

439445
private
440446

441-
# Try to read a packet from the server
442-
# Also sends keep-alive ping packets.
443-
def receive_packet
447+
def select_without_nonblocking_read
448+
# Poll socket - is there data waiting?
449+
[nil, !IO.select([@socket], [], [], SELECT_TIMEOUT).nil?]
450+
end
451+
452+
def nonblocking_read_before_select
444453
first_byte_in_packet = nil
445454
data_available_to_read = false
446455
begin
@@ -456,6 +465,13 @@ def receive_packet
456465
[@socket], [], [], SELECT_TIMEOUT
457466
).nil?
458467
end
468+
[first_byte_in_packet, data_available_to_read]
469+
end
470+
471+
# Try to read a packet from the server
472+
# Also sends keep-alive ping packets.
473+
def receive_packet
474+
first_byte_in_packet, data_available_to_read = @wait_for_read_func.call
459475
if data_available_to_read
460476
# Yes - read in the packet
461477
packet = MQTT::Packet.read(@socket, first_byte_in_packet)

spec/mqtt_client_spec.rb

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -845,11 +845,9 @@
845845
expect(@read_queue.size).to eq(1)
846846
end
847847

848-
it "should put PUBLISH messages on to the read queue following a wait readable exception" do
849-
wait_readable_exception = Class.new(StandardError) do
850-
include IO::WaitReadable
851-
end
852-
allow(socket).to receive(:read_nonblock).and_raise(wait_readable_exception)
848+
it "should put PUBLISH messages on to the read queue following an IO::WaitReadable exception",
849+
:if => OpenSSL::SSL::SSLSocket.respond_to?(:read_nonblock) do
850+
allow(socket).to receive(:read_nonblock).and_raise(IO::WaitReadable)
853851
socket.write("\x30\x0e\x00\x05topicpayload")
854852
socket.rewind
855853
client.send(:receive_packet)

0 commit comments

Comments
 (0)