Skip to content

Commit e9e3fb6

Browse files
committed
refactor connection to use MessagePacker's internal stream
1 parent 7d85c0e commit e9e3fb6

File tree

4 files changed

+30
-90
lines changed

4 files changed

+30
-90
lines changed

lib/async/container/supervisor/connection.rb

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ def initialize(stream, id = 0, **state)
234234
@stream = stream
235235
@id = id
236236
@state = state
237-
@message_wrapper = MessageWrapper.new
237+
@message_wrapper = MessageWrapper.new(@stream)
238238
@reader = nil
239239
@calls = {}
240240
end
@@ -259,37 +259,16 @@ def next_id
259259
#
260260
# @parameter message [Hash] The message to write.
261261
def write(**message)
262-
data = @message_wrapper.pack(message)
263-
# Write 4-byte length prefix
264-
data_length = [data.bytesize].pack("N")
265-
@stream.write(data_length + data)
266-
@stream.flush
262+
@message_wrapper.write(message)
267263
end
268264

269265
# Read a message from the connection stream.
270266
#
271267
# @returns [Hash, nil] The parsed message or nil if stream is closed.
272268
def read
273-
length_data = @stream&.read(4)
274-
return nil unless length_data && length_data.bytesize == 4
275-
276-
# Unpack 32-bit integer
277-
length = length_data.unpack1("N")
278-
279-
# Validate message size
280-
if length > MAX_MESSAGE_SIZE
281-
Console.error(self, "Message too large: #{length} bytes (max: #{MAX_MESSAGE_SIZE})")
282-
return nil
283-
end
284-
285-
# Read the exact amount of data specified
286-
data = @stream.read(length)
287-
288-
unless data && data.bytesize == length
289-
raise EOFError, "Failed to read complete message"
290-
end
291-
292-
@message_wrapper.unpack(data)
269+
@message_wrapper.read
270+
rescue EOFError, IOError
271+
nil
293272
end
294273

295274
# Iterate over all messages from the connection.

lib/async/container/supervisor/message_wrapper.rb

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,25 @@ module Async
99
module Container
1010
module Supervisor
1111
class MessageWrapper
12-
def initialize
12+
def initialize(stream)
1313
@factory = MessagePack::Factory.new
1414

1515
register_types
1616

17-
@packer = @factory.packer
17+
@packer = @factory.packer(stream)
18+
@unpacker = @factory.unpacker(stream)
19+
end
20+
21+
def write(message)
22+
data = pack(message)
23+
# Console.logger.info("Sending data: #{message.inspect}")
24+
@packer.write(data)
25+
end
26+
27+
def read
28+
data = @unpacker.read
29+
# Console.logger.info("Received data: #{data.inspect}")
30+
data
1831
end
1932

2033
def pack(message)

test/async/container/connection.rb

Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,16 @@ def dispatch(call)
2121
describe Async::Container::Supervisor::Connection do
2222
let(:stream) {StringIO.new}
2323
let(:connection) {Async::Container::Supervisor::Connection.new(stream)}
24-
let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new}
24+
let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new(stream)}
25+
26+
def write_message(message)
27+
message_wrapper.write(message)
28+
stream.rewind
29+
end
2530

2631
with "dispatch" do
2732
it "handles failed writes when dispatching a call" do
28-
stream.write(JSON.dump({id: 1, do: :test}) << "\n")
29-
stream.rewind
33+
write_message({id: 1, do: :test})
3034

3135
expect(stream).to receive(:write).and_raise(IOError, "Write error")
3236

@@ -44,8 +48,7 @@ def dispatch(call)
4448
end
4549

4650
it "closes the queue when the connection fails" do
47-
stream.write(JSON.dump({id: 1, do: :test}) << "\n")
48-
stream.rewind
51+
write_message({id: 1, do: :test})
4952

5053
expect(stream).to receive(:write).and_raise(IOError, "Write error")
5154

@@ -146,48 +149,6 @@ def dispatch(call)
146149
end
147150
end
148151

149-
it "writes length-prefixed MessagePack data" do
150-
connection.write(id: 1, do: :test)
151-
152-
stream.rewind
153-
154-
# Read 2-byte length prefix
155-
length_data = stream.read(4)
156-
expect(length_data.bytesize).to be == 4
157-
158-
length = length_data.unpack1("N")
159-
expect(length).to be > 0
160-
161-
# Read MessagePack data
162-
data = stream.read(length)
163-
expect(data.bytesize).to be == length
164-
165-
# Parse MessagePack
166-
parsed = message_wrapper.unpack(data)
167-
expect(parsed).to have_keys(
168-
id: be == 1,
169-
do: be == :test
170-
)
171-
end
172-
173-
it "reads length-prefixed MessagePack data" do
174-
# Create MessagePack data
175-
message = {id: 1, do: "test"}
176-
data = message_wrapper.pack(message)
177-
178-
# Write with length prefix
179-
stream.string = [data.bytesize].pack("N") + data
180-
stream.rewind
181-
182-
parsed = connection.read
183-
184-
# Keys are symbols
185-
expect(parsed).to have_keys(
186-
id: be == 1,
187-
do: be == "test"
188-
)
189-
end
190-
191152
it "returns nil when stream is closed" do
192153
stream.string = ""
193154
stream.rewind

test/async/container/server.rb

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,12 @@
1010
include Async::Container::Supervisor::AServer
1111
include Sus::Fixtures::Console::CapturedLogger
1212

13-
let(:message_wrapper) {Async::Container::Supervisor::MessageWrapper.new}
14-
15-
# Helper to write length-prefixed MessagePack data
1613
def write_message(stream, message)
17-
data = message_wrapper.pack(message)
18-
stream.write([data.bytesize].pack("N") + data)
19-
stream.flush
14+
Async::Container::Supervisor::MessageWrapper.new(stream).write(message)
2015
end
2116

22-
# Helper to read length-prefixed MessagePack data
2317
def read_message(stream)
24-
length_data = stream.read(4)
25-
return nil unless length_data && length_data.bytesize == 4
26-
27-
length = length_data.unpack1("N")
28-
data = stream.read(length)
29-
return nil unless data && data.bytesize == length
30-
31-
message_wrapper.unpack(data)
18+
Async::Container::Supervisor::MessageWrapper.new(stream).read
3219
end
3320

3421
it "can handle unexpected failures" do

0 commit comments

Comments
 (0)