# ask-community
j
Hi folks! Question…is there a way to use the same task runner when creating multiple of the same subflow? I’m also curious if this is a possible performance improvement…I’m guessing there is at least some cost to use a new task runner, so if I can use one that was already created, that would be great
Kind of related to the entire set of issues related to this one https://github.com/PrefectHQ/prefect/issues/7319
t
Hi Jai, you can get subflows to behave like tasks by wrapping the run_deployment call in a task, like so: https://github.com/PrefectHQ/prefect-recipes/blob/542e340fa47258b2931479be257ee9d71a4621ae/flows-advanced/parent-orchestrator/pokemon_weight.py#L59
from prefect import task
from prefect.deployments import run_deployment

@task
def subflow_wrapping_task():
    run_deployment(  # returns a FlowRun object
        name="process-pokemon-batch/worker",
        parameters=dict(pokemon_names=pokemon_names),
    )
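The shape of that pattern — wrap a blocking trigger call so the orchestrator can fan it out concurrently — can be sketched with plain asyncio. Here `trigger_run` is a hypothetical stand-in for `run_deployment` (not Prefect code), included only to show the fan-out structure:

```python
import asyncio
import time

def trigger_run(name: str) -> dict:
    """Hypothetical stand-in for run_deployment: blocks briefly, then
    returns a dict playing the role of a FlowRun object."""
    time.sleep(0.01)  # simulate the API round trip
    return {"name": name, "state": "COMPLETED"}

async def fan_out(names: list[str]) -> list[dict]:
    # Each blocking trigger runs in a worker thread, so the whole
    # batch is triggered concurrently rather than one at a time.
    return await asyncio.gather(
        *(asyncio.to_thread(trigger_run, n) for n in names)
    )

runs = asyncio.run(fan_out([f"batch-{i}" for i in range(5)]))
```

In the real recipe, the task wrapper gives Prefect visibility into each triggered run; the sketch above only mirrors the concurrency shape.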
j
Hey @Taylor Curran, thanks for the info. The thing is, I want to run my subflows on the same infrastructure. I'm already using separate infrastructure to run many copies of the parent flow in parallel, so the subflows don't need their own infra to run on. What I've been doing so far is:
import asyncio

from prefect import flow
from prefect.utilities.asyncutils import run_sync_in_worker_thread


@flow
def my_subflow():
    ...


@flow
async def my_main_flow():
    subflows = [
        run_sync_in_worker_thread(my_subflow)
        for _ in range(100)
    ]

    return await asyncio.gather(*subflows)


if __name__ == "__main__":
    asyncio.run(my_main_flow())
but I run into:
RuntimeError: The task runner is already started!
which I can get around by `deepcopy`ing the `my_subflow` callable:
subflows = [
    run_sync_in_worker_thread(deepcopy(my_subflow))
    for _ in range(100)
]
but that comes at the cost of (I believe) starting many, many task runners, which seems like unnecessary overhead when I could just reuse a single one
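The cost being described here can be sketched with plain Python (an analogy for the runner lifecycle, not Prefect internals): compare spinning up a fresh thread-pool "runner" for every submission, as the `deepcopy` workaround effectively does, against reusing one shared pool:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def work(i: int) -> int:
    # Stand-in for a subflow's actual work.
    return i * i

async def fresh_runner_each_time(n: int) -> list[int]:
    # Analogy for the deepcopy workaround: start up and tear down
    # a new "runner" for every single subflow submission.
    loop = asyncio.get_running_loop()
    results = []
    for i in range(n):
        with ThreadPoolExecutor(max_workers=1) as pool:
            results.append(await loop.run_in_executor(pool, work, i))
    return results

async def shared_runner(n: int) -> list[int]:
    # Analogy for reusing one task runner: a single pool, started
    # once, serves every submission concurrently.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=8) as pool:
        return list(await asyncio.gather(
            *(loop.run_in_executor(pool, work, i) for i in range(n))
        ))
```

Both produce the same results; the shared-pool version only pays the startup/teardown cost once, which is the reuse Jai is asking whether Prefect's task runners support.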
v
@Jai P I tried using `deepcopy` but the issue still persists.