Skip to content

Commit b38c3b0

Browse files
committed
lock
1 parent 1a7f76c commit b38c3b0

File tree

3 files changed

+122
-1
lines changed

3 files changed

+122
-1
lines changed

thingsdb/misc/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .lock import lock as ti_lock

thingsdb/misc/lock.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import asyncio
2+
from contextlib import asynccontextmanager
3+
from ..client import Client
4+
from ..room import Room, event
5+
6+
7+
async def setup(client: Client, collection: str = 'lock'):
8+
has_collection = await client.query("""//ti
9+
has_collection(name);
10+
""", name=collection, scope='/t')
11+
12+
if has_collection:
13+
return
14+
15+
await client.query("""//ti
16+
new_collection(name);
17+
""", name=collection, scope='/t')
18+
19+
await client.query("""//ti
20+
21+
set_type('Inner', {
22+
room: 'room',
23+
task: 'task',
24+
timeout: 'int',
25+
set_task: |this, lock_id| {
26+
this.task = task(
27+
datetime().move('seconds', this.timeout),
28+
|_, lock_id, room_id| wse(Lock(lock_id).release(room_id)),
29+
[lock_id, this.room.id()],
30+
);
31+
nil;
32+
},
33+
});
34+
35+
set_type('Lock', {
36+
queue: '[Inner]',
37+
go: |this| {
38+
if (!this.queue) return nil;
39+
inner = this.queue.first();
40+
inner.set_task(this.id());
41+
inner.room.set_name('go');
42+
inner.room.emit('go');
43+
},
44+
acquire: |this, timeout| {
45+
immediately = this.queue.len() == 0;
46+
inner = Inner{timeout:,};
47+
this.queue.push(inner);
48+
immediately ? inner.set_task(this.id()) : inner.room.id();
49+
},
50+
release: |this, room_id| try({
51+
if (this.queue.first().room.id() == room_id) {
52+
this.queue.first().task.del();
53+
this.queue.shift();
54+
this.go();
55+
} else {
56+
this.queue
57+
.remove(|inner| inner.room.id() == room_id)
58+
.each(|inner| inner.task.del());
59+
};
60+
nil;
61+
}),
62+
});
63+
64+
set_type('Root', {
65+
lock: 'thing<Lock>',
66+
version: 'int'
67+
});
68+
69+
new_procedure('acquire', |name, timeout| {
70+
.lock.get(name, .lock[name] = Lock{}).acquire(timeout);
71+
});
72+
73+
new_procedure('test', |room_id| {
74+
room(room_id).name() == 'go';
75+
});
76+
77+
new_procedure('release', |name, room_id| {
78+
wse(.lock[name].release(room_id));
79+
});
80+
81+
.to_type('Root');
82+
""")
83+
84+
85+
class _InnerRoom(Room):
86+
87+
future: asyncio.Future
88+
89+
def on_init(self) -> None:
90+
self.future = asyncio.Future()
91+
92+
async def on_join(self) -> None:
93+
# We might have missed the event during the join. If so, set the
94+
# future result to continue.
95+
ok = await self.client.run('test', self.id, scope=self.scope)
96+
if ok:
97+
self.future.set_result(None)
98+
99+
@event('go')
100+
def on_go(self):
101+
self.future.set_result(None)
102+
103+
104+
@asynccontextmanager
105+
async def lock(client: Client, name: str,
106+
scope: str = '//lock',
107+
timeout: int = 60):
108+
109+
room_id: int | None = \
110+
await client.run('acquire', name, timeout, scope=scope)
111+
112+
if room_id is not None:
113+
room = _InnerRoom(room_id, scope=scope)
114+
await room.join(client, wait=None)
115+
await room.future
116+
117+
try:
118+
yield room_id # Lock Id assigned to the 'as' target (not required)
119+
finally:
120+
await client.run('release', room_id, scope=scope)

thingsdb/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '1.1.6'
1+
__version__ = '1.1.7'

0 commit comments

Comments
 (0)