Is it possible to run dask code within prefect tas...
# prefect-community
Is it possible to run dask code within prefect task? Is there a way to get submit work to the dask cluster that runs my flow (KubeCluster to LocalDaskCluster)? I am looking for a way to make my core functionality rely on dask and still be able to leverage the orchestration through prefect...
That's definitely possible. I realized this is a common question so I wrote this more extensive Discourse topic to explain it in more detail for you and for posterity: In the section Prefect 1.0, there is an explanation of two ways you can use Prefect and Dask together: 1. Dask executor + mapping 2. Dask cluster set up with Prefect resource manager There is also this comparison of LocalDaskExecutor vs. DaskExecutor linked there: And for your second question regarding
, check out this thread: LMK if you still have any questions about that, happy to help more.
🔥 1
👋 1
🧐 1
Also see this
👀 1
I was just coming here to reply that the links @Anna Geller sent are too high-level and do not give me the ability to "highjack" the dask cluster workers but then I saw the link @Kevin Kho shared with seems more like what I was trying to do. If I understand this correctly, it means I can start a flow on top of dask executor (my aim is to run it on k8s) and then submit additional tasks to the same cluster for execution. Am I correct? Do you know if there are implications to doing this? How would doing this (high-jacking the dask cluster) compares to using orion in order to do arbitrary tasks?
Why do you think resource manager is too high level? 🙂 you can think of it as a “normal” context manager. The issue that Kevin shared is showing the same approach but without the benefits of a Prefect’s resource manager:
Copy code
def compute_describe(df):
    with worker_client():
        return df.describe().compute()
Copy code
# use ResourceManager object
   with DaskCluster(
           ) as client:
       push_events = get_github_data(filenames)
       df = to_dataframe(push_events)
Ah, I guess I missed that 🙈
👍 1
What are the advantages to use the resource manager here? In the example Kevin sent, by using worker_client, I am getting the dask cluster that the flow is running on, right? With the resource manager example - does it not create an "standalone"/"additional" cluster (depending of the flow runs in dask executor)
It depends on how you configure the cluster - it could either create a new one or use a standalone cluster. Either way, resource manager helps you manage the resources so that if something fails in your code, the cleanup method is executed rather than leaving some resources unhandled (exiting without closing DB connections, HTTP sessions, cluster resources, etc.) - check out this docs and this blog post for more
If I understand correctly, assuming I use dask cluster over k8s (KubernetesRun? KubeCluster?), using resource manager similar to the example you sent, will not give access to the same ad-hoc cluster created for the job. In the example @Kevin Kho shared, assuming the task is already running on some dask cluster, it will give access to the same cluster the task is running on. Am I correct?
Maybe it would help if you build an example and check for yourself? the resource manager is an abstraction to clean up some resources. To be honest, if you already have a long running cluster on K8, you don't need that
I am trying to use
with Orion and I am getting an error - not sure whether it should even work. Here is what I tried:
Copy code
from dask.distributed import Client, worker_client
from prefect import task, flow
def my_task(*args, **kwargs):
    with worker_client() as client:
        fut = client.submit(add, 30)
        return fut.result()

@flow(name="My Example Flow",
def my_flow(*args, **kwargs):
    # run parallel tasks and subflows with Dask
    return my_task()
Then, running this (all in ipython), I am getting this output:
Copy code
In [37]: state = my_flow()
10:01:41.229 | INFO    | prefect.engine - Created flow run 'cherubic-bloodhound' for flow 'My Example Flow'
10:01:41.229 | INFO    | Flow run 'cherubic-bloodhound' - Using task runner 'DaskTaskRunner'
10:01:41.230 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
10:01:42.100 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at <>
10:01:42.174 | INFO    | Flow run 'cherubic-bloodhound' - Created task run 'my_task-ec9685da-0' for task 'my_task'
10:01:44.861 | ERROR   | Task run 'my_task-ec9685da-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/prefect/", line 703, in orchestrate_task_run
    result = await run_sync_in_worker_thread(task.fn, *args, **kwargs)
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/prefect/utilities/", line 51, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(, call, cancellable=True)
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/anyio/", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/anyio/_backends/", line 818, in run_sync_in_worker_thread
    return await future
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/anyio/_backends/", line 754, in run
    result =, *args)
  File "<ipython-input-35-75f9eaa5372f>", line 4, in my_task
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/", line 119, in __enter__
    return next(self.gen)
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/distributed/", line 55, in worker_client
    duration = time() - thread_state.start_time
AttributeError: '_thread._local' object has no attribute 'start_time'
/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/prefect/ UserWarning: No default storage has been set on the server. Using temporary local storage for results.
10:01:44.901 | ERROR   | Task run 'my_task-ec9685da-0' - Finished in state Failed('Task run encountered an exception.')
/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/prefect/ UserWarning: No default storage has been set on the server. Using temporary local storage for results.
10:01:45.354 | ERROR   | Flow run 'cherubic-bloodhound' - Finished in state Failed('1/1 states failed.')
Am I doing something wrong, or this is not possible in orion?
Thanks for the traceback. Will have to ask an engineer. I’m not sure if this is supported yet.
The plan is that Orion will support all the features of 1.0? I am specifically wondering about results / artifacts - the team I work with really liked it
Results I am sure. Artifact might be further out but let’s wait for someone else to chime in
We haven’t tested it, but there’s no reason it shouldn’t be working. Looks like it’s upset about retrieving the client in a worker thread, which we can disable when running sync tasks with the Dask task runner.
If you switch to an async task definition, I presume it’ll just work.
We’ll add unit tests for this and make sure it works eventually.
🙏 1
Generally I would like to keep away from async as much as possible as it kind of confused the team (mixed skill levels). I will give it a try though. Thanks
That’s fair 🙂 you can actually use async tasks from a sync flow without changing the call so it’d just be a change to the task function definition.
It may be naughty to run blocking code in the task though since I think it’ll run on Dask’s event loop.
Anyway, we’ll fix this in one of the next couple releases.