Skip to content

Commit 4e85ada

Browse files
Multi-hop invoke.
1 parent 05d6286 commit 4e85ada

File tree

11 files changed

+364
-60
lines changed

11 files changed

+364
-60
lines changed

guides/controllers/readme.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,10 @@ When a controller method returns another controller, the client receives a proxy
112112

113113
```ruby
114114
# Server:
115-
server = ChatServerController.new
115+
chat = ChatServerController.new
116116

117117
server.accept do |connection|
118-
connection.bind(:chat, server)
118+
connection.bind(:chat, chat)
119119
end
120120

121121
# Client 1:
@@ -135,7 +135,7 @@ end
135135

136136
## Passing Controllers as Arguments
137137

138-
Because controllers are passed by reference, you can pass them as arguments to enable bidirectional communication. This pattern is useful for event handlers, callbacks, or subscription systems:
138+
Because controllers are passed by reference, you can pass them as arguments to enable bidirectional communication. When a client passes a proxy as an argument, the server receives a proxy that points back to the client's controller. This enables the server to call methods on the client's controller. This pattern is useful for event handlers, callbacks, or subscription systems:
139139

140140
```ruby
141141
class ChatRoomController < Async::Bus::Controller
@@ -149,7 +149,7 @@ class ChatRoomController < Async::Bus::Controller
149149
# subscriber is a proxy to the client's controller:
150150
@subscribers << subscriber
151151
# Send existing messages to the new subscriber:
152-
@messages.each { |msg| subscriber.on_message(msg) }
152+
@messages.each{|msg| subscriber.on_message(msg)}
153153
true
154154
end
155155

guides/getting-started/readme.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Async do
3535

3636
# Shared mutable state:
3737
items = Array.new
38-
38+
3939
server.accept do |connection|
4040
# Bind any object - it will be proxied to clients:
4141
connection.bind(:items, items)

lib/async/bus/client.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,15 @@ def connect(parent: Task.current)
6464
# This is useful for long-running clients that need to maintain a persistent connection.
6565
#
6666
# @parameter parent [Async::Task] The parent task to run under.
67-
def run(parent: Task.current)
68-
parent.async(annotation: "Bus Client", transient: true) do |task|
67+
def run
68+
Sync do |task|
6969
loop do
7070
connection = connect!
7171

7272
connected_task = task.async do
7373
connected!(connection)
74+
75+
yield(connection) if block_given?
7476
end
7577

7678
connection.run

lib/async/bus/protocol/connection.rb

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -143,19 +143,14 @@ def []=(name, object)
143143

144144
# Generate a proxy for a remotely bound object.
145145
#
146-
# **This will not return objects bound locally, only proxies for remotely bound objects.**
146+
# **This always returns a proxy, even if the object is bound locally.**
147+
# The object bus is not shared between client and server, so `[]` always
148+
# returns a proxy to the remote instance.
147149
#
148150
# @parameter name [String] The name of the bound object.
149-
# @returns [Object | Proxy] The object or proxy instance for the bound object.
151+
# @returns [Proxy] A proxy instance for the bound object.
150152
def [](name)
151-
unless proxy = @proxies[name]
152-
proxy = Proxy.new(self, name)
153-
@proxies[name] = proxy
154-
155-
::ObjectSpace.define_finalizer(proxy, finalize(name))
156-
end
157-
158-
return proxy
153+
return proxy_for(name)
159154
end
160155

161156
# Explicitly bind an object to a name, such that it could be accessed remotely.
@@ -175,8 +170,8 @@ def bind(name, object)
175170
# Bind the object into the local object store (explicitly bound, not temporary):
176171
@objects[name] = Explicit.new(object)
177172

178-
# Return the proxy instance for the bound object:
179-
return self[name]
173+
# Always return a proxy for passing by reference, even for locally bound objects:
174+
return proxy_for(name)
180175
end
181176

182177
# Implicitly bind an object with a temporary name, such that it could be accessed remotely.
@@ -188,13 +183,13 @@ def bind(name, object)
188183
# @parameter object [Object] The object to bind to a temporary name.
189184
# @returns [Proxy] A proxy instance for the bound object.
190185
def proxy(object)
191-
name = "<#{object.class}@#{next_id.to_s(16)}>".freeze
186+
name = object.__id__
192187

193188
# Bind the object into the local object store (temporary):
194-
@objects[name] = Implicit.new(object)
189+
@objects[name] ||= Implicit.new(object)
195190

196-
# This constructs the Proxy instance:
197-
return self[name]
191+
# Always return a proxy for passing by reference:
192+
return proxy_for(name)
198193
end
199194

200195
# Implicitly bind an object with a temporary name, such that it could be accessed remotely.
@@ -206,15 +201,50 @@ def proxy(object)
206201
# @parameter object [Object] The object to bind to a temporary name.
207202
# @returns [String] The name of the bound object.
208203
def proxy_name(object)
209-
name = "<#{object.class}@#{next_id.to_s(16)}>".freeze
204+
name = object.__id__
210205

211206
# Bind the object into the local object store (temporary):
212-
@objects[name] = Implicit.new(object)
207+
@objects[name] ||= Implicit.new(object)
213208

214209
# Return the name:
215210
return name
216211
end
217212

213+
# Get an object or proxy for a bound object, handling reverse lookup.
214+
#
215+
# If the object is bound locally and the proxy is for this connection, returns the actual object.
216+
# If the object is bound remotely, or the proxy is from a different connection, returns a proxy.
217+
# This is used when deserializing proxies to handle round-trip scenarios and avoid name collisions.
218+
#
219+
# @parameter name [String] The name of the bound object.
220+
# @parameter local [Boolean] Whether the proxy is for this connection (from serialization). Defaults to true.
221+
# @returns [Object | Proxy] The object if bound locally and proxy is for this connection, or a proxy otherwise.
222+
def proxy_object(name)
223+
# If the proxy is for this connection and the object is bound locally, return the actual object:
224+
if entry = @objects[name]
225+
# This handles round-trip scenarios correctly.
226+
return entry.object
227+
end
228+
229+
# Otherwise, create a proxy for the remote object:
230+
return proxy_for(name)
231+
end
232+
233+
# Get or create a proxy for a named object.
234+
#
235+
# @parameter name [String] The name of the object.
236+
# @returns [Proxy] A proxy instance for the named object.
237+
private def proxy_for(name)
238+
unless proxy = @proxies[name]
239+
proxy = Proxy.new(self, name)
240+
@proxies[name] = proxy
241+
242+
::ObjectSpace.define_finalizer(proxy, finalize(name))
243+
end
244+
245+
return proxy
246+
end
247+
218248
private def finalize(name)
219249
proc do
220250
@finalized.push(name) rescue nil

lib/async/bus/protocol/proxy.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ def initialize(connection, name)
1919
@name = name
2020
end
2121

22+
# Get the connection to the remote object.
23+
# @returns [Connection] The connection to the remote object.
24+
def __connection__
25+
@connection
26+
end
27+
2228
# Get the name of the remote object.
2329
# @returns [Symbol] The name of the remote object.
2430
def __name__

lib/async/bus/protocol/wrapper.rb

Lines changed: 71 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,54 +26,91 @@ def initialize(connection, reference_types: [Controller])
2626
@connection = connection
2727
@reference_types = reference_types
2828

29+
# Store the peer connection for forwarding proxies:
30+
# When a proxy is forwarded (local=false), it should point back to the sender
31+
# (the peer connection), not the receiver (this connection).
32+
@peer_connection = nil
33+
2934
# The order here matters.
3035

3136
self.register_type(0x00, Invoke, recursive: true,
32-
packer: ->(invoke, packer){invoke.pack(packer)},
33-
unpacker: ->(unpacker){Invoke.unpack(unpacker)},
34-
)
37+
packer: ->(invoke, packer){invoke.pack(packer)},
38+
unpacker: ->(unpacker){Invoke.unpack(unpacker)},
39+
)
3540

3641
[Return, Yield, Error, Next, Throw, Close].each_with_index do |klass, index|
3742
self.register_type(0x01 + index, klass, recursive: true,
38-
packer: ->(value, packer){value.pack(packer)},
39-
unpacker: ->(unpacker){klass.unpack(unpacker)},
40-
)
43+
packer: ->(value, packer){value.pack(packer)},
44+
unpacker: ->(unpacker){klass.unpack(unpacker)},
45+
)
4146
end
4247

4348
# Reverse serialize proxies back into proxies:
44-
# When a Proxy is received, create a proxy pointing back
49+
# When a Proxy is received, use proxy_object to handle reverse lookup
4550
self.register_type(0x10, Proxy, recursive: true,
46-
# Since name can be a Symbol or String, we need to use recursive packing to ensure the name is properly serialized.
47-
packer: ->(proxy, packer){packer.write(proxy.__name__)},
48-
unpacker: ->(unpacker){connection[unpacker.read]},
49-
)
51+
packer: self.method(:pack_proxy),
52+
unpacker: self.method(:unpack_proxy),
53+
)
5054

5155
self.register_type(0x11, Release, recursive: true,
52-
packer: ->(release, packer){release.pack(packer)},
53-
unpacker: ->(unpacker){Release.unpack(unpacker)},
54-
)
56+
packer: ->(release, packer){release.pack(packer)},
57+
unpacker: ->(unpacker){Release.unpack(unpacker)},
58+
)
5559

5660
self.register_type(0x20, Symbol)
57-
self.register_type(0x21, Exception,
58-
packer: self.method(:pack_exception),
59-
unpacker: self.method(:unpack_exception),
60-
recursive: true,
61-
)
61+
self.register_type(0x21, Exception, recursive: true,
62+
packer: self.method(:pack_exception),
63+
unpacker: self.method(:unpack_exception),
64+
)
6265

6366
self.register_type(0x22, Class,
64-
packer: ->(klass){klass.name},
65-
unpacker: ->(name){Object.const_get(name)},
66-
)
67+
packer: ->(klass){klass.name},
68+
unpacker: ->(name){Object.const_get(name)},
69+
)
70+
71+
reference_packer = self.method(:pack_reference)
72+
reference_unpacker = self.method(:unpack_reference)
6773

6874
# Serialize objects into proxies:
6975
reference_types&.each_with_index do |klass, index|
70-
self.register_type(0x30 + index, klass,
71-
packer: connection.method(:proxy_name),
72-
unpacker: connection.method(:[]),
73-
)
76+
self.register_type(0x30 + index, klass, recursive: true,
77+
packer: reference_packer,
78+
unpacker: reference_unpacker,
79+
)
7480
end
7581
end
7682

83+
# Pack a proxy into a MessagePack packer.
84+
#
85+
# Validates that the proxy is for this connection and serializes the proxy name.
86+
# Multi-hop proxy forwarding is not supported, so proxies can only be serialized
87+
# from the same connection they were created for (round-trip scenarios).
88+
#
89+
# @parameter proxy [Proxy] The proxy to serialize.
90+
# @parameter packer [MessagePack::Packer] The packer to write to.
91+
# @raises [ArgumentError] If the proxy is from a different connection (multi-hop forwarding not supported).
92+
def pack_proxy(proxy, packer)
93+
# Check if the proxy is for this connection:
94+
if proxy.__connection__ != @connection
95+
proxy = @connection.proxy(proxy)
96+
end
97+
98+
packer.write(proxy.__name__)
99+
end
100+
101+
# Unpack a proxy from a MessagePack unpacker.
102+
#
103+
# When deserializing a proxy:
104+
# - If the object is bound locally, return the actual object (round-trip scenario)
105+
# - If the object is not found locally, create a proxy pointing to this connection
106+
# (the proxy was forwarded from another connection and should point back to the sender)
107+
#
108+
# @parameter unpacker [MessagePack::Unpacker] The unpacker to read from.
109+
# @returns [Object | Proxy] The actual object if bound locally, or a proxy pointing to this connection.
110+
def unpack_proxy(unpacker)
111+
@connection.proxy_object(unpacker.read)
112+
end
113+
77114
# Pack an exception into a MessagePack packer.
78115
# @parameter exception [Exception] The exception to pack.
79116
# @parameter packer [MessagePack::Packer] The packer to write to.
@@ -98,6 +135,14 @@ def unpack_exception(unpacker)
98135

99136
return exception
100137
end
138+
139+
def pack_reference(object, packer)
140+
packer.write(@connection.proxy_name(object))
141+
end
142+
143+
def unpack_reference(unpacker)
144+
@connection.proxy_object(unpacker.read)
145+
end
101146
end
102147
end
103148
end

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Add support for multi-hop proxying.
6+
37
## v0.2.0
48

59
- Fix handling of temporary objects.

test/async/bus/client.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def initialize(endpoint, connected_count)
4444
end
4545
end
4646

47-
client_task = client_instance.run
47+
client_task = Async {client_instance.run}
4848

4949
# Wait for initial connection
5050
reactor.sleep(0.01)
@@ -87,7 +87,7 @@ def initialize(endpoint, connected_count, connection_count)
8787
end
8888
end
8989

90-
client_task = client_instance.run
90+
client_task = Async {client_instance.run}
9191

9292
# Wait for initial connection
9393
reactor.sleep(0.02)
@@ -130,7 +130,7 @@ def initialize(endpoint, state)
130130
end
131131
end
132132

133-
client_task = client_instance.run
133+
client_task = Async {client_instance.run}
134134

135135
# Wait for initial connection
136136
reactor.sleep(0.01)
@@ -172,7 +172,7 @@ def initialize(endpoint, error_count)
172172
end
173173
end
174174

175-
client_task = client_instance.run
175+
client_task = Async {client_instance.run}
176176

177177
# Wait for reconnection after initial failure
178178
# The first attempt fails, sleeps (rand, so 0-1 seconds), then retries

0 commit comments

Comments
 (0)