From 0494fea77f87d69461b5d258c070a0d22c8e97cb Mon Sep 17 00:00:00 2001 From: wzr <2668940489@qq.com> Date: Sun, 29 Mar 2026 04:38:07 +0800 Subject: [PATCH] fix: prevent mv lock timeout causing missing L0/L1 files Two-part fix for SemanticProcessor failing to move generated layer files (.abstract.md, .overview.md) from temp to target directory. 1. Change default lock_timeout from 0.0 to 5.0 (seconds) - LockManager and TransactionConfig both defaulted to 0.0, meaning any lock contention caused immediate failure - Updated to 5.0s: enough for transient contention, not so long that a real deadlock blocks indefinitely - Users can still set lock_timeout=0 for fail-fast behavior 2. Add retry logic with backoff for mv operations in SemanticProcessor._sync_topdown_recursive() - New _mv_with_retry() helper: retries up to 3 times with increasing delay (0.3s, 0.6s, 0.9s) on lock errors - Applied to all 4 viking_fs.mv() call sites - Logs warning on each retry attempt for debugging Closes #1047 --- .../storage/queuefs/semantic_processor.py | 31 ++++++++++++++++--- .../storage/transaction/lock_manager.py | 4 +-- .../utils/config/transaction_config.py | 15 ++++----- 3 files changed, 37 insertions(+), 13 deletions(-) diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index dd5e284cf..28867053d 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -40,6 +40,29 @@ logger = get_logger(__name__) +# Retries for lock-contended mv operations +_MV_MAX_RETRIES = 3 +_MV_RETRY_BASE_DELAY = 0.3 # seconds + + +async def _mv_with_retry(viking_fs, src: str, dst: str, ctx=None) -> None: + """Move a file/dir with retry on transient lock contention.""" + for attempt in range(_MV_MAX_RETRIES): + try: + await viking_fs.mv(src, dst, ctx=ctx) + return + except Exception as e: + is_lock_error = "lock" in str(e).lower() + if is_lock_error and attempt < _MV_MAX_RETRIES - 1: + delay = _MV_RETRY_BASE_DELAY * (attempt + 1) + logger.warning( + f"[SyncDiff] mv lock contention (attempt {attempt + 1}/{_MV_MAX_RETRIES}), " + f"retrying in {delay}s: {src} -> {dst}" + ) + await asyncio.sleep(delay) + else: + raise + @dataclass class DiffResult: @@ -611,7 +634,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: f"[SyncDiff] Failed to remove old file before update: {target_file}, error={e}" ) try: - await viking_fs.mv(root_file, target_file, ctx=ctx) + await _mv_with_retry(viking_fs, root_file, target_file, ctx=ctx) except Exception as e: logger.error( f"[SyncDiff] Failed to move updated file: {root_file} -> {target_file}, error={e}" @@ -622,7 +645,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: diff.added_files.append(root_file) target_file_uri = VikingURI(target_dir).join(name).uri try: - await viking_fs.mv(root_file, target_file_uri, ctx=ctx) + await _mv_with_retry(viking_fs, root_file, target_file_uri, ctx=ctx) except Exception as e: logger.error( f"[SyncDiff] Failed to move added file: {root_file} -> {target_file_uri}, error={e}" @@ -659,7 +682,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: diff.added_dirs.append(root_subdir) target_subdir_uri = VikingURI(target_dir).join(name).uri try: - await viking_fs.mv(root_subdir, target_subdir_uri, ctx=ctx) + await _mv_with_retry(viking_fs, root_subdir, target_subdir_uri, ctx=ctx) except Exception as e: logger.error( f"[SyncDiff] Failed to move added directory: {root_subdir} -> {target_subdir_uri}, error={e}" @@ -675,7 +698,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None: if parent_uri: await viking_fs.mkdir(parent_uri.uri, exist_ok=True, ctx=ctx) diff.added_dirs.append(root_uri) - await viking_fs.mv(root_uri, target_uri, ctx=ctx) + await _mv_with_retry(viking_fs, root_uri, target_uri, ctx=ctx) return diff await sync_dir(root_uri, target_uri) diff --git a/openviking/storage/transaction/lock_manager.py b/openviking/storage/transaction/lock_manager.py index 2fec7e42a..99a99f387 100644 --- a/openviking/storage/transaction/lock_manager.py +++ b/openviking/storage/transaction/lock_manager.py @@ -22,7 +22,7 @@ class LockManager: def __init__( self, agfs: AGFSClient, - lock_timeout: float = 0.0, + lock_timeout: float = 5.0, lock_expire: float = 300.0, ): self._agfs = agfs @@ -236,7 +236,7 @@ async def _enqueue_semantic(self, **params: Any) -> None: def init_lock_manager( agfs: AGFSClient, - lock_timeout: float = 0.0, + lock_timeout: float = 5.0, lock_expire: float = 300.0, ) -> LockManager: global _lock_manager diff --git a/openviking_cli/utils/config/transaction_config.py b/openviking_cli/utils/config/transaction_config.py index 86d153f8c..69c992294 100644 --- a/openviking_cli/utils/config/transaction_config.py +++ b/openviking_cli/utils/config/transaction_config.py @@ -6,18 +6,19 @@ class TransactionConfig(BaseModel): """Configuration for the transaction mechanism. - By default, lock acquisition does not wait (``lock_timeout=0``): if a - conflicting lock is held the operation fails immediately with - ``LockAcquisitionError``. Set ``lock_timeout`` to a positive value to - allow the caller to block and retry for up to that many seconds. + By default, lock acquisition waits up to ``lock_timeout`` seconds + (``lock_timeout=5``): if a conflicting lock is held the caller blocks + and retries for up to 5 seconds before raising ``LockAcquisitionError``. + Set ``lock_timeout=0`` to fail immediately, or increase it for + high-contention workloads. """ lock_timeout: float = Field( - default=0.0, + default=5.0, description=( "Path lock acquisition timeout (seconds). " - "0 = fail immediately if locked (default). " - "> 0 = wait/retry up to this many seconds before raising LockAcquisitionError." + "0 = fail immediately if locked. " + "> 0 = wait/retry up to this many seconds before raising LockAcquisitionError (default: 5)." ), )