Thread
#prefect-community
    Marwan Sarieddine

    7 months ago
    Hi folks, since last week we have been encountering an issue with prefect cloud, version locking and heartbeat failures - more details in the thread.
    The pattern is fairly consistent by now: One of our prefect tasks gets marked as: “version lock encountered” by prefect cloud. This then causes the downstream tasks to skip running. (see the attached screenshot of our logs for more details)
    After 2 minutes the task is failed due to a heartbeat failure but the flow run state remains in a “running” state
    Questions: Is there a reason why we might be encountering more version locking errors than usual? Is there a way to verify the version locking errors are legitimate and not caused by a bug on the Prefect Cloud side? Why does the heartbeat failure not fail the entire flow run? We don’t have Lazarus enabled due to bugs we have encountered using Lazarus, so we would really want this to cause a flow failure
    Anna Geller

    7 months ago
    Hi @Marwan Sarieddine I remember the previous discussions about a similar issue you had in the past. The issue last time was Dask, and we concluded that version locking can only prevent a task from running to completion more than once, but cannot prevent a task from running more than once in general. Can you send the flow run IDs of a couple of flow runs that failed this way?
    for reference, these are the related issues we discussed before: • Lazarus version locking
    Marwan Sarieddine

    7 months ago
    Hi @Anna Geller thank you for recalling the previous issues! Yes, that is what we concluded. We then proceeded to add this environment variable to our Dask setup:
    DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES: "0"
    We also turned off dask’s memory management and are not using a dask nanny i.e. we run:
    dask-worker ... --no-nanny --memory-limit 0
    we believe the above Dask configuration should prevent Dask-related restarts - but the Prefect version locking seems to indicate otherwise - so what are we missing here?
    Not sure if it helps, but most of the tasks where version locking was encountered are very “light” tasks, like “case” context manager tasks, resource manager “setup” and “cleanup” tasks, or “merge” tasks …
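    (Side note on the ALLOWED_FAILURES variable above: Dask maps DASK_* environment variables onto config keys mechanically. A minimal sketch of that mapping, using a hypothetical helper name, mirroring how Dask's config loader interprets such variables:)

    ```python
    def dask_env_to_config_key(name: str) -> str:
        # Hypothetical helper illustrating how Dask reads DASK_* env vars:
        # the "DASK_" prefix is dropped, "__" becomes ".", any remaining "_"
        # becomes "-", and the result is lower-cased.
        key = name[len("DASK_"):].lower()
        return key.replace("__", ".").replace("_", "-")

    # DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES: "0" is therefore read as
    # the config key distributed.scheduler.allowed-failures set to 0
    ```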
    Anna Geller

    7 months ago
    I looked at the logs a bit and the issue seems to be caused by the
    "No heartbeat detected from the remote task; marking the run as failed."
    error rather than version locking. I didn’t see this env variable in your config - can you consider adding it? We’ve seen that switching heartbeat mode from processes to threads often helps with heartbeat issues:
    from prefect.run_configs import KubernetesRun
    flow.run_config = KubernetesRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
    Can you give one example of how you use those case/merge/resource manager tasks in your flow with Dask? It would be nice if I could reproduce the issue to find out what exactly is causing the task runs to lose heartbeats.
    Marwan Sarieddine

    7 months ago
    looked at the logs a bit and the issue seems to be caused by the
    "No heartbeat detected from the remote task; marking the run as failed."
    error rather than version locking.
    That is what I first thought, but if you take a closer look at the screenshot I shared in this thread you will see that the problem starts with the version locking, which causes all the downstream task runs to stop running (I am referring to this message, which is logged in all the downstream tasks:
    Not all upstream states are finished; ending run.
    ), which in turn triggers the heartbeat failure
    i.e. I don’t think there is any issue with the heartbeat getting triggered incorrectly - it correctly waits for 2 minutes and fails the task. My main issue is that it does not fail the flow run. I should also note that it is very strange that the version locking log doesn’t make it to the Prefect Cloud logs
    The simplified version of the flow with the case statement is something like this:
    from prefect import Flow, Parameter, case

    with Flow("test") as flow:
        # note: Parameter takes no `type` kwarg in Prefect 1.x
        test_mode = Parameter("test_mode", default=False)
        with case(test_mode, False):
            run_task()
    Where the case statement is checking the value of a parameter
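    (For what it’s worth, the skip behavior of that branch can be checked locally; a minimal sketch assuming a trivial run_task, not the real task:)

    ```python
    from prefect import Flow, Parameter, case, task

    @task
    def run_task():
        return "ran"

    with Flow("case-demo") as flow:
        test_mode = Parameter("test_mode", default=False)
        with case(test_mode, False):
            run_task()

    # With test_mode=True the case branch is not taken: run_task ends up
    # Skipped, but the flow run as a whole still finishes successfully.
    state = flow.run(parameters={"test_mode": True})
    ```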
    Anna Geller

    7 months ago
    Thanks a lot for all this info. I will analyze it more and ask the team. So far my only idea would be a bit of a hack: add a state handler that checks for the error message “Not all upstream states are finished; ending run” or “No heartbeat detected from the remote task; marking the run as failed.” and cancels the flow run manually if that happens:
    import prefect
    from prefect.client import Client

    def cancel_flow_on_heartbeat_failure(task, old_state, new_state):
        # hack: if the task run failed with a heartbeat error, cancel the whole flow run
        if new_state.is_failed() and "No heartbeat detected" in (new_state.message or ""):
            Client().cancel_flow_run(flow_run_id=prefect.context.get("flow_run_id"))
        return new_state
    I’ll get back to you
    Marwan Sarieddine

    7 months ago
    thank you and take your time 🙂
    Anna Geller

    7 months ago
    Have you actually tried disabling version locking? I’m not sure it is helpful in your use case, given that you already disabled all Dask-specific retries, the nanny, and memory management
    Marwan Sarieddine

    7 months ago
    yes, this will most likely be the solution I resort to - disabling version locking like we did with Lazarus … just a bit unfortunate that we are not able to leverage these Prefect Cloud features given they are not 100% reliable so far
    Anna Geller

    7 months ago
    I understand… I asked the team and will let you know if I hear any suggestions. Why do the task runs lose heartbeat in the first place? Are they failing on Dask, and given that Dask cannot retry them due to the current setting, Prefect loses heartbeat to those? It would be good if we could prevent this root cause of the problem so that ideally task runs shouldn’t lose heartbeats, right? Maybe it could be not enough memory allocated for the task runs?
    Marwan Sarieddine

    7 months ago
    Why do the task runs lose heartbeat in the first place?
    From what I understand, it is the version locking that is causing the task run to get stuck in a “Running” state - i.e. preventing it from proceeding to a successful state and causing it to lose heartbeat. The pod on which the worker is running is not affected, and the downstream task runs don’t properly execute due to “Not all upstream states are finished;”
    Maybe it could be not enough memory allocated for the task runs?
    I doubt this is memory related - the same flow runs successfully four times every day, and this only occurs rarely. Additionally, the failure at the case statement happens at the very beginning of the flow, when memory utilization is minimal
    Anna Geller

    7 months ago
    I see. In that case I agree with you that disabling version locking seems like the right thing to do