Skip to content

Commit 7580fed

Browse files
Remove dead code and fix timeout handling.
1 parent e3599ba commit 7580fed

File tree

1 file changed

+5
-22
lines changed

1 file changed

+5
-22
lines changed

lib/async/container/supervisor/connection.rb

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ def close
8080
# Iterate over all responses from the call.
8181
#
8282
# @yields {|response| ...} Each response from the queue.
83-
def each(&block)
84-
while response = self.pop
83+
def each(timeout: nil, &block)
84+
while response = self.pop(timeout: timeout)
8585
yield response
8686
end
8787
end
@@ -175,7 +175,7 @@ def self.dispatch(connection, target, id, message)
175175
# @parameter message [Hash] The call message/parameters.
176176
# @yields {|response| ...} Each intermediate response if block given.
177177
# @returns [Hash, Array] The final response or array of intermediate responses.
178-
def self.call(connection, **message, &block)
178+
def self.call(connection, timeout: nil, **message, &block)
179179
id = connection.next_id
180180
call = self.new(connection, id, message)
181181

@@ -184,11 +184,11 @@ def self.call(connection, **message, &block)
184184
connection.write(id: id, **message)
185185

186186
if block_given?
187-
call.each(&block)
187+
call.each(timeout: timeout, &block)
188188
else
189189
intermediate = nil
190190

191-
while response = call.pop
191+
while response = call.pop(timeout: timeout)
192192
if response.delete(:finished)
193193
if intermediate
194194
if response.any?
@@ -247,23 +247,6 @@ def write(**message)
247247
@stream.flush
248248
end
249249

250-
# Make a synchronous call and wait for a single response.
251-
#
252-
# @parameter timeout [Numeric, nil] Optional timeout for the call.
253-
# @parameter message [Hash] The call message.
254-
# @raises [IOError | Errno::EPIPE | Errno::ECONNRESET] If the write fails.
255-
# @returns [Hash] The response.
256-
def call(timeout: nil, **message)
257-
id = next_id
258-
calls[id] = ::Thread::Queue.new
259-
260-
write(id: id, **message)
261-
262-
return calls[id].pop(timeout: timeout)
263-
ensure
264-
calls.delete(id)
265-
end
266-
267250
# Read a message from the connection stream.
268251
#
269252
# @returns [Hash, nil] The parsed message or nil if stream is closed.

0 commit comments

Comments
 (0)