2 changes: 2 additions & 0 deletions dvc/cli/parser.py
@@ -39,6 +39,7 @@
move,
params,
plots,
purge,
queue,
remote,
remove,
@@ -90,6 +91,7 @@
move,
params,
plots,
purge,
queue,
remote,
remove,
99 changes: 99 additions & 0 deletions dvc/commands/purge.py
@@ -0,0 +1,99 @@
import os

from dvc.cli import formatter
from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link
from dvc.log import logger
from dvc.ui import ui

logger = logger.getChild(__name__)


class CmdPurge(CmdBase):
def run(self):
if not self.args.dry_run:
msg = "This will permanently remove local DVC-tracked outputs "
else:
msg = "This will show what local DVC-tracked outputs would be removed "
if self.args.targets:
msg += "for the following targets:\n - " + "\n - ".join(
[os.path.abspath(t) for t in self.args.targets]
)
else:
msg += "for the entire workspace."

if self.args.recursive:
msg += "\nRecursive purge is enabled."

if self.args.dry_run:
msg += "\n(dry-run: showing what would be removed, no changes)."

logger.warning(msg)

if (
not self.args.force
and not self.args.dry_run
and not self.args.yes
and not ui.confirm("Are you sure you want to proceed?")
):
return 1

# Call repo API
self.repo.purge(
targets=self.args.targets,
recursive=self.args.recursive,
force=self.args.force,
dry_run=self.args.dry_run,
)
return 0


def add_parser(subparsers, parent_parser):
PURGE_HELP = "Remove tracked outputs and their cache."
PURGE_DESCRIPTION = (
"Removes cache objects and workspace copies of DVC-tracked outputs.\n"
"Metadata remains intact, and non-DVC files are untouched."
)
purge_parser = subparsers.add_parser(
"purge",
parents=[parent_parser],
description=append_doc_link(PURGE_DESCRIPTION, "purge"),
help=PURGE_HELP,
formatter_class=formatter.RawDescriptionHelpFormatter,
)

purge_parser.add_argument(
"targets",
nargs="*",
help="Optional list of files/directories to purge (default: entire repo).",
)
purge_parser.add_argument(
"-r",
"--recursive",
action="store_true",
default=False,
help="Recursively purge directories.",
)
purge_parser.add_argument(
"--dry-run",
dest="dry_run",
action="store_true",
default=False,
help="Only print what would be removed without actually removing.",
)
purge_parser.add_argument(
"-f",
"--force",
action="store_true",
default=False,
help="Force purge, bypassing safety checks and prompts.",
)
purge_parser.add_argument(
"-y",
"--yes",
action="store_true",
default=False,
help="Do not prompt for confirmation (respects safety checks).",
)

purge_parser.set_defaults(func=CmdPurge)
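
A rough usage sketch of the parser defined above; the target paths are illustrative, not part of the diff:

    dvc purge --dry-run               # preview what would be removed across the workspace
    dvc purge data/features -r --yes  # purge a directory recursively, skipping the confirmation prompt
    dvc purge model.pkl -f            # bypass the dirty-output and remote-backup checks (see dvc/repo/purge.py below)
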
1 change: 1 addition & 0 deletions dvc/repo/__init__.py
@@ -82,6 +82,7 @@ class Repo:
from dvc.repo.ls_url import ls_url as _ls_url # type: ignore[misc]
from dvc.repo.move import move # type: ignore[misc]
from dvc.repo.pull import pull # type: ignore[misc]
from dvc.repo.purge import purge # type: ignore[misc]
from dvc.repo.push import push # type: ignore[misc]
from dvc.repo.remove import remove # type: ignore[misc]
from dvc.repo.reproduce import reproduce # type: ignore[misc]
161 changes: 161 additions & 0 deletions dvc/repo/purge.py
@@ -0,0 +1,161 @@
from typing import TYPE_CHECKING, Optional

from dvc.config import NoRemoteError, RemoteNotFoundError
from dvc.exceptions import DvcException
from dvc.log import logger

from . import locked

if TYPE_CHECKING:
from dvc.output import Output
from dvc.repo import Repo

logger = logger.getChild(__name__)


class PurgeError(DvcException):
"""Raised when purge fails due to safety or internal errors."""


def _flatten_stages_or_outs(items) -> list["Output"]:
"""Normalize collect() results into a flat list of Output objects."""
outs = []
for item in items:
if isinstance(item, list):
outs.extend(_flatten_stages_or_outs(item))
elif hasattr(item, "outs"): # Stage
outs.extend(item.outs)
elif hasattr(item, "use_cache"): # Already an Output
outs.append(item)
else:
logger.debug("Skipping non-stage item in collect(): %r", item)
return outs


def _check_dirty(outs, force: bool) -> None:
dirty = [o for o in outs if o.use_cache and o.changed()]
if dirty and not force:
raise PurgeError(
"Some tracked outputs have uncommitted changes. "
"Use `--force` to purge anyway.\n - "
+ "\n - ".join(str(o) for o in dirty)
)


def _get_remote_odb(repo: "Repo"):
try:
return repo.cloud.get_remote_odb(None)
except (RemoteNotFoundError, NoRemoteError):
return None


def _check_remote_backup(repo: "Repo", outs, force: bool) -> None:
remote_odb = _get_remote_odb(repo)

if not remote_odb:
if not force:
raise PurgeError(
"No default remote configured. "
"Cannot safely purge outputs without verifying remote backup.\n"
"Use `--force` to purge anyway."
)
logger.warning(
"No default remote configured. Proceeding with purge due to --force. "
"Outputs may be permanently lost."
)
return

# remote exists, check objects
not_in_remote = [
str(o)
for o in outs
if o.use_cache
and o.hash_info
and o.hash_info.value
and not remote_odb.exists(o.hash_info.value)
]
if not_in_remote and not force:
raise PurgeError(
"Some outputs are not present in the remote cache and would be "
"permanently lost if purged:\n - "
+ "\n - ".join(not_in_remote)
+ "\nUse `--force` to purge anyway."
)
if not_in_remote and force:
logger.warning(
"Some outputs are not present in the remote cache and may be "
"permanently lost:\n - %s",
"\n - ".join(not_in_remote),
)


def _remove_outs(outs, dry_run: bool) -> int:
removed = 0
for out in outs:
if dry_run:
logger.info("[dry-run] Would remove %s", out)
continue

try:
# remove workspace file
if out.exists:
out.remove(ignore_remove=False)

# remove cache entry
if out.use_cache and out.hash_info:
cache_path = out.cache.oid_to_path(out.hash_info.value)
if out.cache.fs.exists(cache_path):
out.cache.fs.remove(cache_path, recursive=True)

removed += 1
except Exception:
logger.exception("Failed to remove %s", out)
return removed


@locked
def purge(
self: "Repo",
targets: Optional[list[str]] = None,
recursive: bool = False,
force: bool = False,
dry_run: bool = False,
) -> int:
"""
Purge removes local copies of DVC-tracked outputs and their cache.

- Collects outs from .dvc files and dvc.yaml.
- Ensures safety (no dirty outs unless --force).
- Ensures outputs are backed up to remote (unless --force).
- Removes both workspace copies and cache objects.
- Metadata remains intact.
"""
from dvc.repo.collect import collect
from dvc.stage.exceptions import StageFileDoesNotExistError

try:
items = (
collect(self, targets=targets, recursive=recursive)
if targets
else list(self.index.stages)
)
except StageFileDoesNotExistError as e:
raise PurgeError(str(e)) from e

outs = _flatten_stages_or_outs(items)
if not outs:
logger.info("No DVC-tracked outputs found to purge.")
return 0

# Run safety checks
_check_dirty(outs, force)
_check_remote_backup(self, outs, force)

# Remove outs
removed = _remove_outs(outs, dry_run)

if removed:
logger.info("Removed %d outputs (workspace + cache).", removed)
else:
logger.info("Nothing to purge.")
return 0
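
Because dvc/repo/__init__.py (above) attaches purge as a Repo method, the same logic can be driven from Python. A minimal sketch, assuming the process runs inside an initialized DVC repository and that data/features is a tracked directory (both illustrative):

    from dvc.repo import Repo

    repo = Repo()
    # Dry run: log what would be removed, leaving workspace and cache untouched.
    repo.purge(targets=["data/features"], recursive=True, dry_run=True)
    # Purge the whole workspace; force=True skips the dirty-output and
    # remote-backup checks, so outputs may be unrecoverable locally.
    repo.purge(force=True)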