Skip to content

Commit 8265ad5

Browse files
committed
Lock
1 parent b38c3b0 commit 8265ad5

File tree

3 files changed

+96
-11
lines changed

3 files changed

+96
-11
lines changed

thingsdb/misc/README.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Lock
2+
3+
This lock provides distributed mutual exclusion, allowing you to synchronize
4+
access to shared resources or critical sections of code across multiple
5+
independent Python programs or services, even if they are running on different
6+
machines.
7+
8+
It functions similarly to `asyncio.Lock()`, which is designed for concurrency
9+
within a single process, but extends this capability to a multi-process,
10+
multi-host environment by leveraging ThingsDB as its backend. This ensures that
11+
only one client can acquire the lock at any given time, preventing race
12+
conditions and maintaining data integrity in a distributed system.
13+
14+
The `timeout` parameter defines the maximum duration a lock can be held.
15+
If a client fails to explicitly release the lock (e.g., due to a crash),
16+
ThingsDB will automatically release it after this period, preventing deadlocks.
17+
Separately, the expression `queue_size * timeout` indicates the total maximum
18+
time a client will actively attempt to acquire the lock if it's currently
19+
unavailable.
20+
21+
Example code:
22+
23+
```python
24+
import asyncio
25+
from functools import partial
26+
from thingsdb.client import Client
27+
from thingsdb.misc import lock
28+
29+
30+
async def main():
31+
# ThingsDB client
32+
client = Client()
33+
34+
# Multiple locks may be created, make sure you give each lock a unique name
35+
mylock = partial(lock.lock, client=client, name='my-lock', timeout=5)
36+
37+
await client.connect('localhost')
38+
try:
39+
await client.authenticate('admin', 'pass')
40+
41+
# This will set-up a lock collection
42+
# It will only do work the first time, but no harm in keep calling
43+
await lock.setup(client)
44+
45+
# Wait for a lock
46+
async with mylock():
47+
print('In here')
48+
await asyncio.sleep(5.0) # simulate some work
49+
print('Done here')
50+
51+
finally:
52+
await client.close_and_wait()
53+
54+
55+
if __name__ == '__main__':
56+
asyncio.run(main())
57+
```
58+
59+
To observe the distributed lock in action, you can execute the example Python
60+
script simultaneously in multiple separate terminal windows.
61+
62+
You can determine if a specific distributed lock is currently held by using
63+
the `lock.locked()` asynchronous function.
64+
65+
To check the lock's status:
66+
67+
```python
68+
is_locked = await lock.locked(client, 'my-lock')
69+
```

thingsdb/misc/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +0,0 @@
1-
from .lock import lock as ti_lock

thingsdb/misc/lock.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
from contextlib import asynccontextmanager
3+
from typing import AsyncGenerator
34
from ..client import Client
45
from ..room import Room, event
56

@@ -42,10 +43,12 @@ async def setup(client: Client, collection: str = 'lock'):
4243
inner.room.emit('go');
4344
},
4445
acquire: |this, timeout| {
45-
immediately = this.queue.len() == 0;
46+
size = this.queue.len();
47+
immediately = size == 0;
4648
inner = Inner{timeout:,};
4749
this.queue.push(inner);
48-
immediately ? inner.set_task(this.id()) : inner.room.id();
50+
immediately && inner.set_task(this.id());
51+
[immediately, inner.room.id(), size];
4952
},
5053
release: |this, room_id| try({
5154
if (this.queue.first().room.id() == room_id) {
@@ -74,12 +77,16 @@ async def setup(client: Client, collection: str = 'lock'):
7477
room(room_id).name() == 'go';
7578
});
7679
80+
new_procedure('locked', |name| {
81+
bool(.lock[name].queue);
82+
});
83+
7784
new_procedure('release', |name, room_id| {
7885
wse(.lock[name].release(room_id));
7986
});
8087
8188
.to_type('Root');
82-
""")
89+
""", scope=f'//{collection}')
8390

8491

8592
class _InnerRoom(Room):
@@ -93,28 +100,38 @@ async def on_join(self) -> None:
93100
# We might have missed the event during the join. If so, set the
94101
# future result to continue.
95102
ok = await self.client.run('test', self.id, scope=self.scope)
96-
if ok:
103+
if ok and not self.future.done():
97104
self.future.set_result(None)
98105

99106
@event('go')
100107
def on_go(self):
101-
self.future.set_result(None)
108+
if not self.future.done():
109+
self.future.set_result(None)
102110

103111

104112
@asynccontextmanager
105113
async def lock(client: Client, name: str,
106114
scope: str = '//lock',
107-
timeout: int = 60):
115+
timeout: int = 60) -> AsyncGenerator[int, None]:
108116

109-
room_id: int | None = \
117+
res: tuple[bool, int, int] = \
110118
await client.run('acquire', name, timeout, scope=scope)
111119

112-
if room_id is not None:
120+
immediately, room_id, size = res
121+
if not immediately:
113122
room = _InnerRoom(room_id, scope=scope)
114123
await room.join(client, wait=None)
115-
await room.future
124+
try:
125+
await asyncio.wait_for(room.future, timeout=timeout*size)
126+
except asyncio.TimeoutError:
127+
pass
116128

117129
try:
118130
yield room_id # Lock Id assigned to the 'as' target (not required)
119131
finally:
120-
await client.run('release', room_id, scope=scope)
132+
await client.run('release', name, room_id, scope=scope)
133+
134+
135+
async def locked(client: Client, name: str, scope: str = '//lock') -> bool:
136+
res: bool = await client.run('locked', name, scope=scope)
137+
return res

0 commit comments

Comments
 (0)