# ask-community
c
Has anyone developed a good pattern for concurrent subflow execution that doesn’t require dynamically registering a bunch of different subflows with unique names? I’m using the approach of a function that returns a flow - giving it a unique name each time - but it means that I end up with potentially hundreds of registered subflows, some of which may never need to be run again.
z
Why do you need to give them unique names? For `run_deployment`?
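(For reference, a minimal sketch of the `run_deployment` approach z is hinting at - the deployment name `"my-child/default"` is a hypothetical placeholder and assumes a deployment of the child flow has already been created:)

```python
import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow
async def my_parent():
    # run_deployment triggers a pre-registered deployment by
    # "<flow-name>/<deployment-name>"; each call creates a separate
    # flow run, so concurrent runs of the same subflow are fine.
    # "my-child/default" is a hypothetical deployment name.
    coros = [
        run_deployment(name="my-child/default", parameters={"i": i})
        for i in range(10)
    ]
    await asyncio.gather(*coros)
```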
c
so this is related to https://discourse.prefect.io/t/how-can-i-run-multiple-subflows-or-child-flows-in-parallel/96/13 - I’m using something like the `build_subflow` function given there rather than a deployment. I attempted a deployment-based solution (still running locally), but performance was extremely slow - I’m still wrapping my head around deployments, but I think it was copying files on every subflow invocation. Ideally I’d love this to live mostly in a self-contained script that defines the flows/subflows and then triggers the main flow, which would spawn parameterized runs of the subflows (which could in turn spawn additional subflow runs). Right now I only have a single subflow, which I’m trying to invoke multiple times concurrently.
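(The factory pattern c describes, roughly reconstructed from the linked thread - the names here are illustrative, not the thread’s exact code:)

```python
from prefect import flow


def build_subflow(name: str):
    # Hypothetical reconstruction of the factory pattern from the
    # linked Discourse thread: each call returns a *new* flow object
    # with a unique name, so concurrent runs don't collide - at the
    # cost of registering a new flow per unique name.
    @flow(name=f"subflow-{name}")
    async def subflow(x):
        print(f"{name} received {x}")

    return subflow
```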
z
And you’re unwilling or unable to use `async` flows?
c
they’re all async flows - but you can’t run two async flows with the same name concurrently
z
Sure you can 🙂
Can you show me a simple example?
c
sure, give me a min
z
Here’s like... my dumbest example:
```python
from prefect import flow, task
from copy import deepcopy
import asyncio


@task
async def test_task():
    return 1


@flow(log_prints=True)
async def my_child(i):
    assert await test_task() == 1
    print(i)


@flow
async def my_parent():
    # deepcopy gives each concurrent run its own flow object, so ten
    # runs of the same flow can be gathered at once
    coros = [deepcopy(my_child)(i) for i in range(10)]
    await asyncio.gather(*coros)


asyncio.run(my_parent())
```
c
ah - I remember how I got on this path. I had used deepcopy before - but I wanted the subflow to leverage the Dask task runner with a local cluster, and as I recall deepcopy wasn’t working with that runner attached
I’m trying to find my previous version to get the example
I essentially want a local Dask cluster with n single-threaded workers, each running an async flow
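(That topology can be expressed through `prefect_dask` directly - a minimal sketch, where the worker counts are illustrative:)

```python
from prefect import flow
from prefect_dask import DaskTaskRunner


# DaskTaskRunner passes cluster_kwargs through to the underlying
# distributed.LocalCluster; n_workers/threads_per_worker here are
# illustrative values for "n single-threaded workers"
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}
    )
)
async def my_flow():
    ...
```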
z
You want your subflows to be submitted to Dask?
c
I believe so - I guess the way I’ve been doing it, the subflows are executed in the main thread and then the tasks within each subflow are submitted to Dask
I may be approaching this the wrong way though
z
That’s totally reasonable
So... at https://github.com/PrefectHQ/prefect/pull/9342 I’m solving the need to perform a copy
but otherwise you should totally be able to just reuse the Dask cluster for each child
c
yeah, I think what I may have been doing wrong was how I was spinning up the Dask cluster within the same script. I can’t remember the exact error I was seeing now.
solving the duplication issue will be great regardless though!
z
```python
from prefect import flow, task
import asyncio
from prefect_dask import DaskTaskRunner
from prefect.context import get_run_context


@task
async def test_task():
    return 1


@flow(log_prints=True)
async def my_child(i):
    assert await test_task() == 1
    print(i)


@flow(task_runner=DaskTaskRunner())
async def my_parent():
    # pull the scheduler address off the parent's running task runner
    # and hand it to the children, so every child connects to the same
    # dynamic Dask cluster instead of spinning up its own
    child_task_runner = DaskTaskRunner(
        address=get_run_context().task_runner._client.scheduler_info()["address"]
    )
    coros = [my_child.with_options(task_runner=child_task_runner)(i) for i in range(5)]
    await asyncio.gather(*coros)


if __name__ == "__main__":
    asyncio.run(my_parent())
```
That’s if you want to share a dynamic Dask cluster.
c
ohh - that’s exactly what I needed
```python
get_run_context().task_runner._client.scheduler_info()
```
It was how I was sharing the scheduler address after creating it in the main script that was causing issues
z
👍 you’ll probably want to instantiate that task runner within the loop rather than beforehand, until my duplication fix goes out
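(A sketch of that workaround, reusing `my_child` and the imports from the example above:)

```python
@flow(task_runner=DaskTaskRunner())
async def my_parent():
    # until the duplication fix lands, build a fresh DaskTaskRunner per
    # child inside the loop - each one points at the same scheduler
    scheduler_address = get_run_context().task_runner._client.scheduler_info()["address"]
    coros = [
        my_child.with_options(task_runner=DaskTaskRunner(address=scheduler_address))(i)
        for i in range(5)
    ]
    await asyncio.gather(*coros)
```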
c
awesome - that’s really helpful - thanks
yup - that works perfectly - thank you!
z
👍 wonderful