# prefect-community
a
Hello all, I recently started experimenting with Prefect for one of my projects. I would like to ask whether it is possible to use the asynchronous functionality provided by Dask; some of my tasks are I/O-bound and could therefore be executed asynchronously (I am using the latest release of Prefect 2.0).
a
I would be curious to hear more about your use case - what are you trying to do? Not to be against Dask, but Dask introduces a lot of overhead. In contrast, the ConcurrentTaskRunner (which is the default one in Prefect 2.0) is super lightweight and perfectly capable of solving the use cases you would otherwise have to delegate to Dask.
a
Hi Anna, thanks for the reply. I would like to offload the computation to a cluster, hence the need for a distributed framework like Dask. I tried setting the asynchronous keyword to enable Dask's async mode, yet to no avail; I got a response saying that Prefect manages that.
a
I would encourage you to give it a try with ConcurrentTaskRunner first, then for benchmarking and comparison, perhaps try running DaskTaskRunner in various configurations using threads, e.g.
@flow(task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=4, processes=False)))
Are you processing massive-scale I/O operations? I will ask our expert on that to check and help you better here, but it would be great to find out about your use case first.
a
I will give it a try, however, it is a strong requirement that the work is distributed across multiple nodes. The involved tasks are very computationally expensive and hence have to be split to ensure efficient resource usage.
a
how do you configure your Dask cluster/Dask operation to leverage this async functionality when you don't use Prefect?
a
I use asyncio/uvloop 🙂
a
Can you share some code you tried so far? Are you at a point where you have something working with plain Dask and you try to translate it to the task runner logic? or have you purely used asyncio/uvloop on a single machine and you try to distribute the work across a cluster now?
trying to gather as much information about your use case and the problem you are facing before communicating your request to someone who can help better than I can
a
for now I have tried asyncio/uvloop on a single machine to see how it scales
a
Do you have some code you can share? I would pass it to Michael to have a look at whether this can run on a remote Dask cluster with DaskTaskRunner.
a
I have no actual code to share, just experimenting with mock-up code for now.
a
Can you share this mockup code then? I asked Michael; maybe he can respond later. Running async operations across nodes is beyond my understanding of what async is for.
z
When we submit task orchestration to Dask, we’re always submitting an async function. Dask runs these on the worker’s event loop. If your task is async, it will be executed on the same event loop. If it is sync, we will create a thread to run it in to avoid blocking the worker’s event loop.
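The dispatch described above can be illustrated with plain asyncio. This is only a sketch of the general pattern, not Prefect's or Dask's actual implementation; `run_on_loop`, `async_task`, and `sync_task` are hypothetical names, and the 0.2 s sleeps are arbitrary.

```python
import asyncio
import inspect
import time


async def run_on_loop(fn, *args):
    """Hypothetical helper: run fn without blocking the event loop."""
    if inspect.iscoroutinefunction(fn):
        # Async callables are awaited directly on the running loop.
        return await fn(*args)
    # Sync callables are moved to a thread so the loop stays responsive.
    return await asyncio.to_thread(fn, *args)


async def async_task(idx):
    await asyncio.sleep(0.2)  # yields control to the loop while waiting
    return idx


def sync_task(idx):
    time.sleep(0.2)  # blocking call; would stall the loop if run inline
    return idx


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        *(run_on_loop(async_task, i) for i in range(3)),
        *(run_on_loop(sync_task, i) for i in range(3)),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

If the six calls overlap as intended, the elapsed time stays close to a single 0.2 s sleep rather than the 1.2 s a fully serial run would take.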
🙏 2
a
Hi Michael, many thanks for your reply; do I have to use gather inside the flow to ensure concurrency, or just plain await each task? I still cannot make Dask workers operate asynchronously.
I will try to post a small example later
this finishes in ~25 sec as expected (using the default runner):
import asyncio
from prefect import flow, task

@task()
async def mockup(idx):
    print(f"Doing some work {idx=}")
    await asyncio.sleep(25)
    print(f"Finished work {idx=}")

@flow()
async def mockup_flow():
    print("Starting flow")
    futures = [mockup(i) for i in range(20)]
    await asyncio.gather(*futures)

if __name__ == "__main__":
    asyncio.run(mockup_flow())
However, using Dask, the same operations take ~125 sec to complete, i.e. 20 / num_workers * 25 with num_workers=4:
import asyncio
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task()
async def mockup(idx):
    print(f"Doing some work {idx=}")
    await asyncio.sleep(25)
    print(f"Finished work {idx=}")

@flow(task_runner=DaskTaskRunner())
async def mockup_flow():
    print("Starting flow")
    futures = [mockup(i) for i in range(20)]
    await asyncio.gather(*futures)

if __name__ == "__main__":
    asyncio.run(mockup_flow())
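The two timings reported here (~25 sec and ~125 sec) are consistent with a simple batching model, sketched below. The model itself is an assumption for illustration: `expected_runtime` and `concurrent_slots` are hypothetical names, and the value of 4 workers is the figure quoted in the message above.

```python
import math


def expected_runtime(n_tasks, task_seconds, concurrent_slots):
    """Wall time if at most concurrent_slots tasks can overlap at once."""
    batches = math.ceil(n_tasks / concurrent_slots)
    return batches * task_seconds


# All 20 sleeps overlap on one event loop (default runner behaviour as reported).
all_overlap = expected_runtime(20, 25, concurrent_slots=20)

# Each of 4 workers handles one task at a time (Dask behaviour as reported).
one_per_worker = expected_runtime(20, 25, concurrent_slots=4)

print(all_overlap, one_per_worker)
```

Under this model, a ~125 sec result would suggest that each worker runs a single task at a time, which is what the thread goes on to discuss.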
a
I think this is Dask's overhead but Michael may have a better response
also cc @Kevin Kho since he knows more about Dask in general
a
@Anna Geller, thanks for the reply; I doubt it's just overhead, it seems that there is no concurrency at all within a worker.
z
You should await the call to mockup as that is awaiting submission to the worker, not the result of your task.
🙏 1
I’m not sure why it wouldn’t run concurrently on the Dask workers as written though
If you use synchronous tasks with a sleep does Dask run them concurrently?
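The distinction between awaiting submission and awaiting the result can be shown with a plain asyncio analogy. This is not Prefect's API: `submit` and `work` are hypothetical stand-ins, and `asyncio.Task` plays the role of the future returned by a task call.

```python
import asyncio


async def submit(idx):
    """Hypothetical stand-in for submitting work: returns a future, not a result."""

    async def work():
        await asyncio.sleep(0.1)
        return idx * 2

    # Scheduling returns immediately; the Task completes later.
    return asyncio.create_task(work())


async def main():
    # First await: wait for submission and collect future-like objects.
    futures = await asyncio.gather(*(submit(i) for i in range(3)))
    submitted_only = all(isinstance(f, asyncio.Task) for f in futures)

    # Second await: wait for the actual results.
    results = await asyncio.gather(*futures)
    return submitted_only, results


submitted_only, results = asyncio.run(main())
print(submitted_only, results)
```

In this analogy, gathering only the calls to `submit` would return as soon as the work is scheduled; the second gather is what waits for the values.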
a
Hi @Zanie, thanks for your reply. It does not work asynchronously no matter what I try. Should we open an issue on GitHub?
a
@Apostolos Papafragkakis it definitely doesn't hurt to create a GitHub issue - if anything, writing it all down in a structured way may even help you identify the root cause of the problem. If possible, please try to include all the steps you have taken so far to fix the issue and try to identify what's the actual problem here