Skip to content

Commit f621917

Browse files
committed
WIP making tests pass with the new async servers
1 parent c6b0cf4 commit f621917

File tree

6 files changed

+225
-144
lines changed

6 files changed

+225
-144
lines changed

examples/asyncio_example.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
async def osc_app(address, port):
77
osc = OSCAsyncioServer(encoding='utf8')
88
osc.listen(address=address, port=port, default=True)
9+
sock2 = osc.listen(address=address, port=port + 1)
910

1011
@osc.address("/example")
1112
async def example(*values):
@@ -19,11 +20,12 @@ async def test(*values):
1920
await asyncio.sleep(4)
2021
print("done sleeping")
2122

22-
@osc.address("/stop")
23+
@osc.address("/stop", sock=sock2)
2324
async def stop(*values):
2425
print(f"time to leave!")
2526
osc.terminate_server()
2627

28+
print(sock2.getsockname())
2729
asyncio.get_event_loop().create_task(osc.process())
2830
await osc.join_server()
2931

oscpy/server/__init__.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ def _execute_callbacks(self, callbacks_list, address, values):
522522
cb(*values)
523523
except Exception:
524524
if self.intercept_errors:
525-
logger.error("Unhandled exception caught in oscpy server", exc_info=True)
525+
logger.exception("Unhandled exception caught in oscpy server")
526526
else:
527527
raise
528528

@@ -577,7 +577,6 @@ def join_server(self, timeout=None):
577577
Returns True if and only if the inner thread exited before timeout."""
578578
return self._termination_event.wait(timeout=timeout)
579579

580-
581580
# backward compatibility
582581

583582
from oscpy.server.thread_server import OSCThreadServer

oscpy/server/asyncio_server.py

+35-7
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
import asyncio
22
import socket
33
from functools import partial
4+
from logging import getLogger
45

56
from oscpy.server import OSCBaseServer
67

78

9+
logger = getLogger(__name__)
10+
11+
812
class OSCAsyncioServer(OSCBaseServer):
913
def __init__(self, *args, **kwargs):
1014
super().__init__(*args, **kwargs)
@@ -13,17 +17,20 @@ def __init__(self, *args, **kwargs):
1317

1418
def listen(self, address='localhost', port=0, default=False, family='inet', **kwargs):
1519
loop = asyncio.get_event_loop()
16-
addr = (address, port)
20+
if family == 'unix':
21+
addr = address
22+
else:
23+
addr = (address, port)
1724
sock = self.get_socket(
1825
family=socket.AF_UNIX if family == 'unix' else socket.AF_INET,
1926
addr=addr,
2027
)
21-
self.listeners[addr] = awaitable = loop.create_datagram_endpoint(
28+
self.listeners[(address, port or sock.getsockname()[1])] = loop.create_datagram_endpoint(
2229
partial(OSCProtocol, self.handle_message, sock),
2330
sock=sock,
2431
)
2532
self.add_socket(sock, default)
26-
return awaitable
33+
return sock
2734

2835
async def process(self):
2936
return await asyncio.gather(
@@ -36,7 +43,6 @@ async def handle_message(self, data, sender, sender_socket):
3643
await self._execute_callbacks(callbacks, address, values)
3744

3845
async def _execute_callbacks(self, callbacks_list, address, values):
39-
print(locals())
4046
for cb, get_address in callbacks_list:
4147
try:
4248
if get_address:
@@ -45,10 +51,29 @@ async def _execute_callbacks(self, callbacks_list, address, values):
4551
await cb(*values)
4652
except Exception:
4753
if self.intercept_errors:
48-
logger.error("Unhandled exception caught in oscpy server", exc_info=True)
54+
logger.exception("Unhandled exception caught in oscpy server")
4955
else:
5056
raise
5157

58+
def stop(self, sock=None):
59+
"""Close and remove a socket from the server's sockets.
60+
61+
If `sock` is None, uses the default socket for the server.
62+
63+
"""
64+
if not sock and self.default_socket:
65+
sock = self.default_socket
66+
67+
if sock in self.sockets:
68+
sock.close()
69+
self.sockets.remove(sock)
70+
else:
71+
raise RuntimeError('{} is not one of my sockets!'.format(sock))
72+
73+
def stop_all(self):
74+
for sock in self.sockets[:]:
75+
self.stop(sock)
76+
5277
async def join_server(self, timeout=None):
5378
"""Wait for the server to exit (`terminate_server()` must have been called before).
5479
@@ -60,13 +85,16 @@ class OSCProtocol(asyncio.DatagramProtocol):
6085
def __init__(self, message_handler, sock, **kwargs):
6186
super().__init__(**kwargs)
6287
self.message_handler = message_handler
63-
self._socket = sock
88+
self.socket = sock
6489
self.loop = asyncio.get_event_loop()
6590

6691
def connection_made(self, transport):
6792
self.transport = transport
6893

6994
def datagram_received(self, data, addr):
7095
self.loop.call_soon(
71-
lambda: asyncio.ensure_future(self.message_handler(data, addr, self._socket))
96+
lambda: asyncio.ensure_future(self.message_handler(data, addr, self.socket))
7297
)
98+
99+
def getsockname(self):
100+
return self.socket.getsockname()

oscpy/server/curio_server.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ async def process(self):
6161
async def stop_all(self):
6262
await self.tasks_group.cancel_remaining()
6363

64-
async def stop(self, sock):
65-
g = self.task_groups.pop(sock)
66-
await g.cancel_remaining()
64+
async def stop(self, sock=None):
65+
if not sock and self.default_socket:
66+
sock = self.default_socket
67+
68+
if sock in self.sockets:
69+
g = self.task_groups.pop(sock)
70+
await g.cancel_remaining()
71+
else:
72+
raise RuntimeError('{} is not one of my sockets!'.format(sock))

oscpy/server/thread_server.py

+5
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,8 @@ def _listen(self):
8383
continue
8484

8585
self.handle_message(data, sender, sender_socket)
86+
87+
def join_server(self, timeout=None):
88+
result = super(OSCThreadServer, self).join_server(timeout=timeout)
89+
self._thread.join(timeout=timeout)
90+
return result

0 commit comments

Comments
 (0)