
Chris Goddard

04/26/2023, 9:18 PM
Has anyone developed a good pattern for concurrent subflow execution that doesn’t require dynamically registering a bunch of different subflows with unique names? I’m using the approach of a function that returns a flow - giving it a unique name each time - but it means that I end up with potentially hundreds of registered subflows, some of which may never need to be run again.

Zanie

04/26/2023, 9:29 PM
Why do you need to give them unique names? For `run_deployment`?

Chris Goddard

04/26/2023, 9:35 PM
So this is related to https://discourse.prefect.io/t/how-can-i-run-multiple-subflows-or-child-flows-in-parallel/96/13. I'm using something like the `build_subflow` given there rather than a deployment. I attempted a deployment-based solution (still running locally), but performance was extremely slow. I'm still wrapping my head around deployments, but I think it was copying files on every subflow invocation. Ideally this would mostly live in a self-contained script that defines the flows and subflows and then triggers the main flow, which would spawn parameterized runs of the subflows (which could in turn spawn additional subflow runs). Right now I only have a single subflow, which I'm trying to invoke multiple times concurrently.

Zanie

04/26/2023, 9:47 PM
And you're unwilling or unable to use `async` flows?

Chris Goddard

04/26/2023, 9:47 PM
they’re all async flows - but you can’t run two async flows with the same name concurrently

Zanie

04/26/2023, 9:47 PM
Sure you can 🙂
Can you show me a simple example?

Chris Goddard

04/26/2023, 9:48 PM
sure, give me a min

Zanie

04/26/2023, 9:50 PM
Here's, like... my dumbest example:
```python
import asyncio
from copy import deepcopy

from prefect import flow, task


@task
async def test_task():
    return 1


@flow(log_prints=True)
async def my_child(i):
    assert await test_task() == 1
    print(i)


@flow
async def my_parent():
    # deepcopy gives each concurrent child run its own Flow object,
    # so the same flow (same name) can run ten times at once
    coros = [deepcopy(my_child)(i) for i in range(10)]
    await asyncio.gather(*coros)


if __name__ == "__main__":
    asyncio.run(my_parent())
```

Chris Goddard

04/26/2023, 9:53 PM
Ah, I remember how I got on this path. I had used `deepcopy` before, but I wanted the subflow to use the Dask task runner with a local cluster, and as I recall `deepcopy` wasn't working with that task runner attached.
I'm trying to find my previous version to get the example.
I essentially want a local Dask cluster with n single-threaded workers, each running an async flow.

Zanie

04/26/2023, 10:01 PM
You want your subflows to be submitted to Dask?

Chris Goddard

04/26/2023, 10:03 PM
I believe so - I guess the way I’ve been doing it, the subflows are executed in the main thread and then the tasks within each subflow are submitted to dask
I may be approaching this the wrong way though

Zanie

04/26/2023, 10:08 PM
That’s totally reasonable
So... in https://github.com/PrefectHQ/prefect/pull/9342 I'm removing the need to perform a copy,
but otherwise you should totally be able to just reuse the Dask cluster for each child

Chris Goddard

04/26/2023, 10:10 PM
Yeah, I think what I may have been doing wrong was how I was spinning up the Dask cluster within the same script. I can't remember the exact error I was seeing now.
solving the duplication issue will be great regardless though!

Zanie

04/26/2023, 10:20 PM
```python
import asyncio

from prefect import flow, task
from prefect.context import get_run_context
from prefect_dask import DaskTaskRunner


@task
async def test_task():
    return 1


@flow(log_prints=True)
async def my_child(i):
    assert await test_task() == 1
    print(i)


@flow(task_runner=DaskTaskRunner())  # starts a temporary local Dask cluster
async def my_parent():
    # Point the children at the parent's cluster instead of starting new ones.
    # Note: _client is a private attribute of DaskTaskRunner.
    child_task_runner = DaskTaskRunner(
        address=get_run_context().task_runner._client.scheduler_info()["address"]
    )
    coros = [my_child.with_options(task_runner=child_task_runner)(i) for i in range(5)]
    await asyncio.gather(*coros)


if __name__ == "__main__":
    asyncio.run(my_parent())
```
That's the approach if you want to share a dynamically created Dask cluster with the child flows.

Chris Goddard

04/26/2023, 10:21 PM
Ohh, that's exactly what I needed: `get_run_context().task_runner._client.scheduler_info()`
The way I was sharing the scheduler address after creating the cluster in the main script was what caused the issues.

Zanie

04/26/2023, 10:22 PM
👍 you'll probably want to create the child `DaskTaskRunner` inside the loop (one per child) rather than beforehand until my duplication fix goes out
c

Chris Goddard

04/26/2023, 10:23 PM
awesome - that’s really helpful - thanks
yup - that works perfectly - thank you!

Zanie

04/26/2023, 10:47 PM
👍 wonderful