Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions docs/user_guide/command_line.md
Original file line number Diff line number Diff line change
Expand Up @@ -935,14 +935,16 @@ merlin database cleanup [OPTIONS]

**Options:**

| Name | Type | Description | Default |
| ------------ | ------- | ----------- | ------- |
| `-h`, `--help` | boolean | Show this help message and exit | `False` |
| `--dry-run` | boolean | Show what would be deleted without actually deleting anything | `False` |
| `--skip-runs` | boolean | Skip checking for runs with invalid workspaces | `False` |
| `--skip-workers` | boolean | Skip checking for orphaned workers | `False` |
| `--skip-studies` | boolean | Skip checking for empty studies | `False` |
| `-f`, `--force` | boolean | Skip confirmation prompt (use with caution) | `False` |
| Name | Type | Description | Default |
| ------------------------- | ------- | --------------------------------------------------------------- | ------- |
| `-h`, `--help` | boolean | Show this help message and exit | `False` |
| `--dry-run` | boolean | Show what would be deleted without actually deleting anything | `False` |
| `--skip-runs` | boolean | Skip checking for runs with invalid workspaces | `False` |
| `--skip-workers` | boolean | Skip checking for orphaned workers (both logical and physical) | `False` |
| `--skip-logical-workers` | boolean | Skip checking for orphaned logical workers | `False` |
| `--skip-physical-workers` | boolean | Skip checking for orphaned physical workers | `False` |
| `--skip-studies` | boolean | Skip checking for empty studies | `False` |
| `-f`, `--force` | boolean | Skip confirmation prompt (use with caution) | `False` |

**Examples:**

Expand Down
92 changes: 88 additions & 4 deletions docs/user_guide/database/garbage_collection.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,49 @@ The garbage collection process follows a cascading cleanup approach:

This cascading approach ensures that removing a run with a missing workspace automatically cleans up its dependent entities.

### Distributed Filesystem Awareness

!!! info "Multi-Machine Considerations"

Merlin is designed to work across distributed computing environments where different machines may have access to different filesystems. The garbage collector intelligently handles this by:

- **Detecting accessible mount points**: Automatically identifies which filesystems are mounted on the current machine
- **Skipping inaccessible workspaces**: Won't flag workspaces as invalid if they're on filesystems not mounted on the current machine
- **Logging inaccessible workspaces**: Provides informative warnings about workspaces that couldn't be verified

When garbage collection runs, it checks whether each workspace is on a filesystem that's accessible from the current machine:

- **Accessible workspaces** (e.g., on `/p/lustre3` when that filesystem is mounted): Checked for existence and flagged as invalid if missing
- **Inaccessible workspaces** (e.g., on `/p/lustre3` when that filesystem is NOT mounted): Skipped with a warning, not flagged as invalid
- **Local workspaces** (e.g., on root filesystem like `/tmp`): Always checked if they exist locally

??? example "Example: Running Garbage Collection on Different Machines"

**Scenario**: You have workspaces on `/p/lustre3` (shared filesystem).

**Machine A** (has `/p/lustre3` mounted):

```bash
$ merlin database gc --dry-run
[INFO] [GARBAGE COLLECTOR] Checking run workspaces for validity...
[INFO] [GARBAGE COLLECTOR] Found 2 runs with invalid workspaces.
...
```

**Machine B** (does NOT have `/p/lustre3` mounted):

```bash
$ merlin database gc --dry-run
[INFO] [GARBAGE COLLECTOR] Checking run workspaces for validity...
[INFO] [GARBAGE COLLECTOR] Found 0 runs with invalid workspaces.
[WARNING] [GARBAGE COLLECTOR] Found 5 runs with workspaces on file systems not
accessible from the current host 'machine-b'. Run garbage collection
from a machine with access to verify these.
...
```

The 5 workspaces on `/p/lustre3` are safely skipped on Machine B because that filesystem isn't mounted there. Run garbage collection from Machine A to properly verify those workspaces.

## Basic Usage

To preview what would be removed without actually deleting anything:
Expand All @@ -44,6 +87,7 @@ This displays a detailed report showing:
- Which runs have missing workspaces
- Which workers would be removed due to having no valid runs
- Which studies would be removed due to having no remaining runs
- Which workspaces are on filesystems not accessible from the current machine (for informational purposes)

??? example "Example Output for Dry Run"

Expand All @@ -57,6 +101,9 @@ This displays a detailed report showing:
[2025-10-08 10:27:31: INFO] Fetching all runs from Redis...
[2025-10-08 10:27:31: INFO] Successfully retrieved 2 runs from Redis.
[2025-10-08 10:27:31: INFO] [GARBAGE COLLECTOR] Found 1 runs with invalid workspaces.
[2025-10-08 10:27:31: WARNING] [GARBAGE COLLECTOR] Found 1 runs with workspaces on file systems not
accessible from the current host 'machine-a'. Run garbage collection
from a machine with access to verify these.
[2025-10-08 10:27:31: INFO] [GARBAGE COLLECTOR] Checking for orphaned logical workers...
[2025-10-08 10:27:31: INFO] Fetching all runs from Redis...
[2025-10-08 10:27:31: INFO] Successfully retrieved 2 runs from Redis.
Expand All @@ -82,16 +129,30 @@ This displays a detailed report showing:
============================================================

Invalid Runs: 1
- /path/to/hello_samples_20251008-102348
- /path/to/hello_samples_20251008-102348

Inaccessible Workspaces: 1
- /p/lustre3/other_workspace_20251008-101234
(on filesystem not accessible from current host)

Orphaned Logical Workers: 1
- hello_samples_worker (queues: [merlin]_step_1_queue, [merlin]_step_2_queue)
- hello_samples_worker (queues: [merlin]_step_1_queue, [merlin]_step_2_queue)

Orphaned Physical Workers: 1
- celery@hello_samples_worker.%dane13 (host: dane13)
- celery@hello_samples_worker.%dane13 (host: dane13)

Empty Studies: 1
- hello_samples
- hello_samples

============================================================

Potentially Inaccessible Runs: 1
- /p/lustre3/other_workspace_20251008-101234

You may need to re-run garbage collection on a machine that
can access these runs, or remove them manually if they are
local runs being flagged as inaccessible.

============================================================
```

Expand Down Expand Up @@ -150,6 +211,11 @@ Orphaned Physical Workers: 1

Empty Studies: 1
- hello_samples

============================================================

Potentially Inaccessible Runs: 0

============================================================
[2025-10-08 10:32:47: WARNING] [GARBAGE COLLECTOR] WARNING: This will permanently delete stale database entries. Run with --dry-run first to see what would be deleted.

Expand Down Expand Up @@ -204,10 +270,28 @@ merlin database gc --skip-runs --dry-run
merlin database gc --skip-workers --skip-studies
```

## Best Practices for Distributed Environments

When working across multiple machines with different filesystem access:

1. **Run from a machine with broad filesystem access**: For the most comprehensive cleanup, run garbage collection from a machine that has access to the most access shared file systems.

2. **Use --dry-run first**: Always preview what will be cleaned up before running the actual cleanup:

```bash
merlin database gc --dry-run
```

3. **Check inaccessible workspace warnings**: Pay attention to warnings about inaccessible workspaces. If you see many of these, you may need to run garbage collection from a different machine or manually delete certain runs.

4. **Understand your filesystem topology**: Know which filesystems are shared (e.g., `/p/lustre3`) vs. local (e.g., `/tmp`, `/home`) in your environment.

## Limitations

The garbage collection process:

- Only checks for missing workspace directories (it does not validate workspace contents)
- Does not clean up data in the workspace directories themselves (only database entries)
- Requires read access to the filesystem paths referenced in run entries
- **Cannot verify workspaces on filesystems that are not mounted on the current machine**: Workspaces on inaccessible filesystems are conservatively skipped to avoid incorrectly flagging valid workspaces as stale
- **Does not distinguish between truly deleted workspaces and temporarily inaccessible ones**: If a shared filesystem is temporarily unmounted or experiencing issues, those workspaces will be skipped but not flagged as invalid
112 changes: 107 additions & 5 deletions merlin/db_scripts/garbage_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@

import logging
import os
from typing import Dict, List
import socket
import textwrap
from pathlib import Path
from typing import Dict, List, Union

from merlin.db_scripts.entities.db_entity import DatabaseEntity
from merlin.db_scripts.merlin_db import MerlinDatabase
from merlin.exceptions import RunNotFoundError, StudyNotFoundError, WorkerNotFoundError
from merlin.utils import get_plural_of_entity
from merlin.utils import get_accessible_mounts, get_plural_of_entity


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -85,7 +88,13 @@ def __init__(self, merlin_db: MerlinDatabase = None):
merlin_db: Optional MerlinDatabase instance. Creates one if not provided.
"""
self.merlin_db = merlin_db or MerlinDatabase()
self._issues: Dict[str, List[DatabaseEntity]] = {"run": [], "logical_worker": [], "physical_worker": [], "study": []}
self._issues: Dict[str, List[DatabaseEntity]] = {
"run": [],
"logical_worker": [],
"physical_worker": [],
"study": [],
"inaccessible_runs": [],
}

def _prompt_for_confirmation(self) -> bool:
"""
Expand All @@ -105,6 +114,44 @@ def _prompt_for_confirmation(self) -> bool:
LOG.debug(f"[GARBAGE COLLECTOR] response: {response}")
return response in ["yes", "y"]

def _is_workspace_on_accessible_mount(self, workspace: Union[str, Path]) -> bool:
"""
Check if a workspace path is on an accessible mount point (excluding root).

This purposefully does NOT include '/' as an accessible mount point. We do this
since every workspace is relative to the root filesystem, and we want to
specifically check for other mounted filesystems that may not be accessible.

Args:
workspace: The workspace path to check.

Returns:
True if workspace is on an accessible mount (excluding root), False otherwise.
"""
if not isinstance(workspace, Path):
workspace = Path(workspace)

accessible_mounts = get_accessible_mounts(exclude_root=True)
workspace_path = workspace.resolve()

# Check if workspace path starts with any accessible mount
for mount in sorted(accessible_mounts, key=lambda p: len(str(p)), reverse=True):
# Sort by length (longest first) to match most specific mount point
try:
workspace_path.relative_to(mount)
# If we get here, workspace is under this mount
return True
except ValueError:
# Not relative to this mount, continue checking
continue

# If we didn't find any matching mount, it's not accessible or it's on the root filesystem
LOG.warning(
f"[GARBAGE COLLECTOR] Workspace '{workspace}' is either on the root filesystem or does not exist "
f"in a valid mounted file system on current host '{socket.gethostname()}'."
)
return False

def check_run_workspaces(self):
"""
Check all runs for valid workspace directories.
Expand All @@ -115,13 +162,47 @@ def check_run_workspaces(self):
LOG.info("[GARBAGE COLLECTOR] Checking run workspaces for validity...")

all_runs = self.merlin_db.runs.get_all()

for run in all_runs:
workspace = run.get_workspace()
if not os.path.exists(workspace):
LOG.debug(f"[GARBAGE COLLECTOR] Run {run.get_id()} has invalid workspace: {workspace}")

# Check if workspace is on an accessible NON-ROOT mount (e.g., a network filesystem).
# This will be False for workspaces on the local root filesystem.
is_accessible_mount = self._is_workspace_on_accessible_mount(workspace)

# Check if the workspace physically exists on the current host.
workspace_exists = os.path.exists(workspace)

if is_accessible_mount and not workspace_exists:
# Case 1: Workspace is on an accessible NON-ROOT mount (e.g., /mnt/nfs) but is missing.
# This is an invalid workspace issue.
LOG.debug(f"[GARBAGE COLLECTOR] Run {run.get_id()} has invalid workspace on an accessible mount: {workspace}")
self._issues["run"].append(run)

elif not is_accessible_mount and not workspace_exists:
# Case 2: Workspace is NOT on a non-root accessible mount AND does not physically exist on current host.
# This indicates the workspace is likely on an inaccessible mount *or* it was a local
# workspace that was deleted, but we treat this as *potentially* inaccessible
# to avoid premature deletion of runs accessible from another host.
LOG.debug(
f"[GARBAGE COLLECTOR] Run '{run.get_id()}' has workspace on potentially inaccessible mount "
f"and does not exist: {workspace}"
)
self._issues["inaccessible_runs"].append(run)

# Case 3: Workspace is NOT on a non-root accessible mount, but DOES exist.
# This is the expected state for a valid workspace on the local root filesystem.
# No action needed, it's considered valid.

# Case 4: is_accessible_mount and workspace_exists: Valid non-root workspace. No action.

LOG.info(f"[GARBAGE COLLECTOR] Found {len(self._issues['run'])} runs with invalid workspaces.")
if self._issues["inaccessible_runs"]:
LOG.warning(
f"[GARBAGE COLLECTOR] Found {len(self._issues['inaccessible_runs'])} runs with workspaces "
f"on file systems not accessible from the current host '{socket.gethostname()}'. Run garbage "
"collection from a machine with access to verify these."
)

def check_orphaned_logical_workers(self):
"""
Expand Down Expand Up @@ -306,6 +387,27 @@ def generate_report(self) -> str:
for study in self._issues["study"]:
report_lines.append(f" - {study.get_name()}")

report_lines.append("")

report_lines.append("=" * 60)

report_lines.append("")

# Potentially Inaccessible Runs section
report_lines.append(f"Potentially Inaccessible Runs: {len(self._issues['inaccessible_runs'])}")
if self._issues["inaccessible_runs"]:
for run in self._issues["inaccessible_runs"]:
report_lines.append(f" - {run.get_workspace()}")
report_lines.append("")
inaccessible_message = (
"You may need to re-run garbage collection on a machine that can access these runs, "
"or remove them manually if they are local runs being flagged as inaccessible."
)
wrapped_message = textwrap.fill(inaccessible_message, width=60)
report_lines.append(wrapped_message)

report_lines.append("")

report_lines.append("=" * 60)

return "\n".join(report_lines)
Expand Down
35 changes: 34 additions & 1 deletion merlin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
from contextlib import contextmanager
from copy import deepcopy
from datetime import datetime, timedelta
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Callable, Dict, Generator, List, Tuple, Union
from typing import Any, Callable, Dict, Generator, List, Set, Tuple, Union

import numpy as np
import pkg_resources
Expand Down Expand Up @@ -465,6 +466,11 @@ def load_array_file(filename: str, ndmin: int = 2) -> np.ndarray:
return array


#################################
# File system utility functions #
#################################


def determine_protocol(fname: str) -> str:
"""
Determine the file protocol based on the file name extension.
Expand Down Expand Up @@ -537,6 +543,33 @@ def verify_dirpath(dirpath: str) -> str:
return dirpath


def get_accessible_mounts(exclude_root: bool = False) -> Set[Path]:
"""
Get set of mount points that are actually accessible on this machine.

By default, this likely includes the root filesystem (e.g., '/' or 'C:\\') as an
accessible mount point. You may want to exclude it if you are specifically
interested in other mounted filesystems.

Args:
exclude_root: If True, exclude the root filesystem from the list of
accessible mounts.

Returns:
Set of accessible mount point paths.
"""
accessible = set()

for part in psutil.disk_partitions(all=True):
mountpoint = Path(part.mountpoint)
if exclude_root and mountpoint == Path(mountpoint.anchor):
continue
accessible.add(mountpoint)

LOG.debug(f"Accessible mounts on {socket.gethostname()}: {accessible}")
return accessible


@contextmanager
def cd(path: str) -> Generator[None, None, None]: # pylint: disable=C0103
"""
Expand Down
Loading
Loading