Lawrence Finn
06/23/2021, 4:25 PM
Kevin Kho
Lawrence Finn
06/23/2021, 4:59 PM
Kevin Kho
Kevin Kho
Lawrence Finn
06/23/2021, 5:30 PM
Lawrence Finn
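06/23/2021, 5:31 PM
# Imports assumed; they were not part of the original message.
from dask_cloudprovider.aws import FargateCluster
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.storage import S3

def fargate_cluster(n_workers=4):
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(
        image="actioniq/dask-image:latest",
        task_role_arn="...",
        execution_role_arn="...",
        n_workers=1,  # hard-coded; the n_workers argument is not used here
        scheduler_cpu=256,
        scheduler_mem=512,
        worker_cpu=256,
        worker_mem=512,
        scheduler_timeout="15 minutes",
        subnets=["..."],
        security_groups=["..."],
        skip_cleanup=True,
        fargate_use_private_ip=True,
    )

executor = DaskExecutor(
    cluster_class=fargate_cluster,
    cluster_kwargs={
        "n_workers": 5,
    },
)

# `schedule` is defined elsewhere in the original script.
with Flow("docker-flow-3", storage=S3(bucket="....", key="meowsers/powsers"), schedule=schedule, executor=executor) as flow:
    ...  # flow body was not included in the message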
Lawrence Finn
06/23/2021, 6:14 PM
Kevin Kho
Lawrence Finn
06/23/2021, 6:17 PM
Kevin Kho
Lawrence Finn
06/23/2021, 6:23 PM
Lawrence Finn
06/23/2021, 6:26 PM
worker_extra_args=["--worker-port=8786"],
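
The 6:26 PM line reads like one more keyword argument for the FargateCluster call posted at 5:31 PM. A rough sketch of where it might slot in, assuming that is the intent: `fargate_cluster_kwargs` is a hypothetical helper (not from the thread), and the role ARNs, subnets, and security groups that were elided in the original are left out here and would still need to be supplied.

```python
def fargate_cluster_kwargs(n_workers=4, worker_port=8786):
    """Build keyword arguments for FargateCluster (hypothetical helper)."""
    return {
        "image": "actioniq/dask-image:latest",
        "n_workers": n_workers,
        "scheduler_timeout": "15 minutes",
        "skip_cleanup": True,
        "fargate_use_private_ip": True,
        # Pass --worker-port to the dask worker, as in the 6:26 PM message.
        "worker_extra_args": [f"--worker-port={worker_port}"],
    }


def fargate_cluster(n_workers=4):
    """Start a Fargate cluster from the kwargs above."""
    # Imported lazily so the kwargs helper can be checked without
    # dask-cloudprovider installed or AWS credentials configured.
    from dask_cloudprovider.aws import FargateCluster

    return FargateCluster(**fargate_cluster_kwargs(n_workers=n_workers))
```

Unlike the 5:31 PM version, this sketch forwards `n_workers` instead of hard-coding it, so the value given in `cluster_kwargs` on the DaskExecutor would take effect.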