Skip to content

Commit 25873bd

Browse files
committed
remote/coordinator: prohibit multiple coordinators with the same name
Since the switch to gRPC, we only check that we don't have a connection from the same peer, but not with the same name, which would break assumptions elsewhere in the code and confuse users. Fix it by raising an exception in this case and shutting down both sides of the ExporterStream. Fixes: #1774 Signed-off-by: Jan Luebbe <[email protected]>
1 parent 70f6150 commit 25873bd

File tree

1 file changed

+29
-0
lines changed

1 file changed

+29
-0
lines changed

labgrid/remote/coordinator.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,8 @@ async def ExporterStream(self, request_iterator, context):
401401
command_queue = asyncio.Queue()
402402
pending_commands = []
403403

404+
startup_done = asyncio.Event()
405+
404406
out_msg = labgrid_coordinator_pb2.ExporterOutMessage()
405407
out_msg.hello.version = labgrid_version()
406408
yield out_msg
@@ -420,10 +422,15 @@ async def request_task():
420422
elif kind == "startup":
421423
version = in_msg.startup.version
422424
name = in_msg.startup.name
425+
if existing := self.get_exporter_by_name(name):
426+
raise ExporterError(
427+
f"exporter with name '{name}' is already connected from {existing.peer}"
428+
)
423429
session = self.exporters[peer] = ExporterSession(self, peer, name, command_queue, version)
424430
logging.debug("Exporters: %s", self.exporters)
425431
logging.debug("Received startup from %s with %s", name, version)
426432
asyncio.current_task().set_name(f"exporter-{peer}-rx/started-{name}")
433+
startup_done.set()
427434
elif kind == "resource":
428435
logging.debug("Received resource from %s with %s", name, in_msg.resource)
429436
action, _ = session.set_resource(
@@ -439,10 +446,32 @@ async def request_task():
439446
logging.debug("exporter request_task done: %s", context.done())
440447
except Exception:
441448
logging.exception("error in exporter message handler")
449+
raise
442450

443451
asyncio.current_task().set_name(f"exporter-{peer}-tx")
444452
running_request_task = self.loop.create_task(request_task(), name=f"exporter-{peer}-rx/init")
445453

454+
startup_done_task = self.loop.create_task(startup_done.wait())
455+
done, _ = await asyncio.wait(
456+
{startup_done_task, running_request_task},
457+
timeout=3,
458+
return_when=asyncio.FIRST_COMPLETED,
459+
)
460+
# clean up event task
461+
startup_done.set()
462+
await startup_done_task
463+
if running_request_task in done:
464+
# we probably had an exception during startup
465+
try:
466+
await running_request_task
467+
except ExporterError as e:
468+
await context.abort(grpc.StatusCode.ALREADY_EXISTS, f"startup failed: {e}")
469+
raise
470+
elif startup_done_task in done:
471+
await startup_done_task
472+
else:
473+
raise ExporterError(f"exporter connection from {peer} timed out during startup")
474+
446475
try:
447476
async for cmd in queue_as_aiter(command_queue):
448477
logging.debug("exporter cmd %s", cmd)

0 commit comments

Comments
 (0)