
Lawrence Finn

06/23/2021, 4:25 PM
I am just starting to play with Prefect, so maybe I’m doing something dumb. I’m trying to use Prefect with a temporary Dask cluster on Fargate. The tasks run and finish, but the flow gets stuck in the Running state even though each task finished successfully. It works fine with the local executor, but not with the temporary Dask Fargate cluster. Any ideas?

Kevin Kho

06/23/2021, 4:42 PM
Hi @Lawrence Finn, I have seen this issue with DaskExecutor, where the last task sometimes hangs. Maybe you can try using a DaskExecutor with processes instead of threads.

Lawrence Finn

06/23/2021, 4:59 PM
how do i do that?

Kevin Kho

06/23/2021, 5:05 PM
You would set it in the Dask config but one sec let me confirm with the team
Could you show me how you set up the DaskExecutor?

Lawrence Finn

06/23/2021, 5:30 PM
certainly! and sorry if im doing something super dumb lol
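# Imports assumed for Prefect 0.14+ and dask-cloudprovider 2021.x
from dask_cloudprovider.aws import FargateCluster
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.storage import S3

def fargate_cluster(n_workers=4):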
    """Start a fargate cluster using the same image as the flow run"""
    return FargateCluster(
        image="actioniq/dask-image:latest",
        task_role_arn="...",
        execution_role_arn="...",
        n_workers=n_workers,
        scheduler_cpu=256,
        scheduler_mem=512,
        worker_cpu=256,
        worker_mem=512,
        scheduler_timeout="15 minutes",
        subnets=["..."],
        security_groups=["..."],
        skip_cleanup=True,
        fargate_use_private_ip=True,
    )
executor = DaskExecutor(
    cluster_class=fargate_cluster,
    cluster_kwargs={
        "n_workers": 5,
    },
)
with Flow("docker-flow-3", storage=S3(bucket="....", key="meowsers/powsers"), schedule=schedule, executor=executor) as flow:
Oh, I think it is because of the security groups: Dask workers use a random port by default.
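To illustrate why a random worker port can leave the flow hanging, here is a minimal sketch that checks whether a port is admitted by a set of ingress rules. The rule list, the `is_allowed` helper, and the example addresses are all hypothetical; they are not taken from the actual security group in this thread. The only real values are 8786 and 8787, the default Dask scheduler and dashboard ports.

```python
import ipaddress

# Hypothetical ingress rules as (from_port, to_port, cidr); illustrative only.
INGRESS_RULES = [
    (8786, 8787, "10.0.0.0/8"),  # default Dask scheduler and dashboard ports
]


def is_allowed(port, source_ip, rules):
    """Return True if any rule admits traffic to `port` from `source_ip`."""
    addr = ipaddress.ip_address(source_ip)
    return any(
        low <= port <= high and addr in ipaddress.ip_network(cidr)
        for low, high, cidr in rules
    )


# A worker listening on an arbitrary high port is not covered by the rule above,
# so the scheduler could not reach it even from inside 10.0.0.0/8.
print(is_allowed(8786, "10.1.2.3", INGRESS_RULES))
print(is_allowed(41234, "10.1.2.3", INGRESS_RULES))
```

With rules like these, either the port range has to be widened or the worker has to be pinned to a port the rules already cover.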

Kevin Kho

06/23/2021, 6:16 PM
Oh ok, I’ll respond in a bit with how to make the Fargate cluster use processes, but that is worth a shot.

Lawrence Finn

06/23/2021, 6:17 PM
yeah, the security group fixed what i was seeing… aws always gets ya 😄

Kevin Kho

06/23/2021, 6:21 PM
Oh that’s good to know! How did you work around it? Open more ports?

Lawrence Finn

06/23/2021, 6:23 PM
yeah, opening more ports worked. I think you can pass the worker port as a parameter; I’ll give that a whirl too, just so I don’t need such permissive security groups (even though they are limited to 10.0.0.0/8)
worker_extra_args=["--worker-port=8786"],
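A minimal sketch of how that argument could be assembled, assuming the goal is to pin the worker port so the security group can stay narrow. `pinned_worker_kwargs` is a hypothetical helper, not part of Prefect or dask-cloudprovider; it only builds a dict and has not been tried against a real Fargate cluster.

```python
def pinned_worker_kwargs(worker_port=8786, **overrides):
    """Build keyword arguments that pin the Dask worker port (hypothetical helper)."""
    kwargs = {
        # Passed through to the dask-worker command line on each worker task.
        "worker_extra_args": [f"--worker-port={worker_port}"],
    }
    # Any other cluster options (for example n_workers) are merged in unchanged.
    kwargs.update(overrides)
    return kwargs


cluster_options = pinned_worker_kwargs(n_workers=5)
print(cluster_options)
```

The resulting dict is meant to be unpacked into the `FargateCluster(...)` call shown earlier, alongside the image, role, subnet, and security group settings.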