Chris Goddard
04/26/2023, 9:18 PM
Zanie
04/26/2023, 9:29 PM
run_deployment?
Chris Goddard
04/26/2023, 9:35 PM
build_subflow given here rather than a deployment.
I attempted a deployment-based solution (still running locally), but performance was extremely slow. I'm still wrapping my head around deployments, but I think it was copying files on every subflow invocation.
Ideally, I'd love this to mostly live in a self-contained script that defines the flows and subflows and then triggers the main flow, which would spawn parameterized runs of the subflows (which could in turn spawn additional subflow runs). Right now I only have a single subflow, which I'm trying to invoke multiple times concurrently.
Zanie
04/26/2023, 9:47 PM
async flows?
Chris Goddard
04/26/2023, 9:47 PM
Zanie
04/26/2023, 9:47 PM
Chris Goddard
04/26/2023, 9:48 PM
Zanie
04/26/2023, 9:50 PM
from copy import deepcopy
import asyncio

from prefect import flow, task


@task
async def test_task():
    return 1


@flow(log_prints=True)
async def my_child(i):
    assert await test_task() == 1
    print(i)


@flow
async def my_parent():
    # Each call uses its own deep copy of the flow object.
    coros = [deepcopy(my_child)(i) for i in range(10)]
    # Await all ten subflow coroutines concurrently.
    await asyncio.gather(*coros)


asyncio.run(my_parent())
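The fan-out in the snippet above is ordinary asyncio: build one coroutine per parameter, then await them together with asyncio.gather. Here is a minimal sketch of just that pattern with Prefect left out, assuming hypothetical stand-in functions child and parent (neither is part of Prefect or of the code above):

```python
import asyncio


async def child(i: int) -> int:
    # Hypothetical stand-in for a subflow: yield control briefly, then return.
    await asyncio.sleep(0.01)
    return i * 2


async def parent(n: int) -> list[int]:
    # One coroutine per parameter; gather awaits them concurrently and
    # returns the results in the order the coroutines were passed in.
    coros = [child(i) for i in range(n)]
    return await asyncio.gather(*coros)


results = asyncio.run(parent(5))
print(results)
```

Because gather preserves argument order, the results line up with the input parameters even when the children finish out of order.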
Chris Goddard
04/26/2023, 9:53 PM
Zanie
04/26/2023, 10:01 PM
Chris Goddard
04/26/2023, 10:03 PM
Zanie
04/26/2023, 10:08 PM
Chris Goddard
04/26/2023, 10:10 PM
Zanie
04/26/2023, 10:20 PM
import asyncio

from prefect import flow, task
from prefect.context import get_run_context
from prefect_dask import DaskTaskRunner


@task
async def test_task():
    return 1


@flow(log_prints=True)
async def my_child(i):
    assert await test_task() == 1
    print(i)


@flow(task_runner=DaskTaskRunner())
async def my_parent():
    # Read the parent's Dask scheduler address (via the private _client
    # attribute) so the child task runners connect to the same scheduler.
    child_task_runner = DaskTaskRunner(
        address=get_run_context().task_runner._client.scheduler_info()["address"]
    )
    coros = [my_child.with_options(task_runner=child_task_runner)(i) for i in range(5)]
    await asyncio.gather(*coros)


if __name__ == "__main__":
    asyncio.run(my_parent())
Chris Goddard
04/26/2023, 10:21 PM
get_run_context().task_runner._client.scheduler_info()
Zanie
04/26/2023, 10:22 PM
Chris Goddard
04/26/2023, 10:23 PM
Zanie
04/26/2023, 10:47 PM