Lawrence Finn
06/23/2021, 4:25 PMKevin Kho
06/23/2021, 4:42 PMLawrence Finn
06/23/2021, 4:59 PMKevin Kho
06/23/2021, 5:05 PMLawrence Finn
06/23/2021, 5:30 PMdef fargate_cluster(n_workers=4):
"""Start a fargate cluster using the same image as the flow run"""
return FargateCluster(
image="actioniq/dask-image:latest",
task_role_arn="...",
execution_role_arn="...",
n_workers=1,
scheduler_cpu=256,
scheduler_mem=512,
worker_cpu=256,
worker_mem=512,
scheduler_timeout="15 minutes",
subnets=["...],
security_groups=["..."],
skip_cleanup=True,
fargate_use_private_ip=True,
)
executor = DaskExecutor(
cluster_class=fargate_cluster,
cluster_kwargs={
"n_workers": 5,
},
)
with Flow("docker-flow-3", storage=S3(bucket="....", key="meowsers/powsers"), schedule=schedule, executor=executor) as flow:
Kevin Kho
06/23/2021, 6:16 PMLawrence Finn
06/23/2021, 6:17 PMKevin Kho
06/23/2021, 6:21 PMLawrence Finn
06/23/2021, 6:23 PMworker_extra_args=["--worker-port=8786"],