# ask-community
Avi A:
Hey community, there’s a phenomenon I’ve encountered various times in which a task is executed twice within one flow run (after completing successfully the first time). It happens for regular tasks as well as mapped tasks. We suspect that a downstream task tries to fetch the task’s result, fails to do so, and that triggers the original task to run again even though it completed successfully. Any ideas why this is happening and what we can do to fix it?
Working on:
• Kubernetes Dask cluster (Kubernetes Executor)
• Prefect 0.13.18
cc: @Vladimir Zoubritsky
Vladimir Zoubritsky:
The issue sounds similar to the one that was fixed for the LocalDaskExecutor in https://github.com/PrefectHQ/prefect/pull/3127. Could the same thing be affecting us when using
DaskExecutor(cluster_class=KubeCluster)
?
Dylan:
Hi @Avi A! Could you share some example logs and states, as well as your Run Config and Executor configuration?
Also, we’ve made some big changes since the last 0.13.x release; you may find that upgrading to 0.14.x solves your issue.
Avi A:
Thanks @Dylan. We might upgrade to 0.14 soon. @Vladimir Zoubritsky is the engineer working with me on this; I hope he can share that config info. Currently we suspect that Dask workers are getting killed (possibly due to exceeding a memory limit), and then their output is getting lost. It’s pretty weird because we don’t see task failures, so we’re not sure why the workers are getting restarted. Does that sound related?
Vladimir Zoubritsky:
Looking at the worker logs from the time when the tasks started executing for the second time, the worker was killed. However, the worker was probably not executing any task at the time, so no tasks appear as failed:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - INFO - Worker process 17 was killed by signal 15
distributed.nanny - WARNING - Restarting worker
distributed.worker - INFO -       Start worker at:    <tcp://10.244.19.5:33061>
distributed.worker - INFO -          Listening to:    <tcp://10.244.19.5:33061>
distributed.worker - INFO -          dashboard at:          10.244.19.5:39427
distributed.worker - INFO - Waiting to connect to:    <tcp://10.244.9.86:34195>
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   57.00 GB
distributed.worker - INFO -       Local Directory: /srv/app/dask-worker-space/dask-worker-space/worker-utvrlpcv
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:    <tcp://10.244.9.86:34195>
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
Dylan:
Thanks for this info! Could you also share your Run Config and Executor configuration?
If you’re running without an orchestration layer (Prefect Cloud or Prefect Server), Dask worker restarts can cause some tasks to be re-run.
You can also try giving the Dask workers / scheduler more resources to prevent restarts.
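For example, here’s a rough sketch of what that could look like, assuming the dask_kubernetes KubeCluster mentioned above and its make_pod_spec helper (an assumption on my part); the image name and resource numbers are placeholders, not recommendations:

from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.engine.executors import DaskExecutor  # prefect.executors on 0.14.x

# Sketch only: larger worker pods make the nanny's memory threshold harder to hit
executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={
        'n_workers': 3,
        'pod_template': make_pod_spec(
            image='my-flow-image:latest',  # placeholder image
            cpu_request='4',
            memory_request='64G',
            memory_limit='64G',
        ),
    },
)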
Vladimir Zoubritsky:
Thanks! We will try increasing the resources, since the restarts cause subsequent restarts in a fairly deterministic way. We are using Prefect Server for the orchestration layer (installed on Kubernetes using the Helm chart). The run config we are using is
run_config=KubernetesRun(image=image)
(with a custom image containing all our dependencies), and for the executor:
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.engine.executors import DaskExecutor

# Three Dask workers, each in a 2-CPU / 57 GB pod built from our custom image
executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={
        'n_workers': 3,
        'pod_template': make_pod_spec(
            image=image,
            cpu_request='2',
            memory_request='57G',
            memory_limit='57G',
        ),
    },
)
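One more knob we could look at (just an illustration, we haven’t tried it): the “Worker exceeded 95% memory budget. Restarting” message in the logs comes from Dask’s distributed.worker.memory.terminate setting, and those memory thresholds can be passed to the worker pods as environment variables through make_pod_spec, e.g.:

from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.engine.executors import DaskExecutor  # prefect.executors on 0.14.x

# Illustration only: adjust Dask's worker memory thresholds so workers spill
# and pause earlier instead of being killed at 95%. The fractions below are
# placeholders, not recommendations.
pod_template = make_pod_spec(
    image=image,  # same custom image as above
    cpu_request='2',
    memory_request='57G',
    memory_limit='57G',
    env={
        'DASK_DISTRIBUTED__WORKER__MEMORY__TARGET': '0.60',     # start spilling to disk
        'DASK_DISTRIBUTED__WORKER__MEMORY__SPILL': '0.70',
        'DASK_DISTRIBUTED__WORKER__MEMORY__PAUSE': '0.80',      # pause accepting new tasks
        'DASK_DISTRIBUTED__WORKER__MEMORY__TERMINATE': '0.98',  # nanny restart threshold
    },
)

executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={'n_workers': 3, 'pod_template': pod_template},
)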