Skip to content

Conversation

@jacobtomlinson
Copy link
Contributor

Dask Control has a selection of lifecycle functions that can be used within Python to manage
your Dask clusters. You can list clusters, get instances of an existing cluster, create new ones, scale and delete them.

Currently these methods are implemented as async closures within a sync function that starts the loop. This PR breaks things out so that users can choose to import the async method directly from the dask_ctl.asyncio submodule or choose to use the sync version from dask_ctl.

Sync

from dask_ctl import list_clusters

clusters = list_clusters()

Async

 from dask_ctl.asyncio import list_clusters

clusters = await list_clusters()

@jacobtomlinson
Copy link
Contributor Author

I really appreciate the review here @graingert.

Perhaps it is worth taking a step back and talking about goals. Much of this work is pushing my asyncio knowledge to the limit, and it's a moving target that I struggle to keep up with at the best of times.

The goal is for the Python API in dask-ctl to be implemented as async functions first. Most of which can be imported from dask_ctl.asyncio. But I also want sync implementations to exist, these will be the default and imported directly from dask_ctl.

To avoid code duplication I'm trying to just wrap the async functions in a sync function that starts a loop and runs them. I'm also conscious that folks may use the sync functions within async environments (interactively in IPython is the simplest example) so the sync methods need to also behave nicely when the loop is already running. To handle this I made a little dask_ctl.utils.run_sync function to try and work around this, but it feels hacky and as you say isn't thread-safe.

If you or other folks who know more about asyncio have suggestions on good ways to implement this I'd be really keen to hear and to increase my understanding.

@graingert
Copy link

graingert commented May 18, 2022

How about something like this?

async def canary():
    pass

try:
    asyncio.run(canary())
except RuntimeError:
    # event loop is already running and not running with jupyter (eg nest-asyncio)
    pass
else:
    return asyncio.run(async_fn())

with concurrent.futures.ThreadPoolExecutor(1) as tpe:
    return tpe.submit(asyncio.run, async_fn()).result()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants