Apostolos Papafragkakis

    3 months ago
    Hello all, I recently started experimenting with Prefect for one of my projects. I would like to ask whether it is possible to use the asynchronous functionality provided by Dask; some of my tasks are I/O-bound, so they could be executed asynchronously (I am using the latest release of Prefect 2.0).
    Anna Geller

    3 months ago
    I would be curious to hear more about your use case: what are you trying to do? Nothing against Dask, but Dask introduces a lot of overhead. In contrast, the ConcurrentTaskRunner (which is the default one in Prefect 2.0) is super lightweight and perfectly capable of solving the use cases you would otherwise have to delegate to Dask.
    Apostolos Papafragkakis

    3 months ago
    Hi Anna, thanks for the reply. I would like to offload the computation to a cluster, hence the need for a distributed framework like Dask. I tried setting the asynchronous keyword to enable Dask's async mode, but to no avail; I got a response that Prefect manages that.
    Anna Geller

    3 months ago
    I would encourage you to give it a try with ConcurrentTaskRunner first; then, for benchmarking and comparison, perhaps try running DaskTaskRunner in various configurations using threads, e.g.:
    @flow(task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=4, processes=False)))
    Are you processing massive-scale I/O operations? I will ask our expert on that to check and help you better here, but it would be great to find out about your use case first.
    Apostolos Papafragkakis

    3 months ago
    I will give it a try; however, it is a strong requirement that the work is distributed across multiple nodes. The tasks involved are very computationally expensive and hence have to be split up to ensure efficient resource usage.
    Anna Geller

    3 months ago
    How do you configure your Dask cluster or Dask operations to leverage this async functionality when you don't use Prefect?
    Apostolos Papafragkakis

    3 months ago
    I use asyncio/uvloop 🙂
    Anna Geller

    3 months ago
    Can you share some code you have tried so far? Are you at a point where you have something working with plain Dask and are trying to translate it to the task runner logic? Or have you only used asyncio/uvloop on a single machine and are now trying to distribute the work across a cluster?
    I am trying to gather as much information as possible about your use case and the problem you are facing before passing your request to someone who can help better than I can.
    Apostolos Papafragkakis

    3 months ago
    For now I have tried asyncio/uvloop on a single machine to see how it scales.
    Anna Geller

    3 months ago
    Do you have some code you can share? I would pass it to Michael so he can check whether this can run on a remote Dask cluster with DaskTaskRunner.
    Apostolos Papafragkakis

    3 months ago
    I have no actual code to share, just experimenting with mock-up code for now.
    Anna Geller

    3 months ago
    Can you share this mock-up code then? I asked Michael; maybe he can respond later. Running async operations across nodes is beyond my understanding of what async is for.
    Michael Adkins

    3 months ago
    When we submit task orchestration to Dask, we’re always submitting an async function. Dask runs these on the worker’s event loop. If your task is async, it will be executed on the same event loop. If it is sync, we will create a thread to run it in to avoid blocking the worker’s event loop.
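The dispatch pattern described above can be sketched with the standard library alone. This is a minimal illustration, not Prefect's or Dask's actual implementation: `async_task`, `sync_task`, and `run_task` are hypothetical names, and `asyncio.to_thread` assumes Python 3.9 or later.

```python
import asyncio
import threading
import time


async def async_task(idx):
    # Async work yields to the event loop while it waits.
    await asyncio.sleep(0.1)
    return ("async", idx, threading.current_thread().name)


def sync_task(idx):
    # Blocking work would stall the event loop if it were called directly.
    time.sleep(0.1)
    return ("sync", idx, threading.current_thread().name)


async def run_task(fn, idx):
    # Hypothetical dispatcher: await coroutine functions in place,
    # push plain callables to a worker thread.
    if asyncio.iscoroutinefunction(fn):
        return await fn(idx)
    return await asyncio.to_thread(fn, idx)


async def main():
    # Both tasks are in flight at the same time on one event loop.
    return await asyncio.gather(
        run_task(async_task, 0),
        run_task(sync_task, 1),
    )


if __name__ == "__main__":
    for kind, idx, thread_name in asyncio.run(main()):
        print(kind, idx, thread_name)
```

The async task reports the thread that runs the event loop, while the sync task reports a separate worker thread, so neither blocks the other.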
    Apostolos Papafragkakis

    3 months ago
    Hi Michael, many thanks for your reply; do I have to use gather inside the flow to ensure concurrency, or just plain await each task? I still cannot make Dask workers operate asynchronously.
    I will try to post a small example later.
    This finishes in ~25 sec as expected (using the default runner):
    import asyncio
    from prefect import flow, task

    @task()
    async def mockup(idx):
        print(f"Doing some work {idx=}")
        await asyncio.sleep(25)
        print(f"Finished work {idx=}")

    @flow()
    async def mockup_flow():
        print("Starting flow")
        futures = [mockup(i) for i in range(20)]
        await asyncio.gather(*futures)

    if __name__ == "__main__":
        asyncio.run(mockup_flow())
    However, using Dask, the same operations take ~125 sec to complete, i.e. 20 / num_workers * 25, where num_workers = 4:
    import asyncio
    from prefect import flow, task
    from prefect.task_runners import DaskTaskRunner

    @task()
    async def mockup(idx):
        print(f"Doing some work {idx=}")
        await asyncio.sleep(25)
        print(f"Finished work {idx=}")

    @flow(task_runner=DaskTaskRunner())
    async def mockup_flow():
        print("Starting flow")
        futures = [mockup(i) for i in range(20)]
        await asyncio.gather(*futures)

    if __name__ == "__main__":
        asyncio.run(mockup_flow())
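The two timings above match what you would expect if each worker handled only one task at a time: 20 tasks over 4 slots is 5 rounds of 25 sec. A minimal sketch of that arithmetic in plain asyncio follows; it does not use Prefect or Dask and is not a diagnosis of the cause. The names `expected_seconds`, `work`, and `timed_run` are hypothetical, and a semaphore stands in for the assumed one-task-per-worker limit.

```python
import asyncio
import math
import time


def expected_seconds(n_tasks, slots, task_seconds):
    # With at most `slots` tasks in flight, work completes in
    # ceil(n_tasks / slots) rounds of `task_seconds` each.
    return math.ceil(n_tasks / slots) * task_seconds


async def work(limiter, task_seconds):
    # The semaphore caps how many sleeps overlap.
    async with limiter:
        await asyncio.sleep(task_seconds)


async def timed_run(n_tasks, slots, task_seconds):
    limiter = asyncio.Semaphore(slots)
    start = time.perf_counter()
    await asyncio.gather(*(work(limiter, task_seconds) for _ in range(n_tasks)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(expected_seconds(20, 20, 25))  # fully concurrent case
    print(expected_seconds(20, 4, 25))   # four slots, one task each
    # Scaled-down measurement: 0.05 sec per task instead of 25 sec.
    print(asyncio.run(timed_run(20, 20, 0.05)))
    print(asyncio.run(timed_run(20, 4, 0.05)))
```

If the async tasks were interleaved on each worker's event loop, the Dask run should land near the first figure rather than the second.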
    Anna Geller

    3 months ago
    I think this is Dask's overhead, but Michael may have a better response.
    Also cc @Kevin Kho, since he knows more about Dask in general.
    Apostolos Papafragkakis

    3 months ago
    @Anna Geller, thanks for the reply; I doubt it's just overhead, it seems that there is no concurrency at all within a worker.
    Michael Adkins

    3 months ago
    You should await the call to mockup, as that is awaiting submission to the worker, not the result of your task.
    I’m not sure why it wouldn’t run concurrently on the Dask workers as written, though.
    If you use synchronous tasks with a sleep, does Dask run them concurrently?
    Apostolos Papafragkakis

    3 months ago
    Hi @Michael Adkins, thanks for your reply. It does not work asynchronously no matter what I try. Should we open an issue on GitHub?
    Anna Geller

    3 months ago
    @Apostolos Papafragkakis it definitely doesn't hurt to create a GitHub issue - if anything, writing it all down in a structured way may even help you identify the root cause of the problem. If possible, please include all the steps you have taken so far to fix the issue, and try to pin down what the actual problem is.