Marwan Sarieddine
02/21/2022, 3:19 PM
Marwan Sarieddine
02/21/2022, 3:22 PM
Marwan Sarieddine
02/21/2022, 3:23 PM
Marwan Sarieddine
02/21/2022, 3:25 PM
Anna Geller
Anna Geller
Marwan Sarieddine
02/21/2022, 4:12 PM
DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES: "0"
We also turned off Dask’s memory management and are not using a Dask nanny, i.e. we run:
dask-worker ... --no-nanny --memory-limit 0
We believe the above Dask configuration should prevent Dask-related restarts, but the Prefect version locking seems to indicate otherwise. So what are we missing here?
Marwan Sarieddine
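For reference, here is a minimal sketch of how an environment variable like the one above is usually translated into a nested Dask config key. It approximates the documented convention (strip the DASK_ prefix, lowercase, treat double underscores as nesting, parse the value as a Python literal) with the standard library only. The helper name env_to_dask_config is hypothetical and this is not Dask's own code.

```python
import ast


def env_to_dask_config(env):
    """Approximate Dask's env-var convention (hypothetical helper, not Dask's API)."""
    config = {}
    for name, raw in env.items():
        if not name.startswith("DASK_"):
            continue
        # Strip the prefix, lowercase, and treat "__" as a nesting separator.
        path = name[len("DASK_"):].lower().split("__")
        # Parse the value as a Python literal when possible ("0" -> 0),
        # otherwise keep the raw string.
        try:
            value = ast.literal_eval(raw)
        except (ValueError, SyntaxError):
            value = raw
        node = config
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return config


settings = env_to_dask_config(
    {"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"}
)
print(settings)
```

Dask treats underscores and hyphens in config keys interchangeably, so the resulting allowed_failures entry would correspond to distributed.scheduler.allowed-failures, with the string "0" read as the integer 0.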
02/21/2022, 4:30 PM
Anna Geller
Looked at the logs a bit and the issue seems to be related/caused by the
"No heartbeat detected from the remote task; marking the run as failed."
error rather than version locking.
I didn’t see this env variable in your config. Could you consider adding it? We’ve seen that switching the heartbeat mode from processes to threads often helps with heartbeat issues:
from prefect.run_configs import KubernetesRun
flow.run_config = KubernetesRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
Can you give one example of how you use those case/merge/resource manager tasks in your flow with Dask?
It would be nice if I could reproduce the issue to find out what exactly is causing the task runs to lose heartbeats.
Marwan Sarieddine
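To illustrate the idea behind the heartbeat suggestion above, here is a minimal sketch of a generic thread-based heartbeat together with the staleness check a monitor might apply. It uses only the standard library. ThreadHeartbeat and is_stale are hypothetical names, and this is not how Prefect implements heartbeats internally.

```python
import threading
import time


class ThreadHeartbeat:
    """Toy thread-based heartbeat (illustrative only; not Prefect's implementation)."""

    def __init__(self, interval_seconds):
        self.interval_seconds = interval_seconds
        self.last_beat = None
        self.beats = 0
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        while True:
            # Record a beat, then sleep until the next interval or until stopped.
            self.last_beat = time.monotonic()
            self.beats += 1
            # Event.wait() returns True as soon as stop() has been called.
            if self._stop_event.wait(self.interval_seconds):
                break

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        self._thread.join()


def is_stale(last_beat, now, timeout_seconds):
    """A monitor would mark the run as failed once this returns True."""
    return last_beat is None or (now - last_beat) > timeout_seconds


hb = ThreadHeartbeat(interval_seconds=0.01)
hb.start()
time.sleep(0.05)  # stand-in for the task's real work
hb.stop()
```

Because the heartbeat lives in the same process as the work, it stops when that process dies. That is the property a monitor relies on when it interprets a missing heartbeat as a failed run.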
02/21/2022, 5:20 PM
> looked at the logs a bit and the issue seems to be related/caused by "No heartbeat detected from the remote task; marking the run as failed." error rather than version locking.
That is what I first thought, but if you take a closer look at the screenshot I shared in this thread, you will see that the problem starts with the version locking, which causes all the downstream task runs to stop running (I am referring to this message that is logged in all the downstream tasks:
Not all upstream states are finished; ending run.
), which in turn triggers the heartbeat failure.
Marwan Sarieddine
02/21/2022, 5:22 PM
Marwan Sarieddine
02/21/2022, 5:28 PM
from prefect import Flow, Parameter, case

with Flow("test") as flow:
    test_mode = Parameter(name="test_mode", type=bool, default=False)
    with case(test_mode, False):
        run_task()  # run_task: a task defined elsewhere
Where the case statement is checking the value of a parameter.
Anna Geller
import prefect
from prefect.client import Client
c = Client()
c.cancel_flow_run(flow_run_id=prefect.context.get("flow_run_id"))
I’ll get back to you.
Marwan Sarieddine
02/21/2022, 5:37 PM
Anna Geller
Marwan Sarieddine
02/21/2022, 5:46 PM
Anna Geller
Marwan Sarieddine
02/21/2022, 6:53 PM
> Why do the task runs lose heartbeat in the first place?
From what I understand, it is the version locking that causes the task run to get stuck in a “Running” state, i.e. it prevents the run from proceeding to a successful state, which causes it to lose its heartbeat. The pod on which the worker is running is not affected, and the downstream task runs don’t execute properly due to “Not all upstream states are finished;”.
> Maybe it could be not enough memory allocated for the task runs?
I doubt this is memory related: the same flow runs successfully four times every day, and this only occurs rarely. Additionally, the failure at the case statement happens at the very beginning of the flow, when memory utilization is minimal.
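The failure chain described above can be sketched as a toy model. It assumes version locking behaves like optimistic concurrency control, where a state update is accepted only if the caller's version matches the stored one. All names here (TaskRunRecord, StaleVersionError, upstream_finished) are hypothetical, and this is not Prefect Cloud's code.

```python
class StaleVersionError(Exception):
    """Raised when a state update carries an outdated version (toy model)."""


class TaskRunRecord:
    def __init__(self):
        self.state = "Pending"
        self.version = 0

    def set_state(self, new_state, expected_version):
        # Optimistic locking: accept the update only if the caller has seen
        # the latest version of this record.
        if expected_version != self.version:
            raise StaleVersionError(
                f"expected version {expected_version}, found {self.version}"
            )
        self.state = new_state
        self.version += 1


FINISHED_STATES = {"Success", "Failed", "Skipped"}


def upstream_finished(upstream_records):
    """Downstream tasks only start once every upstream record is finished."""
    return all(record.state in FINISHED_STATES for record in upstream_records)


upstream = TaskRunRecord()
upstream.set_state("Running", expected_version=0)  # stored version becomes 1

try:
    # The worker still holds version 0, so this update is rejected and the
    # record stays in "Running".
    upstream.set_state("Success", expected_version=0)
    rejected = False
except StaleVersionError:
    rejected = True

can_run_downstream = upstream_finished([upstream])
```

In this model the rejected update leaves the upstream record in "Running", so the downstream check never passes. That matches the symptom in the thread, though the model says nothing about why the versions diverged in the first place.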
Anna Geller