Skip to content

Commit 93249ab

Browse files
committedApr 12, 2018
'async for' support in streams in python 3.5+
prepare cancleable calls python 2/3 str/unicode fixes documentation of protocol messages
1 parent 5d60279 commit 93249ab

10 files changed

+596
-230
lines changed
 

‎.gitignore

+2-23
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# CeRPCerus
22
testing/
33
js/
4+
*.bak
5+
*.log
46

57
# Twisted
68
_trial_temp/
@@ -26,18 +28,6 @@ parts/
2628
sdist/
2729
var/
2830
*.egg-info/
29-
.installed.cfg
30-
*.egg
31-
32-
# PyInstaller
33-
# Usually these files are written by a python script from a template
34-
# before PyInstaller builds the exe, so as to inject date/other infos into it.
35-
*.manifest
36-
*.spec
37-
38-
# Installer logs
39-
pip-log.txt
40-
pip-delete-this-directory.txt
4131

4232
# Unit test / coverage reports
4333
htmlcov/
@@ -47,17 +37,6 @@ htmlcov/
4737
nosetests.xml
4838
coverage.xml
4939

50-
# Translations
51-
*.mo
52-
*.pot
53-
54-
# Django stuff:
55-
*.log
56-
5740
# Sphinx documentation
5841
docs/
5942
docs/_build/
60-
61-
# PyBuilder
62-
target/
63-
install.bat

‎cerpcerus/iter_deferred.py

+69-53
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,48 @@
22

33
import queue, logging
44

5-
from twisted.internet import defer, reactor, protocol
5+
from twisted.internet import defer
6+
7+
from future.utils import PY2
8+
9+
if PY2:
10+
StopAsyncIteration = Exception
611

712
logger = logging.getLogger(__name__)
813

914
"""
1015
call self.transport.pauseProducing() and self.transport.resumeProducing() if stream is sending too fast (queue gets too large
1116
"""
1217

13-
class MultiDeferredIterator(object):
18+
class MultiDeferredIterator(object): # inherit from RemoteObjectGeneric?
1419

1520
"""should this be made pause/resume-able?
1621
should be stoppable for sure
1722
maybe the stream should include information if it can be paused
1823
1924
if this stops the producer/transport, it will be stopped for all streams.
2025
It would be better to have on collector with one queue for all streams, so it can be stopped with regard to all streams.
21-
if every single iterator can pause/resume, there will be nterference among them
26+
if every single iterator can pause/resume, there will be interference among them
2227
"""
2328

24-
def __init__(self):
29+
def __init__(self, conn, sequid, classname):
30+
self._conn = conn
31+
self._sequid = sequid
32+
self._classname = classname
33+
2534
self.queue = queue.Queue()
2635
self.deferred = None
2736
self.transport = None # interfaces.IPushProducer
2837

2938
# called by client
3039

40+
def abort(self): # this should be called from the generators .close() method as well to stop the server from sending
41+
self._conn._cancel_stream(self._sequid)
42+
3143
def __iter__(self):
3244
return self
3345

34-
def next(self): # __next__ python3, this can hang if user is not careful
46+
def __next__(self): # this can hang if user is not careful
3547
#self.transport.resumeProducing() # if queue too empty
3648
try:
3749
deferred = self.queue.get_nowait()
@@ -40,6 +52,24 @@ def next(self): # __next__ python3, this can hang if user is not careful
4052
self.deferred = defer.Deferred()
4153
return self.deferred
4254

55+
next = __next__
56+
57+
def __aiter__(self):
58+
return self
59+
60+
@defer.inlineCallbacks
61+
def __anext__(self):
62+
try:
63+
try:
64+
result = yield self.queue.get_nowait() # or yield from?
65+
defer.returnValue(result) # return result
66+
except queue.Empty:
67+
self.deferred = defer.Deferred()
68+
result = yield self.deferred # or yield from?
69+
defer.returnValue(result) # return result
70+
except StopIteration: # translate, as interface which is used by the user (iter/aiter) is unknown at time of raise
71+
raise StopAsyncIteration()
72+
4373
def stop(self):
4474
pass
4575

@@ -71,68 +101,49 @@ def errback(self, val):
71101
def completed(self):
72102
return self.errback(StopIteration())
73103

74-
""" py 3 only
75-
class AsyncMultiDeferredIterator:
104+
def async_completed(self):
105+
return self.errback(StopAsyncIteration())
76106

77-
def __init__(self):
78-
self.queue = queue.Queue()
79-
self.deferred = None
80-
81-
def __aiter__(self):
82-
return self
107+
# tests
83108

84-
async def __anext__(self):
85-
try:
86-
result = await self.queue.get_nowait()
87-
if result == "stop":
88-
raise StopAsyncIteration
89-
return result
90-
except queue.Empty:
91-
self.deferred = defer.Deferred()
92-
result = await self.deferred
93-
if result == "stop":
94-
raise StopAsyncIteration
95-
return result
96-
97-
def callback(self, val): # called multiple times
98-
if self.deferred and not self.deferred.called:
99-
print("call directly")
100-
self.deferred.callback(val)
101-
else:
102-
print("add to queue")
103-
self.queue.put_nowait(defer.succeed(val))
104-
105-
def errback(self, val):
106-
if self.deferred and not self.deferred.called:
107-
print("call directly")
108-
self.deferred.errback(val)
109-
else:
110-
print("add to queue")
111-
self.queue.put_nowait(defer.fail(val))
109+
from twisted.internet import reactor, protocol
112110

111+
# python >= 3.5
112+
""" syntax error for older python
113113
async def async_recv_stream(async_iter):
114+
from twisted.internet import error
115+
114116
print("start")
115-
async for deferred in async_iter:
116-
print(deferred)
117+
try:
118+
async for result in async_iter:
119+
print(result)
120+
await sleep(1)
121+
print("slept for a second")
122+
123+
except error.ConnectionDone:
124+
print("ConnectionDone")
125+
except error.ConnectionLost:
126+
print("ConnectionLost")
117127
print("end")
118-
return True
119128
"""
120129

121-
from .utils import sleep
122-
130+
# python <= 3.4
123131
@defer.inlineCallbacks
124132
def recv_stream(async_iter):
133+
print("start")
125134
for deferred in async_iter:
126135
try:
127136
result = yield deferred
128137
print(result)
129138
except StopIteration:
130-
print("stop")
139+
print("end")
131140
break
132141
except GeneratorExit:
133142
logger.exception("GeneratorExit in recv")
143+
break
134144
except Exception:
135145
logger.exception("Exception in recv")
146+
break
136147
yield sleep(1)
137148
print("slept for a second")
138149
reactor.stop()
@@ -144,10 +155,14 @@ def __init__(self, mdit):
144155

145156
def dataReceived(self, data):
146157
if data == b"\x1b": # ESCAPE in TELNET
147-
self.mdit.stop()
158+
#self.mdit.stop()
159+
self.mdit.completed()
148160
else:
149161
self.mdit.callback(data)
150162

163+
def connectionLost(self, reason):
164+
self.mdit.errback(reason)
165+
151166
class RecvFactory(protocol.Factory):
152167

153168
def __init__(self, mdit):
@@ -156,18 +171,19 @@ def __init__(self, mdit):
156171
def buildProtocol(self, addr):
157172
return Recv(self.mdit)
158173

159-
def main1():
174+
def main1(): # connect with telnet
160175
mdit = MultiDeferredIterator()
161176
reactor.listenTCP(8000, RecvFactory(mdit))
162177
reactor.callWhenRunning(recv_stream, mdit)
163178
reactor.run()
164179

165-
def main2():
166-
import asyncio
167-
mdit = AsyncMultiDeferredIterator()
180+
def main2(): # connect with telnet
181+
from twisted.internet import task
182+
mdit = MultiDeferredIterator()
168183
reactor.listenTCP(8000, RecvFactory(mdit))
169-
reactor.callWhenRunning(defer.Deferred.fromFuture(asyncio.ensure_future(async_recv_stream)), mdit)
184+
task.react(lambda reactor: defer.ensureDeferred(async_recv_stream(mdit)))
170185
reactor.run()
171186

172187
if __name__ == "__main__":
188+
from utils import sleep
173189
main1()

‎cerpcerus/rpc.py

+59-29
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from __future__ import absolute_import, division, print_function, unicode_literals
2+
from builtins import str
23

34
from future.utils import iteritems
45

@@ -45,6 +46,7 @@ class CallAnyPublic(object):
4546
"""Baseclass which delegates method calls based on leading "_"."""
4647

4748
def __getattr__(self, name): # only gets invoked for attributes that are not already defined. __getattribute__ would be called always
49+
# name is byte string in python 2 and unicode string in python 3
4850
if not name.startswith("_"):
4951
return self._callpublic(name)
5052
else:
@@ -84,11 +86,10 @@ def _callpublic(self, name):
8486
return partial(self._call, self._alias.get(name, name))
8587

8688
def _callprivate(self, name):
87-
8889
"""called from user, so return user friendly exception """
89-
90+
9091
# vs: __class__.__name__?
91-
raise AttributeError("{} instance has no attribute '{}'".format(type(self).__name__, name))
92+
raise AttributeError("{} instance has no attribute '{}'".format(type(self).__name__, name)) # str(name) needed for python2?
9293

9394
@property
9495
def _connid(self):
@@ -134,27 +135,37 @@ def __init__(self, conn):
134135
RemoteObjectGeneric.__init__(self, conn)
135136

136137
def _call(self, _name, *args, **kwargs):
137-
# type: (unicode, *Any, **Any) -> RemoteResultDeferred
138-
_name = unicode(_name)
138+
# type: (str, *Any, **Any) -> RemoteResultDeferred
139+
_name = str(_name) # this is needed for python2 is it?
139140
return self._conn._call(_name, *args, **kwargs)
140141

141142
def _call_with_streams(self, _name, *args, **kwargs):
142-
# type: (unicode, *Any, **Any) -> RemoteResultDeferred
143-
_name = unicode(_name)
143+
# type: (str, *Any, **Any) -> RemoteResultDeferred
144+
_name = str(_name)
144145
return self._conn._call_with_streams(_name, *args, **kwargs)
145146

146147
def _notify(self, _name, *args, **kwargs):
147-
# type: (unicode, *Any, **Any) -> None
148-
_name = unicode(_name)
148+
# type: (str, *Any, **Any) -> None
149+
_name = str(_name)
149150
self._conn._notify(_name, *args, **kwargs)
150151

152+
def _notify_with_streams(self, _name, *args, **kwargs):
153+
# type: (str, *Any, **Any) -> None
154+
_name = str(_name)
155+
self._conn._notify_with_streams(_name, *args, **kwargs)
156+
151157
def _stream(self, _name, *args, **kwargs): # could return a async iterator in the far future
152-
# type: (unicode, *Any, **Any) -> MultiDeferredIterator
153-
_name = unicode(_name)
158+
# type: (str, *Any, **Any) -> MultiDeferredIterator
159+
_name = str(_name)
154160
return self._conn._stream(_name, *args, **kwargs)
155161

162+
def _stream_with_streams(self, _name, *args, **kwargs): # could return a async iterator in the far future
163+
# type: (str, *Any, **Any) -> MultiDeferredIterator
164+
_name = str(_name)
165+
return self._conn._stream_with_streams(_name, *args, **kwargs)
166+
156167
def __repr__(self):
157-
# type: () -> unicode
168+
# type: () -> str
158169
return "'<RemoteObject object to {} at {}>'".format(self._conn.name, self._conn.addr)
159170

160171
def __dir__(self):
@@ -169,29 +180,42 @@ class RemoteInstance(RemoteObjectGeneric):
169180

170181
def __init__(self, conn, objectid, classname):
171182
RemoteObjectGeneric.__init__(self, conn)
172-
assert isinstance(classname, unicode)
183+
assert isinstance(classname, str)
173184
self._objectid = objectid
174185
self._classname = classname
175186

176187
def _call(self, _name, *args, **kwargs):
177-
# type: (unicode, *Any, **Any) -> RemoteResultDeferred
188+
# type: (str, *Any, **Any) -> RemoteResultDeferred
189+
_name = str(_name) # this is needed for python2 is it?
178190
return self._conn._callmethod(self._objectid, _name, *args, **kwargs)
179191

180192
def _call_with_streams(self, _name, *args, **kwargs):
181-
# type: (unicode, *Any, **Any) -> RemoteResultDeferred
182-
_name = unicode(_name)
193+
# type: (str, *Any, **Any) -> RemoteResultDeferred
194+
_name = str(_name)
183195
return self._conn._callmethod_with_streams(_name, *args, **kwargs)
184196

185197
def _notify(self, _name, *args, **kwargs):
186-
# type: (unicode, *Any, **Any) -> None
198+
# type: (str, *Any, **Any) -> None
199+
_name = str(_name)
187200
self._conn._notifymethod(self._objectid, _name, *args, **kwargs)
188201

202+
def _notify_with_streams(self, _name, *args, **kwargs):
203+
# type: (str, *Any, **Any) -> None
204+
_name = str(_name)
205+
self._conn._notifymethod_with_streams(self._objectid, _name, *args, **kwargs)
206+
189207
def _stream(self, _name, *args, **kwargs): # could return a async iterator in the far future
190-
# type: (unicode, *Any, **Any) -> MultiDeferredIterator
208+
# type: (str, *Any, **Any) -> MultiDeferredIterator
209+
_name = str(_name)
191210
return self._conn._streammethod(self._objectid, _name, *args, **kwargs)
192211

212+
def _stream_with_streams(self, _name, *args, **kwargs): # could return a async iterator in the far future
213+
# type: (str, *Any, **Any) -> MultiDeferredIterator
214+
_name = str(_name)
215+
return self._conn._streammethod_with_streams(self._objectid, _name, *args, **kwargs)
216+
193217
def __repr__(self):
194-
# type: () -> unicode
218+
# type: () -> str
195219
return "'<RemoteInstance object {} [{}] to {} at {}>'".format(self._classname, self._objectid, self._conn.name, self._conn.addr)
196220

197221
def __dir__(self):
@@ -219,17 +243,24 @@ class RemoteResult(RemoteObjectGeneric):
219243

220244
def __init__(self, conn, sequid, classname):
221245
RemoteObjectGeneric.__init__(self, conn)
246+
assert isinstance(classname, str)
222247
self._sequid = sequid
223248
self._classname = classname
224249

225250
def _call(self, _name, *args, **kwargs):
226-
return self._conn._callmethod_onresult(self._sequid, _name, *args, **kwargs) # type: CallOnDeferred
251+
# type: (str, *Any, **Any) -> RemoteResultDeferred
252+
_name = str(_name) # this is needed for python2 is it?
253+
return self._conn._callmethod_by_result(self._sequid, _name, *args, **kwargs)
227254

228255
def _notify(self, _name, *args, **kwargs):
229-
self._conn._notifymethod_onresult(self._sequid, _name, *args, **kwargs) # type: None
256+
# type: (str, *Any, **Any) -> None
257+
_name = str(_name)
258+
self._conn._notifymethod_by_result(self._sequid, _name, *args, **kwargs)
230259

231260
def _stream(self, _name, *args, **kwargs): # could return a async iterator in the far future
232-
return self._conn._streammethod_onresult(self._sequid, _name, *args, **kwargs) # type: MultiDeferredIterator
261+
# type: (str, *Any, **Any) -> MultiDeferredIterator
262+
_name = str(_name)
263+
return self._conn._streammethod_by_result(self._sequid, _name, *args, **kwargs)
233264

234265
def __repr__(self):
235266
return "'<RemoteResult object {} [{}] to {} at {}>'".format(self._classname, self._sequid, self._conn.name, self._conn.addr)
@@ -241,8 +272,7 @@ def __dir__(self):
241272
def __del__(self): # called when garbage collected
242273
# RemoteObjectGeneric.__del__() # should be called, but doesn't exist
243274
try:
244-
#self._conn._delinstanceonresult(self._sequid) # this tries to delete results which are not actually objects
245-
pass
275+
self._conn._delinstance_by_result(self._sequid)
246276
except NotAuthenticated:
247277
pass
248278

@@ -293,7 +323,7 @@ def call_and_catch_signature_error(_attr, *args, **kwargs):
293323
return _attr(*args, **kwargs)
294324
except TypeError as e:
295325
try:
296-
signature(_attr).bind(*args, **kwargs) #do sig test only in error case to preserve resources on successful calls
326+
signature(_attr).bind(*args, **kwargs) # do sig test only in error case to preserve resources on successful calls
297327
raise
298328
except TypeError:
299329
raise RPCInvalidArguments(e)
@@ -331,11 +361,11 @@ def __delitem__(self, key):
331361

332362
@classmethod
333363
def _aliases(cls, aliases):
334-
364+
335365
""" this is intended to be a decorator used for functions in services.
336366
yet the decorator is applied at class construction time and has thus no
337367
access to self. but class wide aliases would be bad."""
338-
368+
339369
def decorator(func):
340370
for alias in aliases:
341371
cls._alias[alias] = func.__name__
@@ -419,7 +449,7 @@ def _delete(self, connid, objectid):
419449
except KeyError:
420450
raise RPCInvalidObject("No object with id {}".format(objectid))
421451

422-
def _deleteAllObjects(self, connid):
452+
def _delete_all_objects(self, connid):
423453
"""Deletes all objects created by this connection."""
424454
try:
425455
self._objects = {oid: (obj, connid_) for oid, (obj, connid_) in iteritems(self._objects) if connid_ != connid}
@@ -451,7 +481,7 @@ def __init__(self, *args, **kwargs):
451481
# generator
452482
def random(self, size, num):
453483
return random(size, num)
454-
484+
455485
def echo(self, data):
456486
return data
457487

‎cerpcerus/rpcbase.py

+333-124
Large diffs are not rendered by default.

‎cerpcerus/test/test_utils.py

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import absolute_import, division, print_function, unicode_literals
22

3+
from builtins import str
4+
35
import inspect, itertools
46
import unittest
57

‎cerpcerus/utils.py

+11
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,17 @@ def IProtocolConnector(transport, protocol):
210210
"""
211211
transport: not a twisted transport, but a protocol
212212
protocol: not a twisted protocol but any class (e.g. RPCBase)
213+
214+
this is needed because RPCBase cannot inherit from user defined class. But if different protocols
215+
are to be supported, there needs to be a way to connect the twisted protcol callbacks.
216+
217+
btw:
218+
def asd(protcol_class):
219+
class rpc_class(protcol_class):
220+
pass
221+
return rpc_class
222+
a = asd()
223+
that works...
213224
"""
214225
transport.recv_data = protocol.recv_data
215226
transport.connection_made = protocol.connection_made

‎examples/filesend_server.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,24 @@
77
import cerpcerus
88
from cerpcerus.rpcbase import RPCUserError
99

10+
from utils import randblob
11+
1012
def scandir_rec(path):
1113
for direntry in os.scandir(path):
1214
if direntry.is_dir():
1315
yield from scandir_rec(direntry)
1416
elif direntry.is_file():
1517
yield direntry
1618

19+
randomdata = randblob(1024*1024) # 16MB
20+
1721
class TestService(cerpcerus.rpc.DebugService):
1822
def __init__(self, reactor, conn):
1923
cerpcerus.rpc.DebugService.__init__(self, True)
2024
self.shared = Path("D:/PUBLIC").resolve(strict=True) # needs to be absolute, existing path
2125

2226
def list(self):
23-
return tuple((Path(direntry).relative_to(self.shared).path, direntry.stat().st_size) for direntry in scandir_rec(self.shared))
27+
return tuple((str(Path(direntry).relative_to(self.shared)), direntry.stat().st_size) for direntry in scandir_rec(self.shared))
2428

2529
def file(self, relpath, seek=0):
2630
try:
@@ -44,6 +48,10 @@ def file(self, relpath, seek=0):
4448
except OSError:
4549
raise RPCUserError("Invalid file")
4650

51+
def infinite(self):
52+
while True:
53+
yield randomdata
54+
4755
class MySSLContextFactory(cerpcerus.GenericRPCSSLContextFactory):
4856

4957
def __init__(self):

‎message-types.txt

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
-- RPC message type --
2+
3+
total messages: 3*2 + 3*2 + 2 + 3 + 6 = 23
4+
5+
guidelines:
6+
don't create new client-to-server messages based on return value
7+
sometimes different api client functions might be needed, but the same messages should be used
8+
9+
distinguish based on sent information, so that messages can be handled by server quickly and easily
10+
_with_streams would not be strictly necessary, but it increases performance if the server does not have to find out
11+
if any of the arguments are streams and need to be handles differently
12+
13+
tldr:
14+
make client-to-server and server-to-client messages orthogonal
15+
16+
# Client to server, no response, errors are ignored silently
17+
18+
NOTIFY:
19+
function call
20+
21+
NOTIFYMETHOD
22+
method call on OBJECT
23+
24+
NOTIFYMETHOD_BY_RESULT
25+
method call on OBJECT, referred to by RESULT
26+
27+
*_WITH_STREAMS # not implemented for all methods yet
28+
arguments can contain `Streamables`
29+
separate messages for performance reasons
30+
could be implemented as another parameter in request. eg.
31+
client could send 0, 1, 2:
32+
0: auto, server has to figure out if streams are included
33+
1: client says there are no streams
34+
2: client says there are streams
35+
actually 0 is not neccessary as the work done would be the same as 2. "no streams" is just a performance optimisation
36+
37+
# Client to server, return values are
38+
- RESULT, OBJECT, ERROR
39+
- STREAM_RESULT, STREAM_END, STREAM_ERROR
40+
41+
CALL
42+
function call
43+
44+
CALLMETHOD
45+
method call on OBJECT
46+
47+
CALLMETHOD_BY_RESULT
48+
method call on OBJECT, referred to by RESULT
49+
50+
*_WITH_STREAMS # not implemented for all methods yet
51+
arguments can contain `Streamables`
52+
separate messages for performance reasons
53+
54+
# Client to server, no response, errors are ignored silently
55+
56+
DELINSTANCE # should be DELOBJECT for symmetry?
57+
deletes remote OBJECT
58+
59+
DELINSTANCE_BY_RESULT
60+
deletes remote OBJECT, referred to by RESULT
61+
62+
# Client to server, streams, only indirectly user initiated
63+
64+
STREAM_ARGUMENT
65+
used with the *_WITH_STREAMS messages to deliver data
66+
67+
ARGUMENT_ERROR
68+
used with the *_WITH_STREAMS messages to signal errors
69+
70+
ARGUMENT_END
71+
used with the *_WITH_STREAMS messages to mark the end of a stream
72+
73+
# Server to client
74+
75+
RESULT
76+
normal value, i.e. no `ObjectId` or `GeneratorType` which are handled separately
77+
can send *METHOD_BY_RESULT messages using this return type
78+
79+
OBJECT
80+
object instance
81+
can send *METHOD messages using this return type
82+
83+
ERROR
84+
signals an error, possible errors are
85+
- GeneralError
86+
- NoSuchFunction
87+
- WrongArguments
88+
- UserError
89+
- NoService
90+
- Deferred
91+
- InvalidObject
92+
- ResourceExhausted
93+
94+
# Server to client, streams, only indirectly user initiated
95+
96+
STREAM_RESULT
97+
stream value, basically an iterable result
98+
99+
STREAM_ERROR
100+
signals error during stream
101+
102+
STREAM_END
103+
marks the end of a stream
104+
105+
# Notes
106+
"STREAM" messages are not needed, but the client needs to call special `_stream*` functions on the library so the returned STREAM_* messages can be handles correctly

‎requirements.txt

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
future
2+
twisted
3+
msgpack-python
4+
pyOpenSSL

‎setup.py

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
packages=["cerpcerus"],
1717
test_suite="test",
1818
install_requires=[
19+
"future",
1920
"twisted",
2021
"msgpack-python",
2122
"pyOpenSSL",

0 commit comments

Comments
 (0)
Please sign in to comment.