Skip to content

Commit 33487bb

Browse files
authored
Ensure we don't leak any child tasks in client loop. (#11)
1 parent e369f19 commit 33487bb

File tree

3 files changed

+39
-2
lines changed

3 files changed

+39
-2
lines changed

fixtures/async/container/supervisor/a_server.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ def around
5858
let(:monitors) {[registration_monitor]}
5959
let(:server) {Async::Container::Supervisor::Server.new(endpoint: @bound_endpoint, monitors: monitors)}
6060

61+
def restart_supervisor
62+
@server_task&.stop
63+
64+
@server_task = reactor.async do
65+
server.run
66+
end
67+
end
68+
6169
before do
6270
@bound_endpoint = endpoint.bound
6371

lib/async/container/supervisor/client.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ def connect
4848

4949
# Run the client in a loop, reconnecting if necessary.
5050
def run
51-
Async(annotation: "Supervisor Client", transient: true) do
51+
Async(annotation: "Supervisor Client", transient: true) do |task|
5252
loop do
5353
connection = connect!
5454

55-
Async do
55+
connected_task = task.async do
5656
connected!(connection)
5757
end
5858

@@ -61,6 +61,10 @@ def run
6161
Console.error(self, "Connection failed:", exception: error)
6262
sleep(rand)
6363
ensure
64+
# Ensure any tasks that were created during connection are stopped:
65+
connected_task&.stop
66+
67+
# Close the connection itself:
6468
connection&.close
6569
end
6670
end

test/async/container/client.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,30 @@
3232

3333
client_task.stop
3434
end
35+
36+
it "does not leak fibers when connected! creates tasks and reconnection occurs" do
37+
state = Thread::Queue.new
38+
39+
mock(client) do |mock|
40+
mock.replace(:connected!) do
41+
state << :connected
42+
43+
Async do
44+
sleep
45+
ensure
46+
state << :disconnected
47+
end
48+
end
49+
end
50+
51+
client_task = client.run
52+
expect(state.pop).to be == :connected
53+
54+
# Interrupt the supervisor:
55+
restart_supervisor
56+
57+
expect(state.pop).to be == :disconnected
58+
expect(state.pop).to be == :connected
59+
end
3560
end
3661
end

0 commit comments

Comments
 (0)