Noah Holm
01/04/2022, 9:52 AM
Anna Geller
from prefect import flow
from prefect.task_runners import DaskTaskRunner

@flow
def subflow(x):
    print(x)

@flow(task_runner=DaskTaskRunner())
def parent():
    [subflow(i) for i in range(10)]

if __name__ == "__main__":
    parent()
Noah Holm
01/04/2022, 11:07 AM
Noah Holm
01/04/2022, 11:17 AM
@flow()
def parentflow(runs: int):
    for _ in range(runs):
        subflow()

parentflow(4)
But what’d be neat is to determine the number of subflow runs with a preceding task, kind of like this:
@flow()
def parentflow():
    runs = determine_run_amount()  # this is a task
    for _ in range(runs):
        subflow()

parentflow()
(but this doesn’t work, since we can’t call range() on a PrefectFuture object)
Anna Geller
@flow()
def parentflow():
    state = determine_run_amount().wait()  # this is a task
    runs_nr = state.result()
    for _ in range(runs_nr):
        subflow()

parentflow()
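For illustration only, the same idea can be sketched with the standard library's concurrent.futures instead of Prefect, so that it runs without a Prefect install. This is an assumption-laden stand-in, not Prefect's API: determine_run_amount and subflow here are plain hypothetical functions, and the value 4 is arbitrary. The point it shows is that a future is not an integer, so it has to be resolved before range() can use it.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def determine_run_amount() -> int:
    # Hypothetical stand-in for the task that decides how many runs are needed.
    return 4


def subflow(i: int) -> int:
    # Hypothetical stand-in for the subflow; returns its input so runs can be counted.
    return i


def parentflow() -> List[int]:
    with ThreadPoolExecutor() as pool:
        future: Future = pool.submit(determine_run_amount)
        # range(future) would raise TypeError, because a future is not an integer.
        runs_nr = future.result()  # block until the value is available
        return [subflow(i) for i in range(runs_nr)]


if __name__ == "__main__":
    print(parentflow())
```

In the Prefect snippet above, .wait() followed by .result() appears to play the same role that future.result() plays in this sketch.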
Noah Holm
01/04/2022, 11:27 AM
Noah Holm
01/04/2022, 11:28 AM
Anna Geller
Marko Mušnjak
01/13/2022, 8:37 AM
Marko Mušnjak
01/13/2022, 8:39 AM
Anna Geller
Marko Mušnjak
01/13/2022, 10:32 AM