Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions openviking/storage/queuefs/semantic_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,29 @@

logger = get_logger(__name__)

# Retry policy for mv operations that fail on transient lock contention.
_MV_MAX_RETRIES = 3  # total number of attempts, including the first one
_MV_RETRY_BASE_DELAY = 0.3  # seconds; multiplied by the attempt number


async def _mv_with_retry(viking_fs, src: str, dst: str, ctx=None) -> None:
    """Move a file/dir, retrying when the failure looks like lock contention.

    ``viking_fs.mv(src, dst, ctx=ctx)`` is awaited up to ``_MV_MAX_RETRIES``
    times, and always at least once. A failure is treated as lock contention
    when the exception message contains "lock" (case-insensitive). Between
    such failures the coroutine sleeps ``_MV_RETRY_BASE_DELAY * attempt``
    seconds, so the wait grows linearly with the attempt number.

    Args:
        viking_fs: Object exposing an awaitable ``mv(src, dst, ctx=...)``.
        src: Source path/URI, passed to ``mv`` unchanged.
        dst: Destination path/URI, passed to ``mv`` unchanged.
        ctx: Optional context object forwarded to ``mv`` as ``ctx``.

    Raises:
        Exception: Whatever ``mv`` raised, re-raised unchanged when the error
            is not lock-related or when the final attempt has been used.
    """
    # Guarantee one real attempt even if the retry constant is set to 0 or
    # less; otherwise the loop would not run and the move would be silently
    # skipped while the caller believes it succeeded.
    max_attempts = max(1, _MV_MAX_RETRIES)
    for attempt in range(1, max_attempts + 1):
        try:
            await viking_fs.mv(src, dst, ctx=ctx)
            return
        except Exception as e:
            # Heuristic: lock contention is recognised purely by the word
            # "lock" appearing in the error message.
            is_lock_error = "lock" in str(e).lower()
            if not is_lock_error or attempt >= max_attempts:
                raise
            delay = _MV_RETRY_BASE_DELAY * attempt
            # Include the error so the cause of a retry is not lost when a
            # later attempt succeeds (matches the other [SyncDiff] messages).
            logger.warning(
                f"[SyncDiff] mv lock contention (attempt {attempt}/{max_attempts}), "
                f"retrying in {delay}s: {src} -> {dst}, error={e}"
            )
            await asyncio.sleep(delay)


@dataclass
class DiffResult:
Expand Down Expand Up @@ -611,7 +634,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None:
f"[SyncDiff] Failed to remove old file before update: {target_file}, error={e}"
)
try:
await viking_fs.mv(root_file, target_file, ctx=ctx)
await _mv_with_retry(viking_fs, root_file, target_file, ctx=ctx)
except Exception as e:
logger.error(
f"[SyncDiff] Failed to move updated file: {root_file} -> {target_file}, error={e}"
Expand All @@ -622,7 +645,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None:
diff.added_files.append(root_file)
target_file_uri = VikingURI(target_dir).join(name).uri
try:
await viking_fs.mv(root_file, target_file_uri, ctx=ctx)
await _mv_with_retry(viking_fs, root_file, target_file_uri, ctx=ctx)
except Exception as e:
logger.error(
f"[SyncDiff] Failed to move added file: {root_file} -> {target_file_uri}, error={e}"
Expand Down Expand Up @@ -659,7 +682,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None:
diff.added_dirs.append(root_subdir)
target_subdir_uri = VikingURI(target_dir).join(name).uri
try:
await viking_fs.mv(root_subdir, target_subdir_uri, ctx=ctx)
await _mv_with_retry(viking_fs, root_subdir, target_subdir_uri, ctx=ctx)
except Exception as e:
logger.error(
f"[SyncDiff] Failed to move added directory: {root_subdir} -> {target_subdir_uri}, error={e}"
Expand All @@ -675,7 +698,7 @@ async def sync_dir(root_dir: str, target_dir: str) -> None:
if parent_uri:
await viking_fs.mkdir(parent_uri.uri, exist_ok=True, ctx=ctx)
diff.added_dirs.append(root_uri)
await viking_fs.mv(root_uri, target_uri, ctx=ctx)
await _mv_with_retry(viking_fs, root_uri, target_uri, ctx=ctx)
return diff

await sync_dir(root_uri, target_uri)
Expand Down
4 changes: 2 additions & 2 deletions openviking/storage/transaction/lock_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class LockManager:
def __init__(
self,
agfs: AGFSClient,
lock_timeout: float = 0.0,
lock_timeout: float = 5.0,
lock_expire: float = 300.0,
):
self._agfs = agfs
Expand Down Expand Up @@ -236,7 +236,7 @@ async def _enqueue_semantic(self, **params: Any) -> None:

def init_lock_manager(
agfs: AGFSClient,
lock_timeout: float = 0.0,
lock_timeout: float = 5.0,
lock_expire: float = 300.0,
) -> LockManager:
global _lock_manager
Expand Down
15 changes: 8 additions & 7 deletions openviking_cli/utils/config/transaction_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@
class TransactionConfig(BaseModel):
"""Configuration for the transaction mechanism.

By default, lock acquisition does not wait (``lock_timeout=0``): if a
conflicting lock is held the operation fails immediately with
``LockAcquisitionError``. Set ``lock_timeout`` to a positive value to
allow the caller to block and retry for up to that many seconds.
By default, lock acquisition waits up to ``lock_timeout`` seconds
(``lock_timeout=5``): if a conflicting lock is held the caller blocks
and retries for up to 5 seconds before raising ``LockAcquisitionError``.
Set ``lock_timeout=0`` to fail immediately, or increase it for
high-contention workloads.
"""

lock_timeout: float = Field(
default=0.0,
default=5.0,
description=(
"Path lock acquisition timeout (seconds). "
"0 = fail immediately if locked (default). "
"> 0 = wait/retry up to this many seconds before raising LockAcquisitionError."
"0 = fail immediately if locked. "
"> 0 = wait/retry up to this many seconds before raising LockAcquisitionError (default: 5)."
),
)

Expand Down
Loading