Skip to content
This repository has been archived by the owner on Jul 18, 2024. It is now read-only.

Commit

Permalink
remove unstable code (#24)
Browse files Browse the repository at this point in the history
* remove unstable code

* use dask without znflow

* update readme

* bump version

* remove znflow import
  • Loading branch information
PythonFZ authored Apr 19, 2023
1 parent 437fefd commit 556449c
Show file tree
Hide file tree
Showing 13 changed files with 75 additions and 596 deletions.
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
[DVC](https://dvc.org) provides tools for building and executing the computational graph locally through various methods.
The `dask4dvc` package combines [Dask Distributed](https://distributed.dask.org/) with DVC to make it easier to use with HPC managers like [Slurm](https://github.com/SchedMD/slurm).

The `dask4dvc` package will try to run the DVC graph in parallel.

## Usage
Dask4DVC provides a CLI similar to DVC.

- `dvc repro` becomes `dask4dvc repro`.
- `dvc exp run --run-all` becomes `dask4dvc run`.

### SLURM Cluster

Expand All @@ -33,3 +34,13 @@ cluster.adapt()
```

With this setup, you can then run `dask4dvc repro --address 127.0.0.1:31415` on the example port `31415`.

You can also use config files with `dask4dvc repro --config myconfig.yaml`.

```yaml
default:
SGECluster:
queue: regular
cores: 10
memory: 16 GB
```
137 changes: 17 additions & 120 deletions dask4dvc/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

import importlib.metadata
import logging
import shutil
import subprocess
import typing

import dask.distributed
import typer
Expand All @@ -28,42 +25,23 @@ class Help:
"Split the DVC Graph into individual Nodes and run them in parallel if possible."
)
leave: str = "Ask before stopping the client"
option: str = (
"Additional options to pass to 'dvc repro'. E.g. '--option=--force"
" --option=--downstream'. Notice that some options like '--force' might show"
" unexpected behavior."
)
target: str = "Names of the stage to reproduce. Leave empty to run all stages."
detach: str = (
"Run the process in detached mode (Ctrl + C will not close 'dask4dvc' in the"
" background)."
)
config: str = "path to config file, e.g. 'dask4dvc.yaml'"
retries: str = "Number of retries to acquire dvc lock."
max_workers: str = (
"Maximum number of workers to use. Using '1' will be the same as 'dvc repro' but"
" slower."
)


@app.command()
def repro(
address: str = typer.Option(None, help=Help.address),
option: typing.List[str] = typer.Option(None, help=Help.option),
leave: bool = typer.Option(True, help=Help.leave),
detach: bool = typer.Option(False, "--detach", "-d", help=Help.detach),
config: str = typer.Option(None, help=Help.config),
target: list[str] = typer.Argument(None, help=Help.target, show_default=False),
parallel: bool = typer.Option(True, help=Help.parallel),
max_workers: int = typer.Option(None, help="Maximum number of workers to use."),
retries: int = typer.Option(10, help="Number of retries for each stage."),
max_workers: int = typer.Option(None, help=Help.max_workers),
retries: int = typer.Option(10, help=Help.retries),
) -> None:
"""Replicate 'dvc repro' command using dask."""
if detach:
cmd = ["dask4dvc", "repro"]
if address is not None:
cmd += ["--address", address]
if config is not None:
cmd += ["--config", config]
_ = subprocess.Popen(cmd, start_new_session=True)
# TODO add all kwargs!
return

utils.CONFIG.retries = retries

if config is not None:
Expand All @@ -74,102 +52,21 @@ def repro(
if max_workers is not None:
client.cluster.adapt(minimum=1, maximum=max_workers)
log.info(client)
if parallel:
result = methods.parallel_submit(client)
else:
result = client.submit(
utils.dvc.repro, targets=target, options=option, pure=False
)

utils.dask.wait_for_futures(result)
results = methods.parallel_submit(client)

utils.dask.wait_for_futures(results)
if not leave:
utils.main.wait()


@app.command()
def run(
address: str = typer.Option(None, help=Help.address),
option: typing.List[str] = typer.Option(None, help=Help.option),
leave: bool = typer.Option(True, help=Help.leave),
load: bool = typer.Option(
True,
help=(
"Use 'dvc exp run' to load the experiments from run cache. If this option is"
" not selected, the experiments will only be available through the run cache"
" and the queue will not be cleared. Do not use with 'always_changed = True'."
),
),
delete: typing.List[str] = typer.Option(
["branches", "temp"],
"-D",
"--delete",
help="Remove the temporary branches and directories",
),
detach: bool = typer.Option(False, "--detach", "-d", help=Help.detach),
config: str = typer.Option(None, help=Help.config),
) -> None:
"""Replicate 'dvc exp run --run-all' command using dask.
This will run the available experiments in parallel using dask.
When finished, it will load the experiments using 'dvc exp run --run-all'.
"""
if config is not None:
assert address is None, "Can not use address and config file"
address = utils.dask.get_cluster_from_config(config)

if detach:
cmd = ["dask4dvc", "run"]
if address is not None:
cmd += ["--address", address]
if config is not None:
cmd += ["--config", config]
# TODO add all kwargs!
_ = subprocess.Popen(cmd, start_new_session=True)
return

with methods.get_experiment_repos(delete=delete) as repos:
with dask.distributed.Client(address) as client:
log.info(client)
results = {
name: client.submit(
utils.dvc.repro,
options=option,
cwd=repo.working_dir,
pure=False,
key=f"repro_{name[4:]}", # cut the 'tmp_' in front
)
for name, repo in repos.items()
}

utils.dask.wait_for_futures(results)
if load:
run_all = client.submit(
utils.dvc.exp_run_all, n_jobs=len(results), pure=False, deps=results
)
utils.dask.wait_for_futures(run_all)

if not leave:
utils.main.wait()


@app.command()
def clean(
    branches: bool = typer.Option(
        False,
        help=(
            "Remove all branches created by 'dask4dvc' / all branches starting with"
            " 'tmp_'."
        ),
    ),
    temp: bool = typer.Option(
        False, help="Remove all temporary clones by removing the '.dask4dvc' directory."
    ),
) -> None:
    """Helpers to clean up 'dask4dvc' if something went wrong."""
    # Both flags default to False, so running 'dask4dvc clean' without
    # options is a no-op; the user must opt in to each cleanup step.
    if branches:
        # Delete the temporary 'tmp_*' git branches created for experiments.
        utils.git.remove_tmp_branches()
    if temp:
        # Best-effort removal of the clone directory; a missing directory
        # is not an error thanks to ignore_errors=True.
        shutil.rmtree(".dask4dvc/", ignore_errors=True)
def run() -> None:
    """Run DVC experiments in parallel using dask."""
    # The 'run' feature was removed for instability; tell the user which
    # release still ships it, then exit without an error status.
    removal_notice = (
        "'dask4dvc run' was removed due to instability. Latest version including this"
        " feature is 'pip install dask4dvc==0.1.4'."
    )
    typer.echo(removal_notice)
    raise typer.Exit()


def version_callback(value: bool) -> None:
Expand Down
115 changes: 6 additions & 109 deletions dask4dvc/methods.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""Some general 'dask4dvc' methods."""

import contextlib
import pathlib
import typing
import logging

Expand All @@ -12,10 +10,6 @@
import dvc.utils.strictyaml
import dvc.stage
from dvc.stage.cache import RunCacheNotFoundError
import git
import tqdm
import typer
import znflow
import random
import time
import subprocess
Expand All @@ -25,98 +19,6 @@
log = logging.getLogger(__name__)


@utils.main.timeit
def _exp_branch(queued_experiments: dict) -> list:
    """Promote each queued experiment to its own 'tmp_' branch.

    Returns the list of branch names that were created.
    """
    progress = tqdm.tqdm(
        queued_experiments.items(),
        desc="dvc exp branch",
        ncols=100,
        disable=len(queued_experiments) < utils.CONFIG.tqdm_threshold,
    )
    branch_names = []
    for experiment, label in progress:
        # Unnamed experiments fall back to a short prefix of the revision id.
        if label is None:
            branch = f"tmp_{experiment[:7]}"
        else:
            branch = f"tmp_{label}"
        branch_names.append(branch)
        utils.dvc.exp_branch(experiment=experiment, branch=branch)
    return branch_names


@utils.main.timeit
def _clone_branch(repo_names: list) -> typing.Dict[str, git.Repo]:
    """Clone every branch into its own directory below '.dask4dvc'.

    Returns a mapping from branch name to the freshly cloned repository.
    """
    base_path = pathlib.Path(".dask4dvc")
    clones = {}
    for branch in repo_names:
        clones[branch] = git.Repo.clone_from(
            url=".", to_path=base_path / branch, branch=branch
        )
    return clones


@utils.main.timeit
def _update_run_cache(repos: typing.List[git.Repo]) -> None:
    """Point each clone's local DVC config at the main repository's cache.

    Because it is using '--local' in a new repository,
    we replace 'dvc cache dir --local <path>' with writing it directly.

    Parameters
    ----------
    repos: list[git.Repo]
        Temporary clones whose '.dvc/config.local' gets the shared cache dir.
    """
    config_file = pathlib.Path(".dvc/config.local")
    # The shared cache lives in the current (main) repository, so the path is
    # identical for every clone -- compute it once instead of per iteration.
    cache_dir = pathlib.Path.cwd().resolve() / ".dvc" / "cache"

    for repo in repos:
        # NOTE: 'repo.working_dir' is a str; 'str / Path' still yields a Path
        # via PurePath.__rtruediv__.
        with open(repo.working_dir / config_file, "a") as file:
            file.write("[cache]\n")
            file.write(f"\t dir = {cache_dir}\n")


@contextlib.contextmanager
def get_experiment_repos(delete: list) -> typing.Dict[str, git.Repo]:
    """Prepare DVC experiments for parallel execution.
    This contextmanager does:
    1. Get the queued experiments.
    2. Promote them to branches.
    3. Create clones.
    4. Set dvc cache.
    and then finishes by:
    1. removing the temporary branches.
    2. removing the temporary clones.
    Parameters
    ----------
    delete: list[str]
        remove the "branches" and "temp" afterwards.
    Yields
    ------
    dict[str, Repo]: temporary repositories set up for running 'dvc repro' with
    a shared run cache.
    Raises
    ------
    typer.Exit: if no queued experiments are available.
    ValueError: if '.gitignore' had to be updated to ignore '.dask4dvc/';
    the change must be committed before retrying.
    """
    # The clone directory must be git-ignored; if we just added the entry the
    # working tree is dirty, so abort and ask the user to commit first.
    if utils.git.update_gitignore(ignore=".dask4dvc/"):
        raise ValueError("'.gitignore' file was updated. Please commit changes.")

    # Wrap the lookup in timeit manually since exp_show_queued is not decorated.
    queued_experiments = utils.main.timeit(utils.dvc.exp_show_queued)()

    if len(queued_experiments) == 0:
        typer.echo("Skipping: no experiments were found in the queue.")
        raise typer.Exit()

    # Setup: branch per experiment -> clone per branch -> shared run cache.
    repo_names = _exp_branch(queued_experiments)
    repos = _clone_branch(repo_names)
    _update_run_cache(list(repos.values()))

    try:
        yield repos
    finally:
        # Cleanup is opt-in via the 'delete' list so callers can keep
        # artifacts around for debugging.
        if "branches" in delete:
            git.Repo(".").delete_head(*list(repos), force=True)
        if "temp" in delete:
            utils.main.remove_paths([clone.working_dir for clone in repos.values()])


def _run_locked_cmd(
repo: dvc.repo.Repo, func: typing.Callable, *args: tuple, **kwargs: dict
) -> typing.Any:
Expand Down Expand Up @@ -151,7 +53,7 @@ def _run_locked_cmd(

def _load_run_cache(repo: dvc.repo.Repo, stage: dvc.stage.Stage) -> None:
"""Load the run cache for the given stage.
Raises
------
RunCacheNotFoundError:
Expand All @@ -166,7 +68,6 @@ def _load_run_cache(repo: dvc.repo.Repo, stage: dvc.stage.Stage) -> None:
)


@znflow.nodify
def submit_stage(name: str, successors: list) -> str:
"""Submit a stage to the Dask cluster."""
repo = dvc.repo.Repo()
Expand Down Expand Up @@ -201,20 +102,16 @@ def parallel_submit(
client: dask.distributed.Client,
) -> typing.Dict[str, dask.distributed.Future]:
"""Submit all stages to the Dask cluster."""
graph = znflow.DiGraph()
mapping = {}
repo = dvc.repo.Repo()

for node in repo.index.graph.nodes:
successors = [
mapping[successor] for successor in repo.index.graph.successors(node)
]
with graph:
mapping[node] = submit_stage(
node.name,
successors=successors,
)
deployment = znflow.deployment.Deployment(graph=graph, client=client)
deployment.submit_graph()

return deployment.results
mapping[node] = client.submit(
submit_stage, node.name, successors=successors, pure=False
)

return mapping
4 changes: 2 additions & 2 deletions dask4dvc/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Helper functions."""

from dask4dvc.utils import dask, dvc, git, main
from dask4dvc.utils import dask, main
from dask4dvc.utils.config import CONFIG

__all__ = ["dvc", "dask", "git", "main", "CONFIG"]
__all__ = ["dask", "main", "CONFIG"]
8 changes: 2 additions & 6 deletions dask4dvc/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@ class Config:
Attributes
----------
tqdm_threshold: int
The minimum number of experiments to show a tqdm bar.
use_dvc_api: bool
Instead of subprocess calls use the internal DVC API.
retries : int
number of retries for acquiring lock
"""

tqdm_threshold: int = 10
use_dvc_api: bool = True
retries: int = 1000


Expand Down
Loading

0 comments on commit 556449c

Please sign in to comment.