Scott Aefsky
02/22/2022, 6:57 PMKevin Kho
from prefect import Flow
from prefect.executors import DaskExecutor
def dynamic_executor():
    from distributed import LocalCluster
    # could be instead some other class e.g. from dask_cloudprovider.aws import FargateCluster
    return LocalCluster(n_workers=prefect.context.parameters["n_workers"])
with Flow("example", executor=DaskExecutor(cluster_class=dynamic_executor)) as flow:
    flow.add_task(Parameter("n_workers", default=5))Scott Aefsky
02/22/2022, 7:00 PMScott Aefsky
02/22/2022, 8:46 PMwith Flow("example", executor=DaskExecutor()) as flow:
    passwith Flow("example") as flow:
    pass
flow.executor = DaskExecutor()Scott Aefsky
02/22/2022, 8:46 PMKevin Kho
Scott Aefsky
02/22/2022, 8:48 PMScott Aefsky
02/23/2022, 3:22 PMdef dynamic_executor():
    if prefect.context.parameters["bulk_run"] == 'true':
        return FargateCluster(
            image= ECR_IMAGE, 
            n_workers= 5, # Must specify n_workers
            cluster_arn= fr"arn:aws:ecs:us-east-1:{account}:cluster/PrefectCluster",
            task_role_arn= fr"arn:aws:iam::{account}:role/prefect-ecs-task-role",
            vpc= vpc,
            subnets= [subnet],
            security_groups= [sg], 
        )
    else:
        return LocalCluster()Kevin Kho
