Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2d5bfdf
first step
slampy97 Oct 22, 2025
96c0ff1
also driver
slampy97 Oct 22, 2025
a4a292e
working coord client plus not working lock/ conenction problemsg
slampy97 Oct 22, 2025
d9de575
add wrappers, simple test is working, but problem to got structure back
slampy97 Oct 29, 2025
7701a05
several fixes plus wrappers
slampy97 Oct 29, 2025
82669fc
delete path from pr
slampy97 Oct 29, 2025
9fd7073
fix add gitmodules
slampy97 Oct 29, 2025
44a20a6
add alter node
slampy97 Oct 29, 2025
d749eb0
fix
slampy97 Oct 29, 2025
a5a36f6
fix config
slampy97 Oct 29, 2025
53ebed9
erase wrappers
slampy97 Oct 29, 2025
678598b
fix public wrappers
slampy97 Oct 31, 2025
1a6e166
fix last drivers coordination client inclusion
slampy97 Oct 31, 2025
aa8eaf9
fix tox -e style
slampy97 Oct 31, 2025
bd96246
tox -e black-format
slampy97 Oct 31, 2025
c3324cf
fix review remarks
slampy97 Nov 4, 2025
41323e6
fix review remarks
slampy97 Nov 4, 2025
65edbea
fix flake8 mistakes
slampy97 Nov 4, 2025
d2d25d4
fix review remarks plus styles checks
slampy97 Nov 5, 2025
9850b78
working async client, session_init + ping pong + lock release/acquire
slampy97 Nov 17, 2025
3f55d20
working async client, session_init + ping pong + lock release/acquire…
slampy97 Nov 17, 2025
89fe18d
some fix / redundant logic
slampy97 Nov 17, 2025
31e4fb6
fix lock logic
slampy97 Nov 17, 2025
194942f
refactor logic lock + reconnecor + stream - > lock should be resosibl…
slampy97 Nov 17, 2025
a81db7e
add describe -> start making crud lock object
slampy97 Nov 18, 2025
8defd59
crud lock object + acquire and release.
slampy97 Nov 18, 2025
53d547b
Merge branch 'coordination-service-impementation' into coordination-lock
slampy97 Nov 18, 2025
7d78387
add style checking
slampy97 Nov 18, 2025
4f66aad
fix linter
slampy97 Nov 18, 2025
45fc213
Add public wrappers for describe lock result
slampy97 Nov 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 98 additions & 1 deletion tests/coordination/test_coordination_client.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import asyncio

import pytest

import ydb
from ydb import aio
from ydb import aio, StatusCode

from ydb.coordination import (
NodeConfig,
ConsistencyMode,
RateLimiterCountersMode,
CoordinationClient,
CreateSemaphoreResult,
DescribeLockResult,
)


Expand Down Expand Up @@ -93,3 +97,96 @@ async def test_coordination_node_lifecycle_async(self, aio_connection):

with pytest.raises(ydb.SchemeError):
await client.describe_node(node_path)

async def test_coordination_lock_full_lifecycle(self, aio_connection):
client = aio.CoordinationClient(aio_connection)

node_path = "/local/test_lock_full_lifecycle"

try:
await client.delete_node(node_path)
except ydb.SchemeError:
pass

await client.create_node(
node_path,
NodeConfig(
session_grace_period_millis=1000,
attach_consistency_mode=ConsistencyMode.STRICT,
read_consistency_mode=ConsistencyMode.STRICT,
rate_limiter_counters_mode=RateLimiterCountersMode.UNSET,
self_check_period_millis=0,
),
)

lock = client.lock("test_lock", node_path)

create_resp: CreateSemaphoreResult = await lock.create(init_limit=1, init_data=b"init-data")
assert create_resp.status == StatusCode.SUCCESS

describe_resp: DescribeLockResult = await lock.describe()
assert describe_resp.status == StatusCode.SUCCESS
assert describe_resp.name == "test_lock"
assert describe_resp.data == b"init-data"
assert describe_resp.count == 0
assert describe_resp.ephemeral is False
assert list(describe_resp.owners) == []
assert list(describe_resp.waiters) == []

update_resp = await lock.update(new_data=b"updated-data")
assert update_resp.status == StatusCode.SUCCESS

describe_resp2: DescribeLockResult = await lock.describe()
assert describe_resp2.status == StatusCode.SUCCESS
assert describe_resp2.name == "test_lock"
assert describe_resp2.data == b"updated-data"
assert describe_resp2.count == 0
assert describe_resp2.ephemeral is False
assert list(describe_resp2.owners) == []
assert list(describe_resp2.waiters) == []

lock2_started = asyncio.Event()
lock2_acquired = asyncio.Event()

async def second_lock_task():
lock2_started.set()
async with client.lock("test_lock", node_path):
lock2_acquired.set()
await asyncio.sleep(0.5)

async with client.lock("test_lock", node_path) as lock1:
assert lock1._stream is not None
assert lock1._stream.session_id is not None

resp: DescribeLockResult = await lock1.describe()
assert resp.status == StatusCode.SUCCESS
assert resp.name == "test_lock"
assert resp.data == b"updated-data"
assert resp.count == 1
assert resp.ephemeral is False
assert len(list(resp.owners)) == 1
assert list(resp.waiters) == []

t2 = asyncio.create_task(second_lock_task())
await lock2_started.wait()

await asyncio.sleep(0.5)

assert lock1._stream is not None

await asyncio.wait_for(lock2_acquired.wait(), timeout=5)
await asyncio.wait_for(t2, timeout=5)

async with client.lock("test_lock", node_path) as lock3:
assert lock3._stream is not None
assert lock3._stream.session_id is not None

resp3: DescribeLockResult = await lock3.describe()
assert resp3.status == StatusCode.SUCCESS
assert resp3.count == 1

delete_resp = await lock.delete()
assert delete_resp.status == StatusCode.SUCCESS

describe_after_delete: DescribeLockResult = await lock.describe()
assert describe_after_delete.status == StatusCode.NOT_FOUND
4 changes: 2 additions & 2 deletions ydb/_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ class QueryService(object):

class CoordinationService(object):
Stub = ydb_coordination_v1_pb2_grpc.CoordinationServiceStub

Session = "Session"
CreateNode = "CreateNode"
AlterNode = "AlterNode"
DropNode = "DropNode"
DescribeNode = "DescribeNode"
SessionRequest = "SessionRequest"
Session = "Session"
186 changes: 183 additions & 3 deletions ydb/_grpc/grpcwrapper/ydb_coordination.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import typing
from dataclasses import dataclass

from .ydb_coordination_public_types import NodeConfig

if typing.TYPE_CHECKING:
from ..v4.protos import ydb_coordination_pb2
Expand All @@ -14,7 +13,7 @@
@dataclass
class CreateNodeRequest(IToProto):
path: str
config: typing.Optional[NodeConfig]
config: typing.Any

def to_proto(self) -> ydb_coordination_pb2.CreateNodeRequest:
cfg_proto = self.config.to_proto() if self.config else None
Expand All @@ -27,7 +26,7 @@ def to_proto(self) -> ydb_coordination_pb2.CreateNodeRequest:
@dataclass
class AlterNodeRequest(IToProto):
path: str
config: NodeConfig
config: typing.Any

def to_proto(self) -> ydb_coordination_pb2.AlterNodeRequest:
cfg_proto = self.config.to_proto() if self.config else None
Expand Down Expand Up @@ -55,3 +54,184 @@ def to_proto(self) -> ydb_coordination_pb2.DropNodeRequest:
return ydb_coordination_pb2.DropNodeRequest(
path=self.path,
)


@dataclass
class SessionStart(IToProto):
path: str
timeout_millis: int
description: str = ""
session_id: int = 0
seq_no: int = 0
protection_key: bytes = b""

def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
return ydb_coordination_pb2.SessionRequest(
session_start=ydb_coordination_pb2.SessionRequest.SessionStart(
path=self.path,
session_id=self.session_id,
timeout_millis=self.timeout_millis,
description=self.description,
seq_no=self.seq_no,
protection_key=self.protection_key,
)
)


@dataclass
class SessionStop(IToProto):
def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
return ydb_coordination_pb2.SessionRequest(session_stop=ydb_coordination_pb2.SessionRequest.SessionStop())


@dataclass
class Ping(IToProto):
opaque: int = 0

def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
return ydb_coordination_pb2.SessionRequest(
ping=ydb_coordination_pb2.SessionRequest.PingPong(opaque=self.opaque)
)


@dataclass
class CreateSemaphore(IToProto):
name: str
req_id: int
limit: int
data: bytes = b""

def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
return ydb_coordination_pb2.SessionRequest(
create_semaphore=ydb_coordination_pb2.SessionRequest.CreateSemaphore(
req_id=self.req_id, name=self.name, limit=self.limit, data=self.data
)
)


@dataclass
class UpdateSemaphore(IToProto):
name: str
req_id: int
data: bytes

def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
return ydb_coordination_pb2.SessionRequest(
update_semaphore=ydb_coordination_pb2.SessionRequest.UpdateSemaphore(
req_id=self.req_id, name=self.name, data=self.data
)
)


@dataclass
class DeleteSemaphore(IToProto):
name: str
req_id: int
force: bool = False

def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
return ydb_coordination_pb2.SessionRequest(
delete_semaphore=ydb_coordination_pb2.SessionRequest.DeleteSemaphore(
req_id=self.req_id, name=self.name, force=self.force
)
)


@dataclass
class AcquireSemaphore(IToProto):
name: str
req_id: int
count: int = 1
timeout_millis: int = 0
data: bytes = b""
ephemeral: bool = False

def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
return ydb_coordination_pb2.SessionRequest(
acquire_semaphore=ydb_coordination_pb2.SessionRequest.AcquireSemaphore(
req_id=self.req_id,
name=self.name,
timeout_millis=self.timeout_millis,
count=self.count,
data=self.data,
ephemeral=self.ephemeral,
)
)


@dataclass
class ReleaseSemaphore(IToProto):
name: str
req_id: int

def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
return ydb_coordination_pb2.SessionRequest(
release_semaphore=ydb_coordination_pb2.SessionRequest.ReleaseSemaphore(req_id=self.req_id, name=self.name)
)


@dataclass
class DescribeSemaphore(IToProto):
include_owners: bool
include_waiters: bool
name: str
req_id: int
watch_data: bool
watch_owners: bool

def to_proto(self) -> ydb_coordination_pb2.SessionRequest:
return ydb_coordination_pb2.SessionRequest(
describe_semaphore=ydb_coordination_pb2.SessionRequest.DescribeSemaphore(
include_owners=self.include_owners,
include_waiters=self.include_waiters,
name=self.name,
req_id=self.req_id,
watch_data=self.watch_data,
watch_owners=self.watch_owners,
)
)


@dataclass
class FromServer:
raw: ydb_coordination_pb2.SessionResponse

@staticmethod
def from_proto(resp: ydb_coordination_pb2.SessionResponse) -> "FromServer":
return FromServer(raw=resp)

def __getattr__(self, name: str):
return getattr(self.raw, name)

@property
def session_started(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.SessionStarted]:
s = self.raw.session_started
return s if s.session_id else None

@property
def opaque(self) -> typing.Optional[int]:
if self.raw.HasField("ping"):
return self.raw.ping.opaque
return None

@property
def acquire_semaphore_result(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.AcquireSemaphoreResult]:
return self.raw.acquire_semaphore_result if self.raw.HasField("acquire_semaphore_result") else None

@property
def create_semaphore_result(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.CreateSemaphoreResult]:
return self.raw.create_semaphore_result if self.raw.HasField("create_semaphore_result") else None

@property
def delete_semaphore_result(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.DeleteSemaphoreResult]:
return self.raw.delete_semaphore_result if self.raw.HasField("delete_semaphore_result") else None

@property
def update_semaphore_result(self) -> typing.Optional[ydb_coordination_pb2.SessionResponse.UpdateSemaphoreResult]:
return self.raw.update_semaphore_result if self.raw.HasField("update_semaphore_result") else None

@property
def describe_semaphore_result(
self,
) -> typing.Optional[ydb_coordination_pb2.SessionResponse.DescribeSemaphoreResult]:
return self.raw.describe_semaphore_result if self.raw.HasField("describe_semaphore_result") else None
Loading
Loading