
Peter Peter

02/09/2022, 2:28 PM
Hello, I have a task that I am calling map on using the Dask executor, which works fine until there is an error. Sometimes I get an "Unexpected error: KilledWorker", which in my case seems to happen because a Dask worker has run out of memory. This only happens in a handful of cases, but when it does it kills the whole flow run. Is there any way to handle this without killing the whole flow? I want to have 2 downstream tasks that would take the filtered results and act on them. Any help would be appreciated.

Kevin Kho

02/09/2022, 2:29 PM
How many workers do you have in your executor?

Peter Peter

02/09/2022, 2:29 PM
In my testing I currently have 3.

Kevin Kho

02/09/2022, 2:32 PM
I think the default in Dask is for the scheduler to restart the workers, and then the workers retry the work. What is your Dask cluster setup?
Also this is a good read if you haven’t seen it yet. If they are not restarted, I think you are running into an unrecoverable error? In that case, I don’t think Prefect can do anything because those filtered results can’t get back from the Dask cluster to your client

Peter Peter

02/09/2022, 2:38 PM
The dask cluster is dynamically created in k8s with
pod_spec_kwargs = {'memory_limit': '4G'}

return DaskExecutor(
    cluster_class=lambda: KubeCluster(make_pod_spec(image=pod_spec_image, **pod_spec_kwargs)),
    adapt_kwargs={"minimum": min_workers, "maximum": max_workers},
)
Are you saying that if there is any failure in a dask worker then the flow will fail and there is no way around this?

Kevin Kho

02/09/2022, 2:44 PM
I can’t think of any, especially if your downstream tasks have a data dependency on the tasks submitted to Dask, because that data gets lost
But for recoverable errors, Dask will be able to spin up the worker again so there is a distinction I think

Peter Peter

02/09/2022, 3:06 PM
With an "out of memory" error, the only way it will succeed is if memory is increased, and my understanding is that there is no way to change this. What my flow is doing is converting files and using Dask to parallelize this. We only want to collect whether each conversion was successful or not. Ideally, Prefect would not kill the flow for an error.

Kevin Kho

02/09/2022, 3:08 PM
Why is there no way to change the memory? You can increase it in the pod_spec_kwargs in your snippet, right? I am not sure this will work, but maybe you can try using a trigger on the downstream task
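from prefect import task
from prefect.triggers import always_run

@task(trigger=always_run)  # run regardless of upstream task states
def something():
    return 1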
so it will always run even if the upstream fails.
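For the "filtered results" part, here is a minimal sketch of what the body of such a downstream task might do. It assumes that a failed mapped child reaches the downstream task as its exception object, which is how I understand Prefect 1.x to behave when the trigger lets the task run. The name split_results is hypothetical, and this is plain Python, so it only helps if the flow run itself survives long enough to call it.

```python
from typing import Any, List, Tuple


def split_results(results: List[Any]) -> Tuple[List[Any], List[BaseException]]:
    """Partition mapped outputs into successful values and captured exceptions."""
    # Assumption: failed upstream children appear in the list as exception objects.
    succeeded = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return succeeded, failed


# Example with made-up file names: two conversions succeeded, one failed.
ok, bad = split_results(["a.out", ValueError("bad file"), "b.out"])
```

The two lists could then be handed to the 2 downstream tasks separately.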

Peter Peter

02/09/2022, 3:16 PM
I am aware I can increase memory, but that applies across the board. I do not want to have 1000s of runs using 64GB when 99% of the runs will finish successfully with 4GB. I am still trying to determine how to detect these outliers.
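One way to spot the outliers without taking down the Dask worker might be to run each conversion in a child process with its own memory cap, so an oversized job fails on its own and just reports "not successful". This is a rough sketch under assumptions, not something confirmed in this thread: it is Linux-only (it relies on RLIMIT_AS being enforced), run_isolated and _CHILD are hypothetical names, and the code string stands in for whatever the real conversion command is.

```python
import subprocess
import sys

# Hypothetical child program: cap its own address space, then run the work.
_CHILD = """
import resource, sys
limit = int(sys.argv[1])
resource.setrlimit(resource.RLIMIT_AS, (limit, limit))
exec(sys.argv[2])
"""


def run_isolated(code: str, max_bytes: int, timeout: float = 60.0) -> bool:
    """Run `code` in a child interpreter capped at `max_bytes` of address space.

    Returns True only if the child exits with status 0.
    """
    try:
        proc = subprocess.run(
            [sys.executable, "-c", _CHILD, str(max_bytes), code],
            capture_output=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return False
    return proc.returncode == 0


small_ok = run_isolated("x = 1", 1 << 30)                    # tiny job, 1 GiB cap
big_ok = run_isolated("x = bytearray(2 << 30)", 1 << 30)     # 2 GiB job, 1 GiB cap
```

The mapped task would then return the boolean instead of dying, and the inputs that come back False are the candidates to retry with a larger limit.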

Kevin Kho

02/09/2022, 3:18 PM
Ah I see

Peter Peter

02/09/2022, 3:27 PM
I don't think always_run will work, since the flow is stopped when there is an error, even if there is outstanding work.

Kevin Kho

02/09/2022, 3:46 PM
Ah I see what you mean. In that case, I don’t think there is anything we can do on the Prefect side. We’d need Dask not to raise that error, but I don’t know if it can be done
Actually maybe we can use a resource manager instead and maybe the tasks can be treated independently?

Peter Peter

02/09/2022, 8:47 PM
Thanks, will check this out.