From 00c63dd21c8d7920c33c89c040dd6f2ead69f261 Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Tue, 10 May 2022 16:11:37 +0100
Subject: [PATCH 1/5] Release 2022.05.0


From 01a5d9e3c9f61c99b3e56d59b53af35ad2e7f87b Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Wed, 11 May 2022 10:34:05 +0100
Subject: [PATCH 2/5] Allow users to use lifecycle methods both sync and async

---
 dask_ctl/asyncio.py   |  6 ++++
 dask_ctl/lifecycle.py | 80 ++++++++++++++++++++++++++++++-------------
 docs/api.rst          | 19 ++++++++++
 3 files changed, 81 insertions(+), 24 deletions(-)
 create mode 100644 dask_ctl/asyncio.py

diff --git a/dask_ctl/asyncio.py b/dask_ctl/asyncio.py
new file mode 100644
index 0000000..b490cda
--- /dev/null
+++ b/dask_ctl/asyncio.py
@@ -0,0 +1,6 @@
+from .lifecycle import _create_cluster as create_cluster  # noqa
+from .lifecycle import _list_clusters as list_clusters  # noqa
+from .lifecycle import _get_cluster as get_cluster  # noqa
+from .lifecycle import _get_snippet as get_snippet  # noqa
+from .lifecycle import _scale_cluster as scale_cluster  # noqa
+from .lifecycle import _delete_cluster as delete_cluster  # noqa
diff --git a/dask_ctl/lifecycle.py b/dask_ctl/lifecycle.py
index fbec23a..d1bb89d 100644
--- a/dask_ctl/lifecycle.py
+++ b/dask_ctl/lifecycle.py
@@ -38,18 +38,22 @@ def create_cluster(spec_path: str) -> Cluster:
 
     """
 
-    async def _create_cluster():
-        cm_module, cm_class, args, kwargs = load_spec(spec_path)
-        module = importlib.import_module(cm_module)
-        cluster_manager = getattr(module, cm_class)
+    return loop.run_sync(_create_cluster, spec_path)
 
-        kwargs = {key.replace("-", "_"): entry for key, entry in kwargs.items()}
 
-        cluster = await cluster_manager(*args, **kwargs, asynchronous=True)
-        cluster.shutdown_on_close = False
-        return cluster
+async def _create_cluster(spec_path: str) -> Cluster:
+    cm_module, cm_class, args, kwargs = load_spec(spec_path)
+    module = importlib.import_module(cm_module)
+    cluster_manager = getattr(module, cm_class)
 
-    return loop.run_sync(_create_cluster)
+    kwargs = {key.replace("-", "_"): entry for key, entry in kwargs.items()}
+
+    cluster = await cluster_manager(*args, **kwargs, asynchronous=True)
+    cluster.shutdown_on_close = False
+    return cluster
+
+
+_create_cluster.__doc__ = create_cluster.__doc__
 
 
 def list_clusters() -> List[Cluster]:
@@ -71,15 +75,19 @@ def list_clusters() -> List[Cluster]:
 
     """
 
-    async def _list_clusters():
-        clusters = []
-        async for cluster in discover_clusters():
-            clusters.append(cluster)
-        return clusters
-
     return loop.run_sync(_list_clusters)
 
+
+async def _list_clusters() -> List[Cluster]:
+    clusters = []
+    async for cluster in discover_clusters():
+        clusters.append(cluster)
+    return clusters
+
+
+_list_clusters.__doc__ = list_clusters.__doc__
+
+
 def get_cluster(name: str) -> Cluster:
     """Get a cluster by name.
@@ -102,13 +110,17 @@ def get_cluster(name: str) -> Cluster:
 
     """
 
-    async def _get_cluster():
-        async for cluster_name, cluster_class in discover_cluster_names():
-            if cluster_name == name:
-                return cluster_class.from_name(name)
-        raise RuntimeError("No such cluster %s", name)
+    return loop.run_sync(_get_cluster, name)
+
+
+async def _get_cluster(name: str) -> Cluster:
+    async for cluster_name, cluster_class in discover_cluster_names():
+        if cluster_name == name:
+            return cluster_class.from_name(name)
+    raise RuntimeError("No such cluster %s", name)
 
-    return loop.run_sync(_get_cluster)
+
+_get_cluster.__doc__ = get_cluster.__doc__
 
 
 def get_snippet(name: str) -> str:
@@ -136,8 +148,11 @@ def get_snippet(name: str) -> str:
         client = Client(cluster)
 
     """
+    return loop.run_sync(_get_snippet, name)
+
 
-    cluster = get_cluster(name)
+async def _get_snippet(name: str) -> str:
+    cluster = await _get_cluster(name)
     try:
         return cluster.get_snippet()
     except AttributeError:
@@ -148,6 +163,9 @@ def get_snippet(name: str) -> str:
         )
 
 
+_get_snippet.__doc__ = get_snippet.__doc__
+
+
 def scale_cluster(name: str, n_workers: int) -> None:
     """Scale a cluster by name.
 
@@ -166,8 +184,15 @@ def scale_cluster(name: str, n_workers: int) -> None:
     >>> scale_cluster("mycluster", 10)  # doctest: +SKIP
 
     """
+    return loop.run_sync(_scale_cluster, name, n_workers)
+
 
-    return get_cluster(name).scale(n_workers)
+async def _scale_cluster(name: str, n_workers: int) -> None:
+    cluster = await _get_cluster(name)
+    return await cluster.scale(n_workers)
+
+
+_scale_cluster.__doc__ = scale_cluster.__doc__
 
 
 def delete_cluster(name: str) -> None:
@@ -186,5 +211,12 @@ def delete_cluster(name: str) -> None:
     >>> delete_cluster("mycluster")  # doctest: +SKIP
 
     """
+    return loop.run_sync(_delete_cluster, name)
+
+
+async def _delete_cluster(name: str) -> None:
+    cluster = await _get_cluster(name)
+    return await cluster.close()
+
 
-    return get_cluster(name).close()
+_delete_cluster.__doc__ = delete_cluster.__doc__
diff --git a/docs/api.rst b/docs/api.rst
index 45331e3..d7b256e 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -6,6 +6,25 @@ Python API
 Lifecycle
 ---------
 
+Dask Control has a selection of lifecycle functions that can be used within Python to manage
+your Dask clusters. You can list clusters, get an instance of an existing cluster, create new ones, and scale or delete them.
+
+You can use these in a regular synchronous way by importing them from ``dask_ctl``.
+
+.. code-block:: python
+
+    from dask_ctl import list_clusters
+
+    clusters = list_clusters()
+
+Alternatively, you can use them in async code by importing from the ``dask_ctl.asyncio`` submodule.
+
+.. code-block:: python
+
+    from dask_ctl.asyncio import list_clusters
+
+    clusters = await list_clusters()
+
 .. autosummary::
     get_cluster
     create_cluster
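
The lifecycle API added in PATCH 2/5 can be driven from both synchronous and asynchronous code. The sketch below is a minimal illustration of the two entry points side by side; it assumes the helpers are importable from the top-level dask_ctl package (as the docs snippet above does for list_clusters), and the cluster name "mycluster" is just the placeholder used in the docstrings rather than a real cluster.

    import asyncio

    from dask_ctl import get_cluster, scale_cluster
    from dask_ctl.asyncio import list_clusters


    def sync_example():
        # The blocking wrappers drive an event loop internally, so they can be
        # called from ordinary scripts and the REPL.
        scale_cluster("mycluster", 10)
        print(get_cluster("mycluster"))


    async def async_example():
        # The dask_ctl.asyncio variants are coroutines, so they can be awaited
        # from code that already has a running event loop.
        for cluster in await list_clusters():
            print(cluster)


    if __name__ == "__main__":
        sync_example()
        asyncio.run(async_example())
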
From 800e1d16071b916ac70cdec36af8ef1d77e3d690 Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Wed, 11 May 2022 11:16:34 +0100
Subject: [PATCH 3/5] Remove tornado

---
 dask_ctl/cli.py       |  8 ++++----
 dask_ctl/lifecycle.py | 14 +++++++-------
 dask_ctl/utils.py     | 12 +++++++-----
 3 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/dask_ctl/cli.py b/dask_ctl/cli.py
index 021e610..37fe7bf 100644
--- a/dask_ctl/cli.py
+++ b/dask_ctl/cli.py
@@ -13,7 +13,7 @@
 from distributed.core import Status
 
 from . import __version__
-from .utils import loop
+from .utils import run_sync
 from .discovery import (
     discover_clusters,
     discover_cluster_names,
@@ -38,7 +38,7 @@ async def _autocomplete_cluster_names():
             if incomplete in cluster
         ]
 
-    return loop.run_sync(_autocomplete_cluster_names)
+    return run_sync(_autocomplete_cluster_names)
 
 
 @click.group()
@@ -144,7 +144,7 @@ async def _list():
 
         console.print(table)
 
-    loop.run_sync(_list)
+    run_sync(_list)
 
 
 @cluster.command()
@@ -292,7 +292,7 @@ async def _list_discovery():
             )
         console.print(table)
 
-    loop.run_sync(_list_discovery)
+    run_sync(_list_discovery)
 
 
 @discovery.command(name="enable")
diff --git a/dask_ctl/lifecycle.py b/dask_ctl/lifecycle.py
index d1bb89d..1c0e750 100644
--- a/dask_ctl/lifecycle.py
+++ b/dask_ctl/lifecycle.py
@@ -6,7 +6,7 @@ from distributed.deploy.cluster import Cluster
 
 from .discovery import discover_cluster_names, discover_clusters
 from .spec import load_spec
-from .utils import loop
+from .utils import run_sync
 
 
 def create_cluster(spec_path: str) -> Cluster:
@@ -38,7 +38,7 @@ def create_cluster(spec_path: str) -> Cluster:
 
     """
 
-    return loop.run_sync(_create_cluster, spec_path)
+    return run_sync(_create_cluster, spec_path)
 
 
 async def _create_cluster(spec_path: str) -> Cluster:
@@ -75,7 +75,7 @@ def list_clusters() -> List[Cluster]:
 
     """
 
-    return loop.run_sync(_list_clusters)
+    return run_sync(_list_clusters)
 
 
 async def _list_clusters() -> List[Cluster]:
@@ -110,7 +110,7 @@ def get_cluster(name: str) -> Cluster:
 
     """
 
-    return loop.run_sync(_get_cluster, name)
+    return run_sync(_get_cluster, name)
 
 
 async def _get_cluster(name: str) -> Cluster:
@@ -148,7 +148,7 @@ def get_snippet(name: str) -> str:
         client = Client(cluster)
 
     """
-    return loop.run_sync(_get_snippet, name)
+    return run_sync(_get_snippet, name)
 
 
 async def _get_snippet(name: str) -> str:
@@ -184,7 +184,7 @@ def scale_cluster(name: str, n_workers: int) -> None:
     >>> scale_cluster("mycluster", 10)  # doctest: +SKIP
 
     """
-    return loop.run_sync(_scale_cluster, name, n_workers)
+    return run_sync(_scale_cluster, name, n_workers)
 
 
 async def _scale_cluster(name: str, n_workers: int) -> None:
@@ -211,7 +211,7 @@ def delete_cluster(name: str) -> None:
     >>> delete_cluster("mycluster")  # doctest: +SKIP
 
     """
-    return loop.run_sync(_delete_cluster, name)
+    return run_sync(_delete_cluster, name)
 
 
 async def _delete_cluster(name: str) -> None:
diff --git a/dask_ctl/utils.py b/dask_ctl/utils.py
index 379ed94..9914792 100644
--- a/dask_ctl/utils.py
+++ b/dask_ctl/utils.py
@@ -1,11 +1,13 @@
 import asyncio
 
-from tornado.ioloop import IOLoop
-from distributed.cli.utils import install_signal_handlers
-
-loop = IOLoop.current()
-install_signal_handlers(loop)
+
+def run_sync(f, *args, **kwargs):
+    loop = asyncio.get_event_loop()
+    try:
+        return loop.run_until_complete(f(*args, **kwargs))
+    except RuntimeError:
+        f = asyncio.run_coroutine_threadsafe(f(*args, **kwargs), loop)
+        return f.result()
 
 
 class _AsyncTimedIterator:
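
For reference, here is a small self-contained sketch of how a helper with the run_sync(f, *args, **kwargs) signature introduced in PATCH 3/5 is typically called; the add coroutine is made up for illustration and is not part of the patch.

    import asyncio

    from dask_ctl.utils import run_sync


    async def add(a: int, b: int) -> int:
        # Any coroutine function works; run_sync forwards the extra positional
        # and keyword arguments to it and blocks until the result is ready.
        await asyncio.sleep(0)
        return a + b


    # Called from ordinary synchronous code this drives the coroutine to
    # completion and prints 3.
    print(run_sync(add, 1, 2))
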
From 83ab7f796ac7e2ca823e1643dd5337a017cd690f Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Thu, 19 May 2022 09:44:02 +0100
Subject: [PATCH 4/5] Update run_sync with suggestion from @graingert

---
 dask_ctl/utils.py | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/dask_ctl/utils.py b/dask_ctl/utils.py
index 9914792..b8f9a10 100644
--- a/dask_ctl/utils.py
+++ b/dask_ctl/utils.py
@@ -1,13 +1,22 @@
 import asyncio
+import concurrent.futures
 
 
 def run_sync(f, *args, **kwargs):
-    loop = asyncio.get_event_loop()
+    async def canary():
+        """An empty coroutine to check if we are inside an event loop"""
+        pass
+
     try:
-        return loop.run_until_complete(f(*args, **kwargs))
+        asyncio.run(canary())
     except RuntimeError:
-        f = asyncio.run_coroutine_threadsafe(f(*args, **kwargs), loop)
-        return f.result()
+        # An event loop is already running (e.g. in Jupyter), so fall back to a thread below
+        pass
+    else:
+        return asyncio.run(f(*args, **kwargs))
+
+    with concurrent.futures.ThreadPoolExecutor(1) as tpe:
+        return tpe.submit(asyncio.run, f(*args, **kwargs)).result()
 
 
 class _AsyncTimedIterator:


From db5a6917ff8ff3186678184154da7eb58a52c36b Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Thu, 19 May 2022 09:45:59 +0100
Subject: [PATCH 5/5] Remove unused async wrapper

---
 dask_ctl/cli.py | 36 ++++++++++++++++--------------------
 1 file changed, 16 insertions(+), 20 deletions(-)

diff --git a/dask_ctl/cli.py b/dask_ctl/cli.py
index 37fe7bf..4f5fe35 100644
--- a/dask_ctl/cli.py
+++ b/dask_ctl/cli.py
@@ -273,26 +273,22 @@ def list_discovery():
     methods registered on your system.
 
     """
-
-    async def _list_discovery():
-        table = Table(box=box.SIMPLE)
-        table.add_column("Name", style="cyan", no_wrap=True)
-        table.add_column("Package", justify="right", style="magenta")
-        table.add_column("Version", style="green")
-        table.add_column("Path", style="yellow")
-        table.add_column("Enabled", justify="right", style="green")
-
-        for method_name, method in list_discovery_methods().items():
-            table.add_row(
-                method_name,
-                method["package"],
-                method["version"],
-                method["path"],
-                ":heavy_check_mark:" if method["enabled"] else ":cross_mark:",
-            )
-        console.print(table)
-
-    run_sync(_list_discovery)
+    table = Table(box=box.SIMPLE)
+    table.add_column("Name", style="cyan", no_wrap=True)
+    table.add_column("Package", justify="right", style="magenta")
+    table.add_column("Version", style="green")
+    table.add_column("Path", style="yellow")
+    table.add_column("Enabled", justify="right", style="green")
+
+    for method_name, method in list_discovery_methods().items():
+        table.add_row(
+            method_name,
+            method["package"],
+            method["version"],
+            method["path"],
+            ":heavy_check_mark:" if method["enabled"] else ":cross_mark:",
+        )
+    console.print(table)
 
 
 @discovery.command(name="enable")
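
To tie the run_sync changes together, the sketch below shows the situation the canary coroutine in PATCH 4/5 guards against: calling the blocking helpers from code that is already inside a running event loop, such as a Jupyter cell. The greet coroutine and the top-level asyncio.run() call are illustrative assumptions rather than anything taken from the patches.

    import asyncio

    from dask_ctl.utils import run_sync


    async def greet(name: str) -> str:
        await asyncio.sleep(0)
        return f"hello {name}"


    async def caller():
        # An event loop is already running here, so asyncio.run() would raise
        # RuntimeError. run_sync detects this with its canary coroutine and
        # runs the work via asyncio.run() on a one-off worker thread instead.
        return run_sync(greet, "dask")


    print(run_sync(greet, "dask"))  # no running loop: plain asyncio.run() path
    print(asyncio.run(caller()))    # running loop: thread pool fallback path
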