Assaf Pinhasi
07/03/2023, 12:01 PM
Daniel
07/04/2023, 9:30 PM
from distributed import LocalCluster
from prefect import flow
from prefect_dask import DaskTaskRunner
@flow
def subflow():
    """No-op flow; it is invoked by ``parent_flow`` with a Dask task runner."""
@flow
def parent_flow(cluster_address):
    """Run ``subflow`` with a Dask task runner bound to an existing scheduler.

    Args:
        cluster_address: Address of the Dask scheduler, handed to
            ``DaskTaskRunner`` as its ``address`` argument.
    """
    # Pass the address by keyword. In prefect-dask releases that accept a
    # ``cluster`` object, the first positional parameter is ``cluster`` rather
    # than ``address``, so a positional string would be bound to the wrong one.
    dask_runner = DaskTaskRunner(address=cluster_address)
    subflow.with_options(task_runner=dask_runner)()
@flow
def outer_flow():
    """Start a local Dask cluster and run ``parent_flow`` against it.

    The cluster's scheduler address is used both for the task runner of
    ``parent_flow`` and as the argument ``parent_flow`` receives.
    """
    # The context manager shuts the scheduler and workers down when the flow
    # finishes or raises, instead of leaving the cluster running.
    with LocalCluster() as cluster:
        scheduler_address = cluster.scheduler_address
        # Keyword ``address=`` avoids relying on DaskTaskRunner's positional order.
        runner = DaskTaskRunner(address=scheduler_address)
        parent_flow.with_options(task_runner=runner)(scheduler_address)
# Script entry point: start the outermost flow only when run directly,
# not when this module is imported.
if __name__ == "__main__":
    outer_flow()