Jai P
03/01/2023, 9:18 PMJai P
03/01/2023, 9:19 PMTaylor Curran
03/01/2023, 10:50 PMTaylor Curran
03/01/2023, 10:51 PM@task
def subflow_wrapping_task():
run_deployment( # returns a FlowRun object
name="process-pokemon-batch/worker",
parameters=dict(pokemon_names=pokemon_names),
)
Jai P
03/02/2023, 3:11 AM@flow
def my_subflow():
...
@flow
async def my_main_flow():
subflows = [
run_sync_in_worker_thread(my_subflow)
for _ in range(100)
]
return await asyncio.gather(*subflows)
if __name__ == "__main__":
asyncio.run(my_main_flow())
but i run into
RuntimeError: The task runner is already started!
which i can get around by `deepcopy`ing the my_subflow
call:
subflows = [
run_sync_in_worker_thread(deepcopy(my_subflow))
for _ in range(100)
]
but that comes at the cost of (i believe) many many task runners being started up, which i would guess is unnecessary cost when i could just re-use a single task runnerVishnu Duggirala
03/21/2023, 4:26 PM