Skip to content

Commit ce44133

Browse files
redis message queue for asyncio
1 parent 2e55d7f commit ce44133

14 files changed

+654
-38
lines changed

docs/index.rst

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -363,11 +363,13 @@ type of installation, each server processes owns the connections to a subset
363363
of the clients. To make broadcasting work in this environment, the servers
364364
communicate with each other through the message queue.
365365

366-
The message queue service needs to be installed and configured separately. One
367-
of the options offered by this package is to use
368-
`Kombu <http://kombu.readthedocs.org/en/latest/>`_ to access the message
369-
queue, which means that any message queue supported by this package can be
370-
used. Kombu can be installed with pip::
366+
Kombu
367+
~~~~~
368+
369+
One of the messaging options offered by this package to access the message
370+
queue is `Kombu <http://kombu.readthedocs.org/en/latest/>`_ , which means that
371+
any message queue supported by this package can be used. Kombu can be installed
372+
with pip::
371373

372374
pip install kombu
373375

@@ -378,7 +380,8 @@ package for Redis installed as well::
378380

379381
pip install redis
380382

381-
To configure a Socket.IO server to connect to a message queue, the
383+
The appropriate message queue service, such as RabbitMQ or Redis, must also be
384+
installed. To configure a Socket.IO server to connect to a Kombu queue, the
382385
``client_manager`` argument must be passed in the server creation. The
383386
following example instructs the server to connect to a Redis service running
384387
on the same host and on the default port::
@@ -392,39 +395,63 @@ credentials, the configuration is as follows::
392395
mgr = socketio.KombuManager('amqp://')
393396
sio = socketio.Server(client_manager=mgr)
394397

395-
The URL passed to the ``KombuManager`` constructor is passed directly to
398+
The URL passed to the :class:`KombuManager` constructor is passed directly to
396399
Kombu's `Connection object
397400
<http://kombu.readthedocs.org/en/latest/userguide/connections.html>`_, so
398401
the Kombu documentation should be consulted for information on how to
399402
connect to the message queue appropriately.
400403

401-
If the use of Kombu is not desired, native Redis support is also offered
402-
through the ``RedisManager`` class. This class takes the same arguments as
403-
``KombuManager``, but connects directly to a Redis store using the queue's
404-
pub/sub functionality::
404+
Note that Kombu currently does not support asyncio, so it cannot be used with
405+
the :class:`socketio.AsyncServer` class.
406+
407+
Redis
408+
~~~~~
409+
410+
To use a Redis message queue, the Python package for Redis must also be
411+
installed::
412+
413+
# WSGI server
414+
pip install redis
415+
416+
# asyncio server
417+
pip install aioredis
418+
419+
Native Redis support is accessed through the :class:`socketio.RedisManager` and
420+
:class:`socketio.AsyncRedisManager` classes. These classes connect directly to
421+
the Redis store and use the queue's pub/sub functionality::
405422

423+
# WSGI server
406424
mgr = socketio.RedisManager('redis://')
407425
sio = socketio.Server(client_manager=mgr)
408426

409-
If multiple Socket.IO servers are connected to a message queue, they
427+
# asyncio server
428+
mgr = socketio.AsyncRedisManager('redis://')
429+
sio = socketio.AsyncServer(client_manager=mgr)
430+
431+
Horizontal scaling
432+
~~~~~~~~~~~~~~~~~~
433+
434+
If multiple Socket.IO servers are connected to the same message queue, they
410435
automatically communicate with each other and manage a combined client list,
411-
without any need for additional configuration. To have a process other than
412-
a server connect to the queue to emit a message, the same ``KombuManager``
413-
and ``RedisManager`` classes can be used as standalone object. In this case,
414-
the ``write_only`` argument should be set to ``True`` to disable the creation
415-
of a listening thread. For example::
436+
without any need for additional configuration. When a load balancer such as
437+
nginx is used, this provides virtually unlimited scaling capabilities for the
438+
server.
439+
440+
Emitting from external processes
441+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
442+
443+
To have a process other than a server connect to the queue to emit a message,
444+
the same client manager classes can be used as standalone objects. In this
445+
case, the ``write_only`` argument should be set to ``True`` to disable the
446+
creation of a listening thread, which only makes sense in a server. For
447+
example::
416448

417449
# connect to the redis queue through Kombu
418450
external_sio = socketio.KombuManager('redis://', write_only=True)
419451
420452
# emit an event
421453
external_sio.emit('my event', data={'foo': 'bar'}, room='my room')
422454

423-
Note: when using a third party package to manage a message queue such as Redis
424-
or RabbitMQ in conjunction with eventlet or gevent, it is necessary to monkey
425-
patch the Python standard library, so that these packages access coroutine
426-
friendly library functions and classes.
427-
428455
Deployment
429456
----------
430457

@@ -663,3 +690,6 @@ API Reference
663690
.. autoclass:: AsyncManager
664691
:members:
665692
:inherited-members:
693+
694+
.. autoclass:: AsyncRedisManager
695+
:members:

examples/aiohttp/app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ async def close(sid, message):
5454
await sio.emit('my response',
5555
{'data': 'Room ' + message['room'] + ' is closing.'},
5656
room=message['room'], namespace='/test')
57-
sio.close_room(message['room'], namespace='/test')
57+
await sio.close_room(message['room'], namespace='/test')
5858

5959

6060
@sio.on('my room event', namespace='/test')

socketio/__init__.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,18 @@
1212
from .asyncio_server import AsyncServer
1313
from .asyncio_manager import AsyncManager
1414
from .asyncio_namespace import AsyncNamespace
15+
from .asyncio_redis_manager import AsyncRedisManager
1516
else: # pragma: no cover
1617
AsyncServer = None
1718
AsyncManager = None
1819
AsyncNamespace = None
20+
AsyncRedisManager = None
1921

2022
__version__ = '1.6.3'
2123

2224
__all__ = ['__version__', 'Middleware', 'Server', 'BaseManager',
2325
'PubSubManager', 'KombuManager', 'RedisManager', 'ZmqManager',
2426
'Namespace']
2527
if AsyncServer is not None: # pragma: no cover
26-
__all__.append('AsyncServer')
27-
__all__.append('AsyncNamespace')
28-
__all__.append('AsyncManager')
28+
__all__ += ['AsyncServer', 'AsyncNamespace', 'AsyncManager',
29+
'AsyncRedisManager']

socketio/asyncio_manager.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ async def emit(self, event, data, namespace, room=None, skip_sid=None,
2525
namespace, id))
2626
await asyncio.wait(tasks)
2727

28+
async def close_room(self, room, namespace):
29+
"""Remove all participants from a room.
30+
31+
Note: this method is a coroutine.
32+
"""
33+
return super().close_room(room, namespace)
34+
2835
async def trigger_callback(self, sid, namespace, id, data):
2936
"""Invoke an application callback.
3037

socketio/asyncio_namespace.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,18 @@ async def send(self, data, room=None, skip_sid=None, namespace=None,
7070
namespace=namespace or self.namespace,
7171
callback=callback)
7272

73+
async def close_room(self, room, namespace=None):
74+
"""Close a room.
75+
76+
The only difference with the :func:`socketio.Server.close_room` method
77+
is that when the ``namespace`` argument is not given the namespace
78+
associated with the class is used.
79+
80+
Note: this method is a coroutine.
81+
"""
82+
return await self.server.close_room(
83+
room, namespace=namespace or self.namespace)
84+
7385
async def disconnect(self, sid, namespace=None):
7486
"""Disconnect a client.
7587

socketio/asyncio_pubsub_manager.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
from functools import partial
2+
import uuid
3+
4+
import json
5+
import pickle
6+
import six
7+
8+
from .asyncio_manager import AsyncManager
9+
10+
11+
class AsyncPubSubManager(AsyncManager):
12+
"""Manage a client list attached to a pub/sub backend under asyncio.
13+
14+
This is a base class that enables multiple servers to share the list of
15+
clients, with the servers communicating events through a pub/sub backend.
16+
The use of a pub/sub backend also allows any client connected to the
17+
backend to emit events addressed to Socket.IO clients.
18+
19+
The actual backends must be implemented by subclasses, this class only
20+
provides a pub/sub generic framework for asyncio applications.
21+
22+
:param channel: The channel name on which the server sends and receives
23+
notifications.
24+
"""
25+
name = 'asyncpubsub'
26+
27+
def __init__(self, channel='socketio', write_only=False):
28+
super().__init__()
29+
self.channel = channel
30+
self.write_only = write_only
31+
self.host_id = uuid.uuid4().hex
32+
33+
def initialize(self):
34+
super().initialize()
35+
if not self.write_only:
36+
self.thread = self.server.start_background_task(self._thread)
37+
self.server.logger.info(self.name + ' backend initialized.')
38+
39+
async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
40+
callback=None, **kwargs):
41+
"""Emit a message to a single client, a room, or all the clients
42+
connected to the namespace.
43+
44+
This method takes care or propagating the message to all the servers
45+
that are connected through the message queue.
46+
47+
The parameters are the same as in :meth:`.Server.emit`.
48+
49+
Note: this method is a coroutine.
50+
"""
51+
if kwargs.get('ignore_queue'):
52+
return await super().emit(
53+
event, data, namespace=namespace, room=room, skip_sid=skip_sid,
54+
callback=callback)
55+
namespace = namespace or '/'
56+
if callback is not None:
57+
if self.server is None:
58+
raise RuntimeError('Callbacks can only be issued from the '
59+
'context of a server.')
60+
if room is None:
61+
raise ValueError('Cannot use callback without a room set.')
62+
id = self._generate_ack_id(room, namespace, callback)
63+
callback = (room, namespace, id)
64+
else:
65+
callback = None
66+
await self._publish({'method': 'emit', 'event': event, 'data': data,
67+
'namespace': namespace, 'room': room,
68+
'skip_sid': skip_sid, 'callback': callback})
69+
70+
async def close_room(self, room, namespace=None):
71+
await self._publish({'method': 'close_room', 'room': room,
72+
'namespace': namespace or '/'})
73+
74+
async def _publish(self, data):
75+
"""Publish a message on the Socket.IO channel.
76+
77+
This method needs to be implemented by the different subclasses that
78+
support pub/sub backends.
79+
"""
80+
raise NotImplementedError('This method must be implemented in a '
81+
'subclass.') # pragma: no cover
82+
83+
async def _listen(self):
84+
"""Return the next message published on the Socket.IO channel,
85+
blocking until a message is available.
86+
87+
This method needs to be implemented by the different subclasses that
88+
support pub/sub backends.
89+
"""
90+
raise NotImplementedError('This method must be implemented in a '
91+
'subclass.') # pragma: no cover
92+
93+
async def _handle_emit(self, message):
94+
# Events with callbacks are very tricky to handle across hosts
95+
# Here in the receiving end we set up a local callback that preserves
96+
# the callback host and id from the sender
97+
remote_callback = message.get('callback')
98+
if remote_callback is not None and len(remote_callback) == 3:
99+
callback = partial(self._return_callback, self.host_id,
100+
*remote_callback)
101+
else:
102+
callback = None
103+
await super().emit(message['event'], message['data'],
104+
namespace=message.get('namespace'),
105+
room=message.get('room'),
106+
skip_sid=message.get('skip_sid'),
107+
callback=callback)
108+
109+
async def _handle_callback(self, message):
110+
if self.host_id == message.get('host_id'):
111+
try:
112+
sid = message['sid']
113+
namespace = message['namespace']
114+
id = message['id']
115+
args = message['args']
116+
except KeyError:
117+
return
118+
await self.trigger_callback(sid, namespace, id, args)
119+
120+
async def _return_callback(self, host_id, sid, namespace, callback_id,
121+
*args):
122+
# When an event callback is received, the callback is returned back
123+
# the sender, which is identified by the host_id
124+
await self._publish({'method': 'callback', 'host_id': host_id,
125+
'sid': sid, 'namespace': namespace,
126+
'id': callback_id, 'args': args})
127+
128+
async def _handle_close_room(self, message):
129+
await super().close_room(
130+
room=message.get('room'), namespace=message.get('namespace'))
131+
132+
async def _thread(self):
133+
while True:
134+
try:
135+
message = await self._listen()
136+
except:
137+
import traceback
138+
traceback.print_exc()
139+
break
140+
data = None
141+
if isinstance(message, dict):
142+
data = message
143+
else:
144+
if isinstance(message, six.binary_type): # pragma: no cover
145+
try:
146+
data = pickle.loads(message)
147+
except:
148+
pass
149+
if data is None:
150+
try:
151+
data = json.loads(message)
152+
except:
153+
pass
154+
if data and 'method' in data:
155+
if data['method'] == 'emit':
156+
await self._handle_emit(data)
157+
elif data['method'] == 'callback':
158+
await self._handle_callback(data)
159+
elif data['method'] == 'close_room':
160+
await self._handle_close_room(data)

0 commit comments

Comments
 (0)