Skip to content

Commit bab4a10

Browse files
Restore binary message support in message queue setups (Fixes #1508) (#1509)
1 parent 383eeaf commit bab4a10

File tree

6 files changed

+194
-30
lines changed

6 files changed

+194
-30
lines changed

src/socketio/async_pubsub_manager.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import asyncio
2+
import base64
23
from functools import partial
34
import uuid
45

56
from engineio import json
67

78
from .async_manager import AsyncManager
9+
from .packet import Packet
810

911

1012
class AsyncPubSubManager(AsyncManager):
@@ -64,8 +66,12 @@ async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
6466
callback = (room, namespace, id)
6567
else:
6668
callback = None
69+
binary = Packet.data_is_binary(data)
70+
if binary:
71+
data, attachments = Packet.deconstruct_binary(data)
72+
data = [data, *[base64.b64encode(a).decode() for a in attachments]]
6773
message = {'method': 'emit', 'event': event, 'data': data,
68-
'namespace': namespace, 'room': room,
74+
'binary': binary, 'namespace': namespace, 'room': room,
6975
'skip_sid': skip_sid, 'callback': callback,
7076
'host_id': self.host_id}
7177
await self._handle_emit(message) # handle in this host
@@ -145,7 +151,11 @@ async def _handle_emit(self, message):
145151
*remote_callback)
146152
else:
147153
callback = None
148-
await super().emit(message['event'], message['data'],
154+
data = message['data']
155+
if message.get('binary'):
156+
attachments = [base64.b64decode(a) for a in data[1:]]
157+
data = Packet.reconstruct_binary(data[0], attachments)
158+
await super().emit(message['event'], data,
149159
namespace=message.get('namespace'),
150160
room=message.get('room'),
151161
skip_sid=message.get('skip_sid'),

src/socketio/packet.py

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def __init__(self, packet_type=EVENT, data=None, namespace=None, id=None,
2929
self.namespace = namespace
3030
self.id = id
3131
if self.uses_binary_events and \
32-
(binary or (binary is None and self._data_is_binary(
32+
(binary or (binary is None and self.data_is_binary(
3333
self.data))):
3434
if self.packet_type == EVENT:
3535
self.packet_type = BINARY_EVENT
@@ -51,7 +51,7 @@ def encode(self):
5151
"""
5252
encoded_packet = str(self.packet_type)
5353
if self.packet_type == BINARY_EVENT or self.packet_type == BINARY_ACK:
54-
data, attachments = self._deconstruct_binary(self.data)
54+
data, attachments = self.deconstruct_binary(self.data)
5555
encoded_packet += str(len(attachments)) + '-'
5656
else:
5757
data = self.data
@@ -119,61 +119,65 @@ def add_attachment(self, attachment):
119119
raise ValueError('Unexpected binary attachment')
120120
self.attachments.append(attachment)
121121
if self.attachment_count == len(self.attachments):
122-
self.reconstruct_binary(self.attachments)
122+
self.data = self.reconstruct_binary(self.data, self.attachments)
123123
return True
124124
return False
125125

126-
def reconstruct_binary(self, attachments):
126+
@classmethod
127+
def reconstruct_binary(cls, data, attachments):
127128
"""Reconstruct a decoded packet using the given list of binary
128129
attachments.
129130
"""
130-
self.data = self._reconstruct_binary_internal(self.data,
131-
self.attachments)
131+
return cls._reconstruct_binary_internal(data, attachments)
132132

133-
def _reconstruct_binary_internal(self, data, attachments):
133+
@classmethod
134+
def _reconstruct_binary_internal(cls, data, attachments):
134135
if isinstance(data, list):
135-
return [self._reconstruct_binary_internal(item, attachments)
136+
return [cls._reconstruct_binary_internal(item, attachments)
136137
for item in data]
137138
elif isinstance(data, dict):
138139
if data.get('_placeholder') and 'num' in data:
139140
return attachments[data['num']]
140141
else:
141-
return {key: self._reconstruct_binary_internal(value,
142-
attachments)
142+
return {key: cls._reconstruct_binary_internal(value,
143+
attachments)
143144
for key, value in data.items()}
144145
else:
145146
return data
146147

147-
def _deconstruct_binary(self, data):
148+
@classmethod
149+
def deconstruct_binary(cls, data):
148150
"""Extract binary components in the packet."""
149151
attachments = []
150-
data = self._deconstruct_binary_internal(data, attachments)
152+
data = cls._deconstruct_binary_internal(data, attachments)
151153
return data, attachments
152154

153-
def _deconstruct_binary_internal(self, data, attachments):
155+
@classmethod
156+
def _deconstruct_binary_internal(cls, data, attachments):
154157
if isinstance(data, bytes):
155158
attachments.append(data)
156159
return {'_placeholder': True, 'num': len(attachments) - 1}
157160
elif isinstance(data, list):
158-
return [self._deconstruct_binary_internal(item, attachments)
161+
return [cls._deconstruct_binary_internal(item, attachments)
159162
for item in data]
160163
elif isinstance(data, dict):
161-
return {key: self._deconstruct_binary_internal(value, attachments)
164+
return {key: cls._deconstruct_binary_internal(value, attachments)
162165
for key, value in data.items()}
163166
else:
164167
return data
165168

166-
def _data_is_binary(self, data):
169+
@classmethod
170+
def data_is_binary(cls, data):
167171
"""Check if the data contains binary components."""
168172
if isinstance(data, bytes):
169173
return True
170174
elif isinstance(data, list):
171175
return functools.reduce(
172-
lambda a, b: a or b, [self._data_is_binary(item)
176+
lambda a, b: a or b, [cls.data_is_binary(item)
173177
for item in data], False)
174178
elif isinstance(data, dict):
175179
return functools.reduce(
176-
lambda a, b: a or b, [self._data_is_binary(item)
180+
lambda a, b: a or b, [cls.data_is_binary(item)
177181
for item in data.values()],
178182
False)
179183
else:

src/socketio/pubsub_manager.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import base64
12
from functools import partial
23
import uuid
34

45
from engineio import json
56

67
from .manager import Manager
8+
from .packet import Packet
79

810

911
class PubSubManager(Manager):
@@ -61,8 +63,12 @@ def emit(self, event, data, namespace=None, room=None, skip_sid=None,
6163
callback = (room, namespace, id)
6264
else:
6365
callback = None
66+
binary = Packet.data_is_binary(data)
67+
if binary:
68+
data, attachments = Packet.deconstruct_binary(data)
69+
data = [data, *[base64.b64encode(a).decode() for a in attachments]]
6470
message = {'method': 'emit', 'event': event, 'data': data,
65-
'namespace': namespace, 'room': room,
71+
'binary': binary, 'namespace': namespace, 'room': room,
6672
'skip_sid': skip_sid, 'callback': callback,
6773
'host_id': self.host_id}
6874
self._handle_emit(message) # handle in this host
@@ -141,7 +147,11 @@ def _handle_emit(self, message):
141147
*remote_callback)
142148
else:
143149
callback = None
144-
super().emit(message['event'], message['data'],
150+
data = message['data']
151+
if message.get('binary'):
152+
attachments = [base64.b64decode(a) for a in data[1:]]
153+
data = Packet.reconstruct_binary(data[0], attachments)
154+
super().emit(message['event'], data,
145155
namespace=message.get('namespace'),
146156
room=message.get('room'),
147157
skip_sid=message.get('skip_sid'), callback=callback)

tests/async/test_pubsub_manager.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ async def test_emit(self):
5757
{
5858
'method': 'emit',
5959
'event': 'foo',
60+
'binary': False,
6061
'data': 'bar',
6162
'namespace': '/',
6263
'room': None,
@@ -66,13 +67,44 @@ async def test_emit(self):
6667
}
6768
)
6869

70+
async def test_emit_binary(self):
71+
await self.pm.emit('foo', b'bar')
72+
self.pm._publish.assert_awaited_once_with(
73+
{
74+
'method': 'emit',
75+
'event': 'foo',
76+
'binary': True,
77+
'data': [{'_placeholder': True, 'num': 0}, 'YmFy'],
78+
'namespace': '/',
79+
'room': None,
80+
'skip_sid': None,
81+
'callback': None,
82+
'host_id': '123456',
83+
}
84+
)
85+
await self.pm.emit('foo', {'foo': b'bar'})
86+
self.pm._publish.assert_awaited_with(
87+
{
88+
'method': 'emit',
89+
'event': 'foo',
90+
'binary': True,
91+
'data': [{'foo': {'_placeholder': True, 'num': 0}}, 'YmFy'],
92+
'namespace': '/',
93+
'room': None,
94+
'skip_sid': None,
95+
'callback': None,
96+
'host_id': '123456',
97+
}
98+
)
99+
69100
async def test_emit_with_to(self):
70101
sid = 'room-mate'
71102
await self.pm.emit('foo', 'bar', to=sid)
72103
self.pm._publish.assert_awaited_once_with(
73104
{
74105
'method': 'emit',
75106
'event': 'foo',
107+
'binary': False,
76108
'data': 'bar',
77109
'namespace': '/',
78110
'room': sid,
@@ -88,6 +120,7 @@ async def test_emit_with_namespace(self):
88120
{
89121
'method': 'emit',
90122
'event': 'foo',
123+
'binary': False,
91124
'data': 'bar',
92125
'namespace': '/baz',
93126
'room': None,
@@ -103,6 +136,7 @@ async def test_emit_with_room(self):
103136
{
104137
'method': 'emit',
105138
'event': 'foo',
139+
'binary': False,
106140
'data': 'bar',
107141
'namespace': '/',
108142
'room': 'baz',
@@ -118,6 +152,7 @@ async def test_emit_with_skip_sid(self):
118152
{
119153
'method': 'emit',
120154
'event': 'foo',
155+
'binary': False,
121156
'data': 'bar',
122157
'namespace': '/',
123158
'room': None,
@@ -136,6 +171,7 @@ async def test_emit_with_callback(self):
136171
{
137172
'method': 'emit',
138173
'event': 'foo',
174+
'binary': False,
139175
'data': 'bar',
140176
'namespace': '/',
141177
'room': 'baz',
@@ -241,6 +277,37 @@ async def test_handle_emit(self):
241277
callback=None,
242278
)
243279

280+
async def test_handle_emit_binary(self):
281+
with mock.patch.object(
282+
async_manager.AsyncManager, 'emit'
283+
) as super_emit:
284+
await self.pm._handle_emit({
285+
'event': 'foo',
286+
'binary': True,
287+
'data': [{'_placeholder': True, 'num': 0}, 'YmFy'],
288+
})
289+
super_emit.assert_awaited_once_with(
290+
'foo',
291+
b'bar',
292+
namespace=None,
293+
room=None,
294+
skip_sid=None,
295+
callback=None,
296+
)
297+
await self.pm._handle_emit({
298+
'event': 'foo',
299+
'binary': True,
300+
'data': [{'foo': {'_placeholder': True, 'num': 0}}, 'YmFy'],
301+
})
302+
super_emit.assert_awaited_with(
303+
'foo',
304+
{'foo': b'bar'},
305+
namespace=None,
306+
room=None,
307+
skip_sid=None,
308+
callback=None,
309+
)
310+
244311
async def test_handle_emit_with_namespace(self):
245312
with mock.patch.object(
246313
async_manager.AsyncManager, 'emit'

tests/common/test_packet.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -266,16 +266,24 @@ def test_decode_dash_in_payload(self):
266266
assert pkt.data["a"] == "0123456789-"
267267
assert pkt.attachment_count == 0
268268

269+
def test_deconstruct_binary(self):
270+
datas = [b'foo', [b'foo', b'bar'], ['foo', b'bar'], {'foo': b'bar'},
271+
{'foo': 'bar', 'baz': b'qux'}, {'foo': [b'bar']}]
272+
for data in datas:
273+
bdata, attachments = packet.Packet.deconstruct_binary(data)
274+
rdata = packet.Packet.reconstruct_binary(bdata, attachments)
275+
assert data == rdata
276+
269277
def test_data_is_binary_list(self):
270278
pkt = packet.Packet()
271-
assert not pkt._data_is_binary(['foo'])
272-
assert not pkt._data_is_binary([])
273-
assert pkt._data_is_binary([b'foo'])
274-
assert pkt._data_is_binary(['foo', b'bar'])
279+
assert not pkt.data_is_binary(['foo'])
280+
assert not pkt.data_is_binary([])
281+
assert pkt.data_is_binary([b'foo'])
282+
assert pkt.data_is_binary(['foo', b'bar'])
275283

276284
def test_data_is_binary_dict(self):
277285
pkt = packet.Packet()
278-
assert not pkt._data_is_binary({'a': 'foo'})
279-
assert not pkt._data_is_binary({})
280-
assert pkt._data_is_binary({'a': b'foo'})
281-
assert pkt._data_is_binary({'a': 'foo', 'b': b'bar'})
286+
assert not pkt.data_is_binary({'a': 'foo'})
287+
assert not pkt.data_is_binary({})
288+
assert pkt.data_is_binary({'a': b'foo'})
289+
assert pkt.data_is_binary({'a': 'foo', 'b': b'bar'})

0 commit comments

Comments
 (0)