Skip to content

Commit b64c920

Browse files
author
Jeroen van der Heijden
committed
Added Emitter class, issue #8
1 parent 755c56c commit b64c920

File tree

5 files changed

+98
-2
lines changed

5 files changed

+98
-2
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ dist/
1212

1313
# Temporary test files
1414
api_test.py
15-
sample_test.py
15+
sample*.py
16+
1617
socket_test.py

thingsdb/client/client.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,16 @@ def add_event_handler(self, event_handler: Events) -> None:
8080
"""
8181
self._event_handlers.append(event_handler)
8282

83+
def remove_event_handler(self, event_handler: Events) -> None:
84+
"""Remove an event handler.
85+
86+
Args:
87+
event_handler (Events):
88+
An instance of Events (see thingsdb.client.abc.events).
89+
"""
90+
self._event_handlers.remove(event_handler)
91+
92+
8393
def get_event_loop(self) -> asyncio.AbstractEventLoop:
8494
"""Can be used to get the event loop.
8595

thingsdb/model/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
from .enummember import EnumMember
33
from .collection import Collection
44
from .thing import Thing, ThingStrict, ThingHash
5+
from .emitter import Emitter

thingsdb/model/emitter.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import asyncio
2+
import logging
3+
from ..client import Client
4+
from ..client.abc.events import Events
5+
6+
7+
class Emitter(Events):
8+
9+
_ev_handlers = dict()
10+
11+
def __init__(self, client: Client, emitter: str, scope=None):
12+
super().__init__()
13+
self._event_id = 0
14+
self._client = client
15+
self._thing_id = None
16+
self._scope = scope or client.get_scope()
17+
self._code = \
18+
f'{{{emitter}}}.watch(); {{{emitter}}}.id();'
19+
client.add_event_handler(self)
20+
asyncio.ensure_future(self._watch())
21+
22+
def __init_subclass__(cls):
23+
cls._ev_handlers = {}
24+
25+
for key, val in cls.__dict__.items():
26+
if not key.startswith('__') and \
27+
callable(val) and hasattr(val, '_ev'):
28+
cls._ev_handlers[val._ev] = val
29+
30+
async def _watch(self):
31+
self._thing_id = await self._client.query(
32+
self._code,
33+
scope=self._scope)
34+
35+
def on_reconnect(self):
36+
asyncio.ensure_future(self._watch())
37+
38+
def on_node_status(self, _status):
39+
pass
40+
41+
def on_warning(self, warn):
42+
logging.warning(f'{warn["warn_msg"]} ({warn["warn_code"]})')
43+
44+
def on_watch_init(self, data):
45+
pass
46+
47+
def on_event(self, ev, *args):
48+
cls = self.__class__
49+
fun = cls._ev_handlers.get(ev)
50+
if fun is None:
51+
logging.debug(f'no event handler for {ev} on {cls.__name__}')
52+
return
53+
fun(self, *args)
54+
55+
def on_watch_update(self, data):
56+
thing_id = data['#']
57+
if thing_id != self._thing_id:
58+
return
59+
60+
event_id, jobs = data['event'], data.pop('jobs')
61+
62+
if self._event_id > event_id:
63+
logging.warning(
64+
f'ignore event because the current event `{self._event_id}` '
65+
f'is greather than the received event `{event_id}`')
66+
return
67+
self._event_id = event_id
68+
69+
for job_dict in jobs:
70+
for name, job in job_dict.items():
71+
if name == 'event':
72+
self.on_event(*job)
73+
74+
def on_watch_delete(self, data):
75+
thing_id = data['#']
76+
if thing_id == self._thing_id:
77+
logging.debug(f'emitter with id {thing_id} is removed')
78+
self._client.remove_event_handler(self)
79+
80+
def on_watch_stop(self, data):
81+
thing_id = data['#']
82+
if thing_id == self._thing_id:
83+
logging.debug(f'emitter with id {thing_id} is stopped')
84+
self._client.remove_event_handler(self)

thingsdb/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '0.6.6'
1+
__version__ = '0.6.7'

0 commit comments

Comments
 (0)