From a5db77f8f67b07701ab8a59f4c9c20deb5ea0eb3 Mon Sep 17 00:00:00 2001
From: ZeroGameStudio <837757433@qq.com>
Date: Sun, 31 May 2026 13:08:20 +0800
Subject: [PATCH] Serialize expensive editor operations with leases

---
 .../services/tools/editor_operation_lease.py  | 297 ++++++++++++++++++
 Server/src/services/tools/refresh_unity.py    |  27 ++
 Server/src/services/tools/run_tests.py        |  94 ++++---
 .../test_editor_operation_lease.py            | 216 ++++++++++++++++
 ...-05-31-multi-agent-operation-lease-spec.md |  68 +++++
 5 files changed, 663 insertions(+), 39 deletions(-)
 create mode 100644 Server/src/services/tools/editor_operation_lease.py
 create mode 100644 Server/tests/integration/test_editor_operation_lease.py
 create mode 100644 docs/plans/2026-05-31-multi-agent-operation-lease-spec.md

diff --git a/Server/src/services/tools/editor_operation_lease.py b/Server/src/services/tools/editor_operation_lease.py
new file mode 100644
index 000000000..81828e5af
--- /dev/null
+++ b/Server/src/services/tools/editor_operation_lease.py
@@ -0,0 +1,297 @@
+from __future__ import annotations
+
+import json
+import logging
+import math
+import os
+import re
+import time
+import uuid
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+from models import MCPResponse
+
+logger = logging.getLogger(__name__)
+
+LEASE_DIR_ENV = "UNITY_MCP_OPERATION_LEASE_DIR"
+LEASE_TTL_ENV = "UNITY_MCP_OPERATION_LEASE_TTL_S"
+DEFAULT_LEASE_TTL_S = 120.0
+_SAFE_KEY_RE = re.compile(r"[^A-Za-z0-9_.@-]+")
+
+
+@dataclass(frozen=True)
+class EditorOperationLeaseInfo:
+    """Immutable snapshot of the fields recorded in a lease file."""
+
+    instance_id: str
+    operation: str
+    owner: str
+    started_unix_ms: int
+    expires_unix_ms: int
+    pid: int | None = None
+    path: str | None = None
+
+
+@dataclass
+class EditorOperationLease:
+    """Handle for a lease this caller acquired or re-entered."""
+
+    path: Path
+    token: str
+    info: EditorOperationLeaseInfo
+    reentrant: bool = False
+    _released: bool = False
+
+    @property
+    def instance_id(self) -> str:
+        return self.info.instance_id
+
+    @property
+    def operation(self) -> str:
+        return self.info.operation
+
+    @property
+    def owner(self) -> str:
+        return self.info.owner
+
+    def release(self) -> None:
+        """Remove the lease file if this handle still owns it (idempotent)."""
+        if self._released:
+            return
+        if self.reentrant:
+            # A re-entrant handle did not create the file, so it must not
+            # delete it; it only marks itself released.
+            self._released = True
+            return
+        try:
+            payload = _read_payload(self.path)
+            # Unlink only when the stored token is ours, so a lease another
+            # owner reclaimed after expiry is left in place.
+            if payload and payload.get("token") == self.token:
+                self.path.unlink(missing_ok=True)
+        except Exception as exc:  # pragma: no cover - defensive cleanup path
+            logger.debug("Failed to release editor operation lease %s: %r", self.path, exc)
+        finally:
+            self._released = True
+
+
+def operation_owner_from_context(ctx: Any) -> str:
+    """Build an owner key from this process's PID and ``id(ctx)``."""
+    return f"pid:{os.getpid()}:ctx:{id(ctx)}"
+
+
+def operation_busy_response(
+    lease_info: EditorOperationLeaseInfo,
+    *,
+    retry_after_ms: int = 2000,
+) -> MCPResponse:
+    """Build the ``operation_busy`` failure response describing the holder."""
+    return MCPResponse(
+        success=False,
+        error="operation_busy",
+        message=f"Unity editor operation already in progress: {lease_info.operation}",
+        hint="retry",
+        data={
+            "reason": "operation_busy",
+            "retry_after_ms": int(retry_after_ms),
+            "instance_id": lease_info.instance_id,
+            "operation": lease_info.operation,
+            "owner": lease_info.owner,
+            "pid": lease_info.pid,
+            "expires_unix_ms": lease_info.expires_unix_ms,
+        },
+    )
+
+
+def try_acquire_editor_operation_lease(
+    unity_instance: str | None,
+    operation: str,
+    *,
+    owner: str | None = None,
+    ttl_s: float | None = None,
+) -> tuple[EditorOperationLease | None, EditorOperationLeaseInfo | None]:
+    """Attempt to take the lease file for a Unity instance without waiting.
+
+    Returns ``(lease, None)`` when the lease was created or is already held by
+    ``owner`` (re-entrant), and ``(None, holder_info)`` when another owner holds
+    an unexpired lease or an expired lease file could not be removed.
+    """
+    instance_id = unity_instance or "default"
+    lease_dir = _operation_lease_dir()
+    lease_dir.mkdir(parents=True, exist_ok=True)
+    path = lease_dir / f"{_safe_lease_key(instance_id)}.json"
+    owner = owner or f"pid:{os.getpid()}"
+    ttl_ms = int(_lease_ttl_s(ttl_s) * 1000)
+
+    while True:
+        now_ms = _now_ms()
+        token = uuid.uuid4().hex
+        payload = {
+            "instance_id": instance_id,
+            "operation": operation,
+            "owner": owner,
+            "pid": os.getpid(),
+            "token": token,
+            "started_unix_ms": now_ms,
+            "expires_unix_ms": now_ms + ttl_ms,
+        }
+
+        try:
+            # O_EXCL makes creation fail if the file already exists, which is
+            # what lets separate processes agree on a single holder.
+            fd = os.open(str(path), os.O_WRONLY | os.O_CREAT | os.O_EXCL)
+        except FileExistsError:
+            existing = _read_payload(path)
+            if _is_lease_expired(existing, path, now_ms, ttl_ms):
+                # NOTE(review): the stale file is unlinked by path, not by
+                # token, so two callers that both see the same expired lease
+                # could each unlink and recreate it. Confirm this window is
+                # acceptable.
+                try:
+                    path.unlink()
+                except FileNotFoundError:
+                    continue
+                except OSError:
+                    return None, _payload_to_info(existing, path, instance_id)
+                continue
+            if existing and existing.get("owner") == owner:
+                return (
+                    EditorOperationLease(
+                        path,
+                        str(existing.get("token") or ""),
+                        _payload_to_info(existing, path, instance_id),
+                        reentrant=True,
+                    ),
+                    None,
+                )
+            return None, _payload_to_info(existing, path, instance_id)
+
+        try:
+            with os.fdopen(fd, "w", encoding="utf-8") as lease_file:
+                json.dump(payload, lease_file, separators=(",", ":"), sort_keys=True)
+        except BaseException:
+            # We created this file; remove it so a failed write does not leave
+            # an unreadable lease that blocks callers until its mtime ages out.
+            try:
+                path.unlink(missing_ok=True)
+            except OSError:
+                logger.debug("Failed to remove partially written lease %s", path)
+            raise
+        return EditorOperationLease(path, token, _payload_to_info(payload, path, instance_id)), None
+
+
+def _operation_lease_dir() -> Path:
+    """Return the ``LEASE_DIR_ENV`` directory if set, else one under the home dir."""
+    configured = os.environ.get(LEASE_DIR_ENV)
+    if configured:
+        return Path(configured)
+    return Path.home() / ".unity-mcp" / "operation-leases"
+
+
+def _lease_ttl_s(ttl_s: float | None) -> float:
+    """Resolve the lease TTL in seconds, clamped to ``[0.001, 3600]``.
+
+    Falls back to ``LEASE_TTL_ENV`` when ``ttl_s`` is ``None`` and to
+    ``DEFAULT_LEASE_TTL_S`` when the value is missing, unparsable, or NaN.
+    """
+    if ttl_s is None:
+        raw = os.environ.get(LEASE_TTL_ENV)
+        if raw is None:
+            return DEFAULT_LEASE_TTL_S
+        try:
+            ttl_s = float(raw)
+        except ValueError:
+            return DEFAULT_LEASE_TTL_S
+
+    try:
+        value = float(ttl_s)
+    except (TypeError, ValueError, OverflowError):
+        return DEFAULT_LEASE_TTL_S
+    if math.isnan(value):
+        # min()/max() comparisons with NaN would otherwise yield the 1 ms floor.
+        return DEFAULT_LEASE_TTL_S
+    return max(0.001, min(value, 3600.0))
+
+
+def _safe_lease_key(instance_id: str) -> str:
+    """Replace characters outside ``_SAFE_KEY_RE``'s allowed set and trim ``._-``."""
+    key = _SAFE_KEY_RE.sub("_", instance_id).strip("._-")
+    return key or "default"
+
+
+def _now_ms() -> int:
+    """Return the current wall-clock time in Unix milliseconds."""
+    return int(time.time() * 1000)
+
+
+def _read_payload(path: Path) -> dict[str, Any] | None:
+    """Read a lease file as a JSON object; return ``None`` on any failure."""
+    try:
+        raw = path.read_text(encoding="utf-8")
+        payload = json.loads(raw)
+        return payload if isinstance(payload, dict) else None
+    except FileNotFoundError:
+        return None
+    except Exception:
+        return None
+
+
+def _payload_to_info(
+    payload: dict[str, Any] | None,
+    path: Path,
+    fallback_instance_id: str,
+) -> EditorOperationLeaseInfo:
+    """Convert a possibly missing or partial payload into lease info with defaults."""
+    payload = payload or {}
+    started = _int_or_default(payload.get("started_unix_ms"), _mtime_ms(path))
+    expires = _int_or_default(
+        payload.get("expires_unix_ms"),
+        started + int(DEFAULT_LEASE_TTL_S * 1000),
+    )
+    pid = payload.get("pid")
+    return EditorOperationLeaseInfo(
+        instance_id=str(payload.get("instance_id") or fallback_instance_id),
+        operation=str(payload.get("operation") or "unknown"),
+        owner=str(payload.get("owner") or "unknown"),
+        started_unix_ms=started,
+        expires_unix_ms=expires,
+        pid=pid if isinstance(pid, int) else None,
+        path=str(path),
+    )
+
+
+def _is_lease_expired(
+    payload: dict[str, Any] | None,
+    path: Path,
+    now_ms: int,
+    ttl_ms: int,
+) -> bool:
+    """Return whether the lease at ``path`` has expired as of ``now_ms``.
+
+    Uses the payload's integer ``expires_unix_ms`` when present; otherwise the
+    file's mtime plus ``ttl_ms``. A file that cannot be stat'ed counts as
+    expired so the caller retries creation.
+    """
+    if payload and isinstance(payload.get("expires_unix_ms"), int):
+        return payload["expires_unix_ms"] <= now_ms
+    try:
+        # Stat directly: _mtime_ms() swallows OSError and returns "now", which
+        # would make a vanished lease file look live.
+        return int(path.stat().st_mtime * 1000) + ttl_ms <= now_ms
+    except OSError:
+        return True
+
+
+def _mtime_ms(path: Path) -> int:
+    """Return the file's mtime in Unix milliseconds, or the current time on error."""
+    try:
+        return int(path.stat().st_mtime * 1000)
+    except OSError:
+        return _now_ms()
+
+
+def _int_or_default(value: Any, default: int) -> int:
+    """Return ``value`` if it is an ``int``; otherwise ``default``."""
+    return value if isinstance(value, int) else default
diff --git a/Server/src/services/tools/refresh_unity.py b/Server/src/services/tools/refresh_unity.py index bc147dc09..972448939 100644 --- a/Server/src/services/tools/refresh_unity.py +++ b/Server/src/services/tools/refresh_unity.py @@ -13,6 +13,11 @@ from models import MCPResponse from services.registry import mcp_for_unity_tool from services.tools import get_unity_instance_from_context +from services.tools.editor_operation_lease import ( + operation_busy_response, + operation_owner_from_context, + try_acquire_editor_operation_lease, +) import transport.unity_transport as unity_transport import transport.legacy.unity_connection as _legacy_conn from transport.legacy.unity_connection import _extract_response_reason @@ -183,7 +188,29 @@ async def refresh_unity( "If true, wait until editor_state.advice.ready_for_tools is true"] = True, ) -> MCPResponse | dict[str, Any]: unity_instance = await
get_unity_instance_from_context(ctx) + lease, busy_lease = try_acquire_editor_operation_lease( + unity_instance, + "refresh_unity", + owner=operation_owner_from_context(ctx), + ) + if busy_lease is not None: + return operation_busy_response(busy_lease) + + try: + return await _refresh_unity_locked(ctx, unity_instance, mode, scope, compile, wait_for_ready) + finally: + if lease is not None: + lease.release() + +async def _refresh_unity_locked( + ctx: Context, + unity_instance: str | None, + mode: str, + scope: str, + compile: str, + wait_for_ready: bool, +) -> MCPResponse | dict[str, Any]: params: dict[str, Any] = { "mode": mode, "scope": scope, diff --git a/Server/src/services/tools/run_tests.py b/Server/src/services/tools/run_tests.py index 0426e63b5..574a24c94 100644 --- a/Server/src/services/tools/run_tests.py +++ b/Server/src/services/tools/run_tests.py @@ -13,6 +13,11 @@ from models import MCPResponse from services.registry import mcp_for_unity_tool from services.tools import get_unity_instance_from_context +from services.tools.editor_operation_lease import ( + operation_busy_response, + operation_owner_from_context, + try_acquire_editor_operation_lease, +) from services.tools.preflight import preflight import transport.unity_transport as unity_transport from transport.legacy.unity_connection import async_send_command_with_retry @@ -175,49 +180,60 @@ async def run_tests( return MCPResponse(success=False, error="init_timeout must be a positive integer (milliseconds) or None") unity_instance = await get_unity_instance_from_context(ctx) - - gate = await preflight(ctx, requires_no_tests=True, wait_for_no_compile=True, refresh_if_dirty=True) - if isinstance(gate, MCPResponse): - return gate - - def _coerce_string_list(value) -> list[str] | None: - if value is None: - return None - if isinstance(value, str): - return [value] if value.strip() else None - if isinstance(value, list): - result = [str(v).strip() for v in value if v and str(v).strip()] - return result if 
result else None - return None - - params: dict[str, Any] = {"mode": mode} - if (t := _coerce_string_list(test_names)): - params["testNames"] = t - if (g := _coerce_string_list(group_names)): - params["groupNames"] = g - if (c := _coerce_string_list(category_names)): - params["categoryNames"] = c - if (a := _coerce_string_list(assembly_names)): - params["assemblyNames"] = a - if include_failed_tests: - params["includeFailedTests"] = True - if include_details: - params["includeDetails"] = True - if init_timeout is not None and init_timeout > 0: - params["initTimeout"] = init_timeout - - response = await unity_transport.send_with_unity_instance( - async_send_command_with_retry, + lease, busy_lease = try_acquire_editor_operation_lease( unity_instance, "run_tests", - params, + owner=operation_owner_from_context(ctx), ) + if busy_lease is not None: + return operation_busy_response(busy_lease) - if isinstance(response, dict): - if not response.get("success", True): - return MCPResponse(**response) - return RunTestsStartResponse(**response) - return MCPResponse(success=False, error=str(response)) + try: + gate = await preflight(ctx, requires_no_tests=True, wait_for_no_compile=True, refresh_if_dirty=True) + if isinstance(gate, MCPResponse): + return gate + + def _coerce_string_list(value) -> list[str] | None: + if value is None: + return None + if isinstance(value, str): + return [value] if value.strip() else None + if isinstance(value, list): + result = [str(v).strip() for v in value if v and str(v).strip()] + return result if result else None + return None + + params: dict[str, Any] = {"mode": mode} + if (t := _coerce_string_list(test_names)): + params["testNames"] = t + if (g := _coerce_string_list(group_names)): + params["groupNames"] = g + if (c := _coerce_string_list(category_names)): + params["categoryNames"] = c + if (a := _coerce_string_list(assembly_names)): + params["assemblyNames"] = a + if include_failed_tests: + params["includeFailedTests"] = True + if 
include_details: + params["includeDetails"] = True + if init_timeout is not None and init_timeout > 0: + params["initTimeout"] = init_timeout + + response = await unity_transport.send_with_unity_instance( + async_send_command_with_retry, + unity_instance, + "run_tests", + params, + ) + + if isinstance(response, dict): + if not response.get("success", True): + return MCPResponse(**response) + return RunTestsStartResponse(**response) + return MCPResponse(success=False, error=str(response)) + finally: + if lease is not None: + lease.release() @mcp_for_unity_tool( diff --git a/Server/tests/integration/test_editor_operation_lease.py b/Server/tests/integration/test_editor_operation_lease.py new file mode 100644 index 000000000..b435fcf4f --- /dev/null +++ b/Server/tests/integration/test_editor_operation_lease.py @@ -0,0 +1,216 @@ +import time + +import pytest + +from .test_helpers import DummyContext + + +@pytest.fixture(autouse=True) +def isolated_operation_lease_dir(tmp_path, monkeypatch): + monkeypatch.setenv("UNITY_MCP_OPERATION_LEASE_DIR", str(tmp_path)) + monkeypatch.delenv("UNITY_MCP_OPERATION_LEASE_TTL_S", raising=False) + + +def test_operation_lease_blocks_second_owner(): + from services.tools.editor_operation_lease import ( + operation_busy_response, + try_acquire_editor_operation_lease, + ) + + lease, busy = try_acquire_editor_operation_lease( + "UnityMCPTests@abc123", + "refresh_unity", + owner="agent-a", + ttl_s=30, + ) + assert lease is not None + assert busy is None + + try: + second_lease, second_busy = try_acquire_editor_operation_lease( + "UnityMCPTests@abc123", + "run_tests", + owner="agent-b", + ttl_s=30, + ) + assert second_lease is None + assert second_busy is not None + assert second_busy.owner == "agent-a" + assert second_busy.operation == "refresh_unity" + + response = operation_busy_response(second_busy) + assert response.success is False + assert response.error == "operation_busy" + assert response.hint == "retry" + assert 
response.data["reason"] == "operation_busy" + assert response.data["operation"] == "refresh_unity" + finally: + lease.release() + + +def test_operation_lease_reclaims_expired_file(): + from services.tools.editor_operation_lease import try_acquire_editor_operation_lease + + lease, busy = try_acquire_editor_operation_lease( + "UnityMCPTests@abc123", + "refresh_unity", + owner="agent-a", + ttl_s=0.001, + ) + assert lease is not None + assert busy is None + + time.sleep(0.01) + + replacement, replacement_busy = try_acquire_editor_operation_lease( + "UnityMCPTests@abc123", + "run_tests", + owner="agent-b", + ttl_s=30, + ) + assert replacement is not None + assert replacement_busy is None + assert replacement.owner == "agent-b" + replacement.release() + + +def test_operation_owner_is_per_tool_context(): + from services.tools.editor_operation_lease import operation_owner_from_context + + first = DummyContext() + second = DummyContext() + + assert operation_owner_from_context(first) == operation_owner_from_context(first) + assert operation_owner_from_context(first) != operation_owner_from_context(second) + + +def test_operation_lease_allows_reentrant_acquire_for_same_owner(): + from services.tools.editor_operation_lease import try_acquire_editor_operation_lease + + lease, busy = try_acquire_editor_operation_lease( + "UnityMCPTests@abc123", + "run_tests", + owner="agent-a", + ttl_s=30, + ) + assert lease is not None + assert busy is None + + try: + nested, nested_busy = try_acquire_editor_operation_lease( + "UnityMCPTests@abc123", + "refresh_unity", + owner="agent-a", + ttl_s=30, + ) + assert nested is not None + assert nested_busy is None + assert nested.owner == "agent-a" + + nested.release() + + other, other_busy = try_acquire_editor_operation_lease( + "UnityMCPTests@abc123", + "refresh_unity", + owner="agent-b", + ttl_s=30, + ) + assert other is None + assert other_busy is not None + assert other_busy.owner == "agent-a" + finally: + lease.release() + + 
+@pytest.mark.asyncio +async def test_run_tests_returns_retry_when_another_agent_holds_refresh_lease(monkeypatch): + from services.tools.editor_operation_lease import try_acquire_editor_operation_lease + from services.tools.run_tests import run_tests + import services.tools.run_tests as mod + + ctx = DummyContext() + await ctx.set_state("unity_instance", "UnityMCPTests@abc123") + + lease, busy = try_acquire_editor_operation_lease( + "UnityMCPTests@abc123", + "refresh_unity", + owner="agent-a", + ttl_s=30, + ) + assert lease is not None + assert busy is None + + async def fail_if_dispatched(*args, **kwargs): + raise AssertionError("run_tests should not dispatch while operation lease is held") + + monkeypatch.setattr(mod.unity_transport, "send_with_unity_instance", fail_if_dispatched) + + try: + response = await run_tests(ctx, mode="EditMode") + assert response.success is False + assert response.error == "operation_busy" + assert response.hint == "retry" + assert response.data["operation"] == "refresh_unity" + finally: + lease.release() + + +@pytest.mark.asyncio +async def test_refresh_unity_returns_retry_when_another_agent_holds_run_tests_lease(monkeypatch): + from services.tools.editor_operation_lease import try_acquire_editor_operation_lease + from services.tools.refresh_unity import refresh_unity + import services.tools.refresh_unity as mod + + ctx = DummyContext() + await ctx.set_state("unity_instance", "UnityMCPTests@abc123") + + lease, busy = try_acquire_editor_operation_lease( + "UnityMCPTests@abc123", + "run_tests", + owner="agent-a", + ttl_s=30, + ) + assert lease is not None + assert busy is None + + async def fail_if_dispatched(*args, **kwargs): + raise AssertionError("refresh_unity should not dispatch while operation lease is held") + + monkeypatch.setattr(mod.unity_transport, "send_with_unity_instance", fail_if_dispatched) + + try: + response = await refresh_unity(ctx, wait_for_ready=False) + assert response.success is False + assert response.error == 
"operation_busy" + assert response.hint == "retry" + assert response.data["operation"] == "run_tests" + finally: + lease.release() + + +@pytest.mark.asyncio +async def test_run_tests_releases_start_lease_after_dispatch(monkeypatch): + from services.tools.editor_operation_lease import try_acquire_editor_operation_lease + from services.tools.run_tests import run_tests + import services.tools.run_tests as mod + + ctx = DummyContext() + await ctx.set_state("unity_instance", "UnityMCPTests@abc123") + + async def fake_send_with_unity_instance(send_fn, unity_instance, command_type, params, **kwargs): + return {"success": True, "data": {"job_id": "job-1", "status": "running", "mode": "EditMode"}} + + monkeypatch.setattr(mod.unity_transport, "send_with_unity_instance", fake_send_with_unity_instance) + + response = await run_tests(ctx, mode="EditMode") + assert response.success is True + + lease, busy = try_acquire_editor_operation_lease( + "UnityMCPTests@abc123", + "refresh_unity", + owner="agent-b", + ttl_s=30, + ) + assert lease is not None + assert busy is None + lease.release() diff --git a/docs/plans/2026-05-31-multi-agent-operation-lease-spec.md b/docs/plans/2026-05-31-multi-agent-operation-lease-spec.md new file mode 100644 index 000000000..c12d0c300 --- /dev/null +++ b/docs/plans/2026-05-31-multi-agent-operation-lease-spec.md @@ -0,0 +1,68 @@ +# Multi-agent operation lease + +Date: 2026-05-31 + +## Problem + +Multiple MCP clients can point at the same Unity project. Expensive editor +operations such as `refresh_unity` and `run_tests` are destructive enough that +parallel starts make the editor slower and less predictable: + +- two agents can request refresh/compile at the same time; +- one agent can start tests while another starts a refresh; +- the existing Unity-side busy checks only help after the editor state has + already changed. + +## Goals + +- Add a server-side, cross-process guard for a single Unity instance. 
+- Cover only the small high-impact surface first: `refresh_unity` and the + `run_tests` start path. +- Fail fast with a structured retry response when another agent owns the + editor operation window. +- Recover automatically from stale locks left by crashed MCP server processes. + +## Non-goals + +- No central command queue. +- No client-visible scheduler UI. +- No ordering guarantees across all MCP tools. +- No replacement for Unity's existing `tests_running` or compile-state + preflight checks. + +## Design + +The Python server creates one atomic file lease per Unity instance in a shared +per-user directory. The lease file contains owner, operation, PID, token, start +time, and expiry time. + +Acquire uses exclusive file creation so separate MCP server processes agree on +one owner. If the file exists, is not expired, and belongs to a different owner, the tool returns: + +```json +{ + "success": false, + "error": "operation_busy", + "hint": "retry", + "data": { + "reason": "operation_busy", + "operation": "refresh_unity", + "owner": "pid:<pid>:ctx:<id>", + "retry_after_ms": 2000 + } +} +``` + +`refresh_unity` holds the lease for the whole refresh/wait operation, up to the lease TTL (120 s by default; the lease is not renewed while held). + +`run_tests` holds the lease only while it performs preflight and starts the +async Unity test job. The actual test execution continues to be represented by +Unity's test state and `get_test_job`; this keeps the change small while +removing the most common race window at job startup. + +## Verification + +- Unit-test lease acquire, busy response, release, and stale lease recovery. +- Tool-test that `run_tests` and `refresh_unity` return retryable busy responses + before dispatching to Unity when another owner holds the lease. +- Tool-test that `run_tests` releases the startup lease after dispatch.