
Apostolos Papafragkakis

06/09/2022, 12:58 PM
Hello all, I recently started experimenting with Prefect for one of my projects. I would like to ask whether it is possible to use the asynchronous functionality provided by Dask; some of my tasks involve I/O and could therefore be executed asynchronously (I am using the latest release of Prefect 2.0).

Anna Geller

06/09/2022, 1:14 PM
I would be curious to hear more about your use case - what are you trying to do? Not to be against Dask, but Dask introduces a lot of overhead. In contrast, the ConcurrentTaskRunner (which is the default one in Prefect 2.0) is super lightweight and perfectly capable of solving the use cases you would otherwise have to delegate to Dask.

Apostolos Papafragkakis

06/09/2022, 1:16 PM
Hi Anna, thanks for the reply. I would like to offload the computation to a cluster, hence the need for a distributed framework like Dask. I tried setting the asynchronous keyword to enable Dask's async mode, yet to no avail; I got a response that Prefect manages that.

Anna Geller

06/09/2022, 1:17 PM
I would encourage you to give it a try with ConcurrentTaskRunner first, then for benchmarking and comparison, perhaps try running DaskTaskRunner in various configurations using threads, e.g.
@flow(task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=4, processes=False)))
Are you processing massive-scale I/O operations? I will ask our expert on that to check and help you better here, but it would be great to find out about your use case first.

Apostolos Papafragkakis

06/09/2022, 1:19 PM
I will give it a try, however, it is a strong requirement that the work is distributed across multiple nodes. The involved tasks are very computationally expensive and hence have to be split to ensure efficient resource usage.

Anna Geller

06/09/2022, 1:20 PM
how do you configure your Dask cluster/Dask operation to leverage this async functionality when you don't use Prefect?

Apostolos Papafragkakis

06/09/2022, 1:20 PM
I use asyncio/uvloop 🙂

Anna Geller

06/09/2022, 1:22 PM
Can you share some code you have tried so far? Are you at a point where you have something working with plain Dask and are trying to translate it to the task runner logic? Or have you only used asyncio/uvloop on a single machine and are now trying to distribute the work across a cluster?
I'm trying to gather as much information as possible about your use case and the problem you are facing before passing your request to someone who can help better than I can.

Apostolos Papafragkakis

06/09/2022, 1:23 PM
for now I have tried asyncio/uvloop on a single machine to see how it scales

Anna Geller

06/09/2022, 1:28 PM
Do you have some code you can share? I would pass it to Michael to have a look at whether this can run on a remote Dask cluster with DaskTaskRunner.

Apostolos Papafragkakis

06/09/2022, 1:50 PM
I have no actual code to share, just experimenting with mock-up code for now.

Anna Geller

06/09/2022, 2:04 PM
Can you share this mock-up code then? I asked Michael, and maybe he can respond later; running async operations across nodes is beyond my understanding of what async is for.

Zanie

06/09/2022, 2:55 PM
When we submit task orchestration to Dask, we’re always submitting an async function. Dask runs these on the worker’s event loop. If your task is async, it will be executed on the same event loop. If it is sync, we will create a thread to run it in to avoid blocking the worker’s event loop.
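As a rough illustration of the behaviour described above, here is a minimal standard-library sketch. It is an analogy only, not Prefect's or Dask's actual code, and it assumes Python 3.9+ for `asyncio.to_thread`. The dispatcher `run_on_loop` and both task functions are hypothetical names: coroutine functions are awaited directly on the running event loop, while plain functions are handed to a thread so they cannot block it.

```python
import asyncio
import inspect
import time


async def run_on_loop(fn, *args):
    """Hypothetical dispatcher: await coroutines, push sync functions to a thread."""
    if inspect.iscoroutinefunction(fn):
        # Async task: runs on the same event loop as the caller.
        return await fn(*args)
    # Sync task: runs in a worker thread so the event loop stays free.
    return await asyncio.to_thread(fn, *args)


async def async_task(idx):
    await asyncio.sleep(0.3)
    return f"async-{idx}"


def sync_task(idx):
    time.sleep(0.3)  # would block the event loop if called on it directly
    return f"sync-{idx}"


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        run_on_loop(async_task, 1),
        run_on_loop(sync_task, 2),
    )
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```

Because the blocking `time.sleep` runs in a thread, the two tasks overlap and the total time stays close to a single sleep rather than the sum of both.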

Apostolos Papafragkakis

06/10/2022, 9:22 AM
Hi Michael, many thanks for your reply; do I have to use gather inside the flow to ensure concurrency, or just plain await each task? I still cannot make the Dask workers operate asynchronously.
I will try to post a small example later
This finishes in ~25 sec as expected (using the default runner):

import asyncio
from prefect import flow, task

@task()
async def mockup(idx):
    print(f"Doing some work {idx=}")
    await asyncio.sleep(25)
    print(f"Finished work {idx=}")

@flow()
async def mockup_flow():
    print("Starting flow")
    futures = [mockup(i) for i in range(20)]
    await asyncio.gather(*futures)

if __name__ == "__main__":
    asyncio.run(mockup_flow())

However, using Dask, the same operations take ~125 sec to complete, i.e. 20 / num_workers * 25, where num_workers = 4:

import asyncio
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task()
async def mockup(idx):
    print(f"Doing some work {idx=}")
    await asyncio.sleep(25)
    print(f"Finished work {idx=}")

@flow(task_runner=DaskTaskRunner())
async def mockup_flow():
    print("Starting flow")
    futures = [mockup(i) for i in range(20)]
    await asyncio.gather(*futures)

if __name__ == "__main__":
    asyncio.run(mockup_flow())

Anna Geller

06/10/2022, 12:07 PM
I think this is Dask's overhead but Michael may have a better response
also cc @Kevin Kho since he knows more about Dask in general

Apostolos Papafragkakis

06/10/2022, 12:10 PM
@Anna Geller, thanks for the reply; I doubt it's just overhead, it seems that there is no concurrency at all within a worker.

Zanie

06/10/2022, 3:48 PM
You should await the call to mockup, as that is awaiting submission to the worker, not the result of your task.
I’m not sure why it wouldn’t run concurrently on the Dask workers as written though
If you use synchronous tasks with a sleep does Dask run them concurrently?
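
To make the difference between awaiting submission and awaiting the result concrete, here is a plain-asyncio analogy. `submit` is a hypothetical helper, not a Prefect API: awaiting it only returns a handle (an `asyncio.Task`), and the return value of the work becomes available only once the handle itself is awaited.

```python
import asyncio


async def mockup(idx):
    await asyncio.sleep(0.1)
    return idx * 2


async def submit(coro):
    """Hypothetical submission step: schedule the work and return a handle."""
    return asyncio.create_task(coro)


async def main():
    # Awaiting submission yields handles, not results.
    handles = await asyncio.gather(*(submit(mockup(i)) for i in range(3)))
    # Awaiting the handles yields the values returned by the tasks.
    results = await asyncio.gather(*handles)
    return handles, results


if __name__ == "__main__":
    handles, results = asyncio.run(main())
    print(type(handles[0]).__name__, results)
```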

Apostolos Papafragkakis

06/14/2022, 7:04 AM
Hi @Zanie, thanks for your reply. It does not work asynchronously no matter what I try. Should we open an issue on GitHub?

Anna Geller

06/14/2022, 12:02 PM
@Apostolos Papafragkakis it definitely doesn't hurt to create a GitHub issue - if anything, writing it all down in a structured way may even help you identify the root cause of the problem. If possible, please include all the steps you have taken so far to fix the issue, and try to pin down what the actual problem is.