Scott Aefsky
02/22/2022, 6:57 PM

Kevin Kho
```python
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor

def dynamic_executor():
    from distributed import LocalCluster
    # could be instead some other class e.g. from dask_cloudprovider.aws import FargateCluster
    # Parameters are already in context when the executor calls this function.
    return LocalCluster(n_workers=prefect.context.parameters["n_workers"])

with Flow("example", executor=DaskExecutor(cluster_class=dynamic_executor)) as flow:
    flow.add_task(Parameter("n_workers", default=5))
```
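A minimal, dependency-free sketch of why passing a function (rather than a cluster instance) as `cluster_class` lets the worker count follow a parameter: the executor keeps the callable and only calls it when a run starts, after the parameters are known. `FakeCluster`, `FakeExecutor`, `context` and `run_flow` are hypothetical stand-ins that mimic this behaviour in simplified form; they are not Prefect or Dask APIs.

```python
from typing import Any, Callable, Dict

# Hypothetical stand-ins, NOT Prefect or Dask classes.
class FakeCluster:
    def __init__(self, n_workers: int) -> None:
        self.n_workers = n_workers

class FakeExecutor:
    """Stores the cluster factory and only calls it when a run starts."""
    def __init__(self, cluster_class: Callable[[], FakeCluster]) -> None:
        self.cluster_class = cluster_class

    def start(self) -> FakeCluster:
        return self.cluster_class()

# Stand-in for prefect.context.parameters, filled in per run.
context: Dict[str, Any] = {"parameters": {}}

def dynamic_executor() -> FakeCluster:
    return FakeCluster(n_workers=context["parameters"]["n_workers"])

executor = FakeExecutor(cluster_class=dynamic_executor)  # no cluster built yet

def run_flow(parameters: Dict[str, Any]) -> int:
    context["parameters"] = parameters  # parameters become known at run time
    cluster = executor.start()          # factory is called only now
    return cluster.n_workers

print(run_flow({"n_workers": 5}))  # first, bulk run
print(run_flow({"n_workers": 1}))  # later run with a lower value
```

Because the same `executor` object serves both runs, the cluster size is decided per run rather than when the flow is defined.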
You could pass in a parameter to set the number of workers the first time you run, and then set its default to a lower value?

Scott Aefsky
02/22/2022, 7:00 PM
Is there any difference between
```python
with Flow("example", executor=DaskExecutor()) as flow:
    pass
```
and
```python
with Flow("example") as flow:
    pass
flow.executor = DaskExecutor()
```
?

Kevin Kho
Scott Aefsky
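02/22/2022, 8:48 PM
```python
import prefect
from dask_cloudprovider.aws import FargateCluster
from distributed import LocalCluster

# ECR_IMAGE, account, vpc, subnet and sg are assumed to be defined elsewhere in the flow module.

def dynamic_executor():
    # bulk_run is passed as a string parameter here
    if prefect.context.parameters["bulk_run"] == "true":
        return FargateCluster(
            image=ECR_IMAGE,
            n_workers=5,  # Must specify n_workers
            cluster_arn=f"arn:aws:ecs:us-east-1:{account}:cluster/PrefectCluster",
            task_role_arn=f"arn:aws:iam::{account}:role/prefect-ecs-task-role",
            vpc=vpc,
            subnets=[subnet],
            security_groups=[sg],
        )
    else:
        return LocalCluster()
```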
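The branching in `dynamic_executor` can be checked offline if the cluster constructors are injected instead of imported. This is a sketch under assumptions: `is_bulk_run`, `make_dynamic_executor` and the fake factories are hypothetical helpers, and accepting either a bool or a `"true"` string for `bulk_run` is an assumption about how the parameter arrives.

```python
from typing import Any, Callable, Dict, List, Mapping, Tuple

def is_bulk_run(parameters: Mapping[str, Any]) -> bool:
    # Assumption: bulk_run may arrive as a bool or as a string such as "true".
    value = parameters.get("bulk_run", False)
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return bool(value)

def make_dynamic_executor(
    get_parameters: Callable[[], Mapping[str, Any]],
    fargate_factory: Callable[..., Any],
    local_factory: Callable[[], Any],
    fargate_kwargs: Dict[str, Any],
) -> Callable[[], Any]:
    # Returns a zero-argument callable usable as cluster_class; parameters are
    # read each time it is called, not when it is built.
    def dynamic_executor() -> Any:
        if is_bulk_run(get_parameters()):
            return fargate_factory(**fargate_kwargs)
        return local_factory()
    return dynamic_executor

# Offline usage: stand-in factories that only record how they were called.
calls: List[Tuple[str, Dict[str, Any]]] = []

def fake_fargate(**kwargs: Any) -> str:
    calls.append(("fargate", kwargs))
    return "fargate-cluster"

def fake_local() -> str:
    calls.append(("local", {}))
    return "local-cluster"

params: Dict[str, Any] = {"bulk_run": "true"}
cluster_factory = make_dynamic_executor(lambda: params, fake_fargate, fake_local, {"n_workers": 5})
print(cluster_factory())  # fargate-cluster
params["bulk_run"] = "false"
print(cluster_factory())  # local-cluster
```

In the flow itself this would presumably be wired as `DaskExecutor(cluster_class=make_dynamic_executor(lambda: prefect.context.parameters, FargateCluster, LocalCluster, {...}))`, with the Fargate keyword arguments from the snippet above. One deliberate difference from the original: a missing `bulk_run` falls back to the local cluster instead of raising `KeyError`.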
One of the main reasons for doing this was to avoid the FargateCluster spin-up time when we don't need that many resources, hence the two different cluster types returned. Thanks again for the help @Kevin Kho!

Kevin Kho