Skip to content

Commit 7d85c0e

Browse files
committed
refactor connection to use MessagePack instead of JSON
1 parent e369f19 commit 7d85c0e

File tree

6 files changed

+197
-53
lines changed

6 files changed

+197
-53
lines changed

bake/async/container/supervisor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def memory_sample(duration: 10, connection_id:)
4444
operation = {do: :memory_sample, duration: duration}
4545

4646
# Use the forward operation to proxy the request to a worker:
47-
return connection.call(do: :forward, operation: operation, connection_id: connection_id)
47+
connection.call(do: :forward, operation: operation, connection_id: connection_id)
4848
end
4949
end
5050

examples/simple/simple.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def setup(container)
3333
Console.info(self, "Exiting...")
3434
end
3535
end
36-
end
36+
end
3737
end
3838

3939
service "sleep" do

lib/async/container/supervisor/connection.rb

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
require "json"
77
require "async"
8+
require_relative "message_wrapper"
89

910
module Async
1011
module Container
@@ -13,6 +14,8 @@ module Supervisor
1314
#
1415
# Handles message passing, call/response patterns, and connection lifecycle.
1516
class Connection
17+
MAX_MESSAGE_SIZE = 2 ** 32 - 1
18+
1619
# Represents a remote procedure call over a connection.
1720
#
1821
# Manages the call lifecycle, response queueing, and completion signaling.
@@ -231,7 +234,7 @@ def initialize(stream, id = 0, **state)
231234
@stream = stream
232235
@id = id
233236
@state = state
234-
237+
@message_wrapper = MessageWrapper.new
235238
@reader = nil
236239
@calls = {}
237240
end
@@ -251,19 +254,42 @@ def next_id
251254

252255
# Write a message to the connection stream.
253256
#
257+
# Uses a length-prefixed protocol: 2-byte length header (big-endian) followed by data.
258+
# This allows MessagePack data to contain newlines without breaking message boundaries.
259+
#
254260
# @parameter message [Hash] The message to write.
255261
def write(**message)
256-
@stream.write(JSON.dump(message) << "\n")
262+
data = @message_wrapper.pack(message)
263+
# Write 4-byte length prefix
264+
data_length = [data.bytesize].pack("N")
265+
@stream.write(data_length + data)
257266
@stream.flush
258267
end
259268

260269
# Read a message from the connection stream.
261270
#
262271
# @returns [Hash, nil] The parsed message or nil if stream is closed.
263272
def read
264-
if line = @stream&.gets
265-
JSON.parse(line, symbolize_names: true)
273+
length_data = @stream&.read(4)
274+
return nil unless length_data && length_data.bytesize == 4
275+
276+
# Unpack 32-bit integer
277+
length = length_data.unpack1("N")
278+
279+
# Validate message size
280+
if length > MAX_MESSAGE_SIZE
281+
Console.error(self, "Message too large: #{length} bytes (max: #{MAX_MESSAGE_SIZE})")
282+
return nil
266283
end
284+
285+
# Read the exact amount of data specified
286+
data = @stream.read(length)
287+
288+
unless data && data.bytesize == length
289+
raise EOFError, "Failed to read complete message"
290+
end
291+
292+
@message_wrapper.unpack(data)
267293
end
268294

269295
# Iterate over all messages from the connection.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "msgpack"
7+
8+
module Async
9+
module Container
10+
module Supervisor
11+
class MessageWrapper
12+
def initialize
13+
@factory = MessagePack::Factory.new
14+
15+
register_types
16+
17+
@packer = @factory.packer
18+
end
19+
20+
def pack(message)
21+
@packer.clear
22+
normalized_message = normalize(message)
23+
@packer.pack(normalized_message)
24+
@packer.full_pack
25+
end
26+
27+
def unpack(data)
28+
@factory.unpack(data)
29+
end
30+
31+
private
32+
33+
def normalize(obj)
34+
case obj
35+
when Hash
36+
obj.transform_values{|v| normalize(v)}
37+
when Array
38+
obj.map{|v| normalize(v)}
39+
else
40+
if obj.respond_to?(:as_json)
41+
normalize(obj.as_json)
42+
else
43+
obj
44+
end
45+
end
46+
end
47+
48+
def register_types
49+
@factory.register_type(0x00, Symbol)
50+
51+
@factory.register_type(
52+
0x01,
53+
Exception,
54+
packer: self.method(:pack_exception),
55+
unpacker: self.method(:unpack_exception),
56+
recursive: true,
57+
)
58+
59+
@factory.register_type(
60+
0x02,
61+
Class,
62+
packer: ->(klass) {klass.name},
63+
unpacker: ->(name) {name},
64+
)
65+
66+
@factory.register_type(
67+
MessagePack::Timestamp::TYPE,
68+
Time,
69+
packer: MessagePack::Time::Packer,
70+
unpacker: MessagePack::Time::Unpacker
71+
)
72+
end
73+
74+
def pack_exception(exception)
75+
[exception.class.name, exception.message, exception.backtrace].pack("A*")
76+
end
77+
78+
def unpack_exception(data)
79+
klass, message, backtrace = data.unpack("A*A*A*")
80+
klass = Object.const_get(klass)
81+
82+
exception = klass.new(message)
83+
exception.set_backtrace(backtrace)
84+
85+
return exception
86+
end
87+
end
88+
end
89+
end
90+
end

test/async/container/connection.rb

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
require "async/container/supervisor/connection"
77
require "sus/fixtures/async/scheduler_context"
88
require "stringio"
9+
require "msgpack"
910

1011
class TestTarget
1112
def initialize(&block)
@@ -20,6 +21,7 @@ def dispatch(call)
2021
describe Async::Container::Supervisor::Connection do
2122
let(:stream) {StringIO.new}
2223
let(:connection) {Async::Container::Supervisor::Connection.new(stream)}
24+
let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new}
2325

2426
with "dispatch" do
2527
it "handles failed writes when dispatching a call" do
@@ -88,16 +90,16 @@ def dispatch(call)
8890
parsed = JSON.parse(json)
8991

9092
expect(parsed).to have_keys(
91-
"do" => be == "test",
92-
"data" => be == "value"
93-
)
93+
"do" => be == "test",
94+
"data" => be == "value"
95+
)
9496
end
9597

9698
it "can get call message via as_json" do
9799
expect(test_call.as_json).to have_keys(
98-
do: be == :test,
99-
data: be == "value"
100-
)
100+
do: be == :test,
101+
data: be == "value"
102+
)
101103
end
102104

103105
it "can iterate over call responses with each" do
@@ -121,11 +123,11 @@ def dispatch(call)
121123

122124
response = test_call.pop
123125
expect(response).to have_keys(
124-
id: be == 1,
125-
finished: be == true,
126-
failed: be == true,
127-
error: be == "Something went wrong"
128-
)
126+
id: be == 1,
127+
finished: be == true,
128+
failed: be == true,
129+
error: be == "Something went wrong"
130+
)
129131

130132
expect(test_call.closed?).to be == true
131133
end
@@ -144,30 +146,43 @@ def dispatch(call)
144146
end
145147
end
146148

147-
it "writes JSON with newline" do
149+
it "writes length-prefixed MessagePack data" do
148150
connection.write(id: 1, do: :test)
149151

150152
stream.rewind
151-
output = stream.read
152153

153-
# Check it's valid JSON with a newline
154-
expect(output[-1]).to be == "\n"
154+
# Read 2-byte length prefix
155+
length_data = stream.read(4)
156+
expect(length_data.bytesize).to be == 4
155157

156-
parsed = JSON.parse(output.chomp)
158+
length = length_data.unpack1("N")
159+
expect(length).to be > 0
160+
161+
# Read MessagePack data
162+
data = stream.read(length)
163+
expect(data.bytesize).to be == length
164+
165+
# Parse MessagePack
166+
parsed = message_wrapper.unpack(data)
157167
expect(parsed).to have_keys(
158-
"id" => be == 1,
159-
"do" => be == "test"
168+
id: be == 1,
169+
do: be == :test
160170
)
161171
end
162172

163-
it "parses JSON lines" do
164-
stream.string = JSON.dump({id: 1, do: "test"}) << "\n"
173+
it "reads length-prefixed MessagePack data" do
174+
# Create MessagePack data
175+
message = {id: 1, do: "test"}
176+
data = message_wrapper.pack(message)
177+
178+
# Write with length prefix
179+
stream.string = [data.bytesize].pack("N") + data
165180
stream.rewind
166181

167-
message = connection.read
182+
parsed = connection.read
168183

169-
# Connection.read uses symbolize_names: true (keys are symbols, values are as-is)
170-
expect(message).to have_keys(
184+
# Keys are symbols
185+
expect(parsed).to have_keys(
171186
id: be == 1,
172187
do: be == "test"
173188
)

0 commit comments

Comments
 (0)