# marvin-ai
r
@Marvin is there a way to clean up pending flow runs when a worker that has picked up that flow run unexpectedly dies? I'm observing that when a prefect worker puts a flow run in pending and then dies before putting the job in a running state, the flow run gets stuck in the pending state.
m
thought for 38.4 seconds
Based on the research, I can provide you with a solution to handle flow runs that get stuck in a pending state when a worker dies. Here's what you can do:

1. Use Heartbeat Monitoring (Recommended Approach):
   - Prefect 3.x (specifically 3.1.8 and later) includes built-in heartbeat monitoring to detect and handle these situations.
   - You can configure the heartbeat frequency using the `PREFECT_RUNNER_HEARTBEAT_FREQUENCY` environment variable.
   - When a worker dies and stops sending heartbeats, Prefect can automatically mark the flow run as crashed.

2. Configure Automated Recovery: You can set up an automation to detect and handle zombie flow runs. Here's an example configuration:
```python
from datetime import datetime, timezone

from prefect.server.schemas.states import Pending, Crashed
from prefect.server.orchestration.rules import flow_run_should_be_marked_as_crashed

@flow_run_should_be_marked_as_crashed.override
async def mark_as_crashed(flow_run):
    # If the flow run is pending and hasn't had a heartbeat in 60 seconds
    if (
        isinstance(flow_run.state, Pending)
        and flow_run.state.timestamp
        and (datetime.now(timezone.utc) - flow_run.state.timestamp).total_seconds() > 60
    ):
        return True
    return False
```
3. Manual Cleanup: If you need to clean up stuck flow runs manually, you can transition them to a failed state using the Prefect UI or API.

To prevent this issue in the future:
- Ensure your workers are running with proper monitoring and restart policies
- Consider using container orchestration systems (like Kubernetes) that can automatically restart failed workers
- Monitor worker heartbeat logs (driven by `PREFECT_RUNNER_HEARTBEAT_FREQUENCY`) to detect worker health issues early

You can find more detailed information about detecting and handling zombie flows in the Prefect documentation. Would you like me to provide more specific details about any of these approaches, particularly about setting up the automated recovery system?
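For the manual cleanup path in point 3, here is a minimal sketch using the Prefect 3.x Python client. The 300-second threshold and the choice of a Crashed target state are illustrative assumptions, not anything prescribed above:

```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.states import Crashed


async def crash_stuck_pending_runs(max_pending_seconds: int = 300) -> None:
    """Find flow runs stuck in Pending and force them into Crashed."""
    cutoff = datetime.now(timezone.utc) - timedelta(seconds=max_pending_seconds)
    async with get_client() as client:
        # Fetch every flow run currently in the Pending state
        pending_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.PENDING])
                )
            )
        )
        for run in pending_runs:
            # run.state.timestamp is when the run entered Pending
            if run.state and run.state.timestamp < cutoff:
                await client.set_flow_run_state(
                    flow_run_id=run.id,
                    state=Crashed(message="Worker died before the run started"),
                    force=True,
                )


if __name__ == "__main__":
    asyncio.run(crash_stuck_pending_runs())
```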
n
@Marvin surprised you didn't mention deployment concurrency, CANCEL_NEW etc
m
thought for 74.5 seconds
This combination of concurrency policies and limits can help prevent the situation you're describing in several ways:

1. Prevent Zombie Runs: Using `CANCEL_NEW` with a concurrency limit of 1 ensures that if a worker dies while a flow run is pending, new attempts won't pile up behind it.
2. Resource Management: Concurrency limits help prevent resource exhaustion that might cause workers to fail in the first place.
3. Clean State Management: With `CANCEL_PENDING`, you can ensure that new runs will clear out any potentially stuck pending runs.

To implement this for your specific case, I'd recommend:
1. First, set appropriate concurrency limits on your deployment
2. Combine this with the heartbeat monitoring mentioned earlier
3. Consider using `CANCEL_PENDING` if you want new runs to take precedence over potentially stuck pending runs

Would you like me to elaborate on any specific aspect of concurrency policies or show more detailed examples of their implementation?
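For step 1 above, deployment-level concurrency in recent Prefect 3.x releases is configured roughly like the sketch below. The repo, entrypoint, and work pool names are placeholders, and `ConcurrencyLimitConfig`/`ConcurrencyLimitStrategy` are assumed to expose only `ENQUEUE` and `CANCEL_NEW` collision strategies:

```python
from prefect import flow
from prefect.client.schemas.objects import (
    ConcurrencyLimitConfig,
    ConcurrencyLimitStrategy,
)

if __name__ == "__main__":
    # Placeholder source / entrypoint / work pool names; swap in your own.
    flow.from_source(
        source="https://github.com/org/repo",
        entrypoint="flows.py:my_flow",
    ).deploy(
        name="my-deployment",
        work_pool_name="my-work-pool",
        # Limit the deployment to one active run; CANCEL_NEW cancels runs that
        # would exceed the limit instead of queueing them (ENQUEUE).
        concurrency_limit=ConcurrencyLimitConfig(
            limit=1,
            collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW,
        ),
    )
```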
n
i guess maybe you're more interested in getting rid of `Late` runs, i didn't read carefully at first
r
@Nate Actually `CANCEL_NEW` is the config that we are having trouble with 🙂 We observe zombie flow runs (stuck in Pending) taking up concurrency slots, which causes new flow runs from the same deployment to get `CANCELLED`.
@Marvin Can you describe `CANCEL_PENDING` more?
m
thought for 36.2 seconds
n
i think cancel pending is not real unfortunately
😢 1
m
Based on the research, here's how `CANCEL_PENDING` differs from your current `CANCEL_NEW` setup:

1. `CANCEL_NEW` (Your Current Situation):
   - When a new flow run is submitted and hits the concurrency limit
   - The new run gets cancelled
   - Existing pending runs (including zombies) keep their slots
   - Result: Zombie pending runs block new executions

2. `CANCEL_PENDING` (Potential Solution):
   - When a new flow run is submitted and hits the concurrency limit
   - The new run proceeds
   - Any existing pending runs get cancelled
   - Result: Newer runs take precedence, clearing out stuck pending runs

For your specific zombie flow run situation, `CANCEL_PENDING` would be more appropriate because:
- It will automatically clear out those stuck pending runs when new runs come in
- It prioritizes newer runs over potentially dead ones
- It prevents resource slots from being held hostage by zombie runs

To implement this, you would modify your deployment configuration to use `CANCEL_PENDING`:
```python
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.policies import DeploymentPolicy

@flow
def my_flow():
    pass

deployment = my_flow.to_deployment(
    name="my-deployment",
    concurrency_policy=DeploymentPolicy.CANCEL_PENDING,
    concurrency_limit=1
)
```
This approach, combined with the heartbeat monitoring mentioned earlier, should give you better handling of worker failures because:
1. If a worker dies leaving a pending run,
2. when a new run comes in, the stuck pending run will be cancelled, and
3. the new run can proceed normally.

Would you like me to provide more specific details about implementing this change or combining it with other resilience features?
👎 1
n
jeez, i'll see what's going on with that, that is not even almost real
😂 1
i think the best approach is to cancel late runs with an automation
based on what i hear so far at least
r
Is there some automation that prefect can provide here out of the box to cancel Pending flow runs that have been in that state for a while?
n
well, `Pending` runs that stay in `Pending` too long (configurable amount of time, defaults to something like 30 seconds iirc) become `Late`
so you can have an automation something like this
if you want to mess with the details, look at the JSON tab of the stock `Flow run state` trigger and tweak it to make a similar `Custom` trigger that has the specifics you want
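As a rough illustration of that kind of `Custom` trigger, the dict below assumes flow-run state changes are emitted as `prefect.flow-run.<StateName>` events and that the trigger fields match those shown in the stock trigger's JSON tab; it is written as a Python dict only to keep the examples in one language. Pair it with a Cancel flow run (or Change flow run state) action in the automation:

```python
# A custom event trigger intended to fire when a flow run enters Late and then
# sits there for 5 minutes without reaching Running or a terminal state.
# Event names follow the assumed `prefect.flow-run.<StateName>` convention;
# double-check them against the JSON tab of the stock "Flow run state" trigger.
late_run_trigger = {
    "type": "event",
    "posture": "Proactive",
    "match": {"prefect.resource.id": "prefect.flow-run.*"},
    "after": ["prefect.flow-run.Late"],
    "expect": [
        "prefect.flow-run.Running",
        "prefect.flow-run.Completed",
        "prefect.flow-run.Failed",
        "prefect.flow-run.Cancelled",
        "prefect.flow-run.Crashed",
    ],
    "for_each": ["prefect.resource.id"],
    "threshold": 1,
    "within": 300,  # seconds a run may sit in Late before the automation fires
}
```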
r
Interesting, I'm observing that flow runs can get stuck in `Pending` for several minutes without being transitioned to `Late`:
We've even observed hours, but I don't have a screenshot for that
Is there some configuration that I can use to force hanging pending flow runs to become `Late`?
n
oh i'm sorry
i got my states messed up, if they stick in Scheduled they'll go to Late
if they're in Pending, they got picked up but not started
👍 1
c
The only worker that would cause this behavior I believe is a process worker - is that the worker type you are using?
r
give me a sec, i'll look it up
we are running a custom worker that extends the prefect base worker
c
interesting interesting. Ideally your worker should be designed to be stateless, meaning that the infrastructure a run is submitted to is fully decoupled from the worker process itself. Once a run is in "Pending" that means the submission to infra was successful, and the job itself should manage the transition to Running, not the worker
r
I see, so if the infrastructure were to crash between receiving the flow run and managing the transition to `Running`, then this same problem would happen? I dug around and found this similar github issue, but I see in the comments that the issue was resolved?
c
depending on the nature of the crash and depending on the worker, yea it could; the kubernetes worker is fully stateless so this wouldn't happen with the k8s worker. Even if the infrastructure crashes for the job and the worker restarts, the k8s worker knows how to monitor job labels to report crashes back to the API (so this situation would result in a `Crashed` state if the job itself crashes, and otherwise the job would run just fine even if the worker crashes / restarts multiple times)
r
Gotcha, this info is very helpful 🙏 I think we'll consider the k8s worker for this extra robustness, but in the meantime I guess we'll have to write some custom scripting to monitor these zombie flow runs?
🫡 1
c
Yea that'd be my recommendation; this doc covers the configuration of heartbeats + an automation that should be a helpful pattern to get you going: https://docs.prefect.io/v3/advanced/detect-zombie-flows. The guide is independent of worker (just make sure to set that `PREFECT_RUNNER_HEARTBEAT_FREQUENCY` env var on the flow run's environment)
👀 1
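For anyone following along, a sketch of the pattern that guide describes: set `PREFECT_RUNNER_HEARTBEAT_FREQUENCY` (e.g. 30) in the flow run's environment, then create an automation whose proactive trigger fires when heartbeat events stop arriving and whose action marks the run as crashed. The endpoint path and payload shape below are modeled on that guide rather than copied verbatim, so verify them against the doc:

```python
import os

import httpx

# Automation: if a flow run has emitted a heartbeat but then goes 90 seconds
# without another heartbeat or a terminal state event, mark it CRASHED.
zombie_automation = {
    "name": "Mark zombie flow runs as crashed",
    "trigger": {
        "type": "event",
        "match": {"prefect.resource.id": "prefect.flow-run.*"},
        "after": ["prefect.flow-run.heartbeat"],
        "expect": [
            "prefect.flow-run.heartbeat",
            "prefect.flow-run.Completed",
            "prefect.flow-run.Failed",
            "prefect.flow-run.Cancelled",
            "prefect.flow-run.Crashed",
        ],
        "for_each": ["prefect.resource.id"],
        "posture": "Proactive",
        "threshold": 1,
        "within": 90,  # ~3 missed heartbeats at PREFECT_RUNNER_HEARTBEAT_FREQUENCY=30
    },
    "actions": [
        {
            "type": "change-flow-run-state",
            "state": "CRASHED",
            "message": "Flow run marked as crashed due to missing heartbeats.",
        }
    ],
}

# Register the automation against the Prefect API (assumed /automations/ path).
response = httpx.post(
    f"{os.environ['PREFECT_API_URL']}/automations/",
    json=zombie_automation,
)
response.raise_for_status()
```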
r
@Chris White this seems pretty promising, I think we'll definitely consider this option. Thanks!
🙌 1
Hey @Chris White, I'm pursuing a different option -- going to post here for posterity in case it helps someone else in the future, or if you happen to be collecting usage patterns! I'm running prefect-server in a k8s pod and using another postgres k8s pod for persistence. I will be writing a script to run in a container inside the prefect-server pod to look for zombie flow runs. The script is going to periodically query postgres's `flow_run` table, looking for flow runs in a `Pending` or `Running` state whose `infrastructure_pid` col matches rows in the `worker` table whose `status` col is `OFFLINE`. If it finds any matching flow runs, we'll use the Prefect REST API to set those flow runs to a `CRASHED` state. Since the script container is running in the same k8s pod as the prefect-server container, it should be pretty easy to set up that network connection. Similarly, since the prefect-server k8s pod already has network access to the postgres k8s pod, it should be pretty easy for the script container to connect to the postgres pod as well.
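A sketch of what that script could look like, following the table and column names described above. The `infrastructure_pid`-to-`worker` join, the env var names, and the thresholds are taken from (or assumed for) the plan above, so verify them against your prefect-server schema and REST API version before relying on this:

```python
import os

import httpx
import psycopg  # the script container talks to the postgres pod directly

PREFECT_API_URL = os.environ["PREFECT_API_URL"]  # e.g. http://localhost:4200/api inside the pod
POSTGRES_DSN = os.environ["POSTGRES_DSN"]        # e.g. postgresql://prefect:...@postgres:5432/prefect

# Flow runs in Pending/Running whose worker has gone OFFLINE, using the
# columns described above; adjust the join to match your schema version.
ZOMBIE_QUERY = """
    SELECT fr.id
    FROM flow_run AS fr
    JOIN worker AS w ON fr.infrastructure_pid = w.id::text
    WHERE fr.state_type IN ('PENDING', 'RUNNING')
      AND w.status = 'OFFLINE';
"""


def crash_zombie_flow_runs() -> None:
    with psycopg.connect(POSTGRES_DSN) as conn:
        zombie_ids = [row[0] for row in conn.execute(ZOMBIE_QUERY)]

    for flow_run_id in zombie_ids:
        # Force each zombie run into a CRASHED state via the orchestration API.
        resp = httpx.post(
            f"{PREFECT_API_URL}/flow_runs/{flow_run_id}/set_state",
            json={
                "state": {
                    "type": "CRASHED",
                    "message": "Worker went offline before the run progressed.",
                },
                "force": True,
            },
        )
        resp.raise_for_status()


if __name__ == "__main__":
    crash_zombie_flow_runs()
```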