Ryan Qiu
07/16/2025, 8:03 PM

Marvin
07/16/2025, 8:03 PM

Marvin
07/16/2025, 8:03 PM
1. Enable Heartbeat Monitoring:
- Flow run heartbeats are controlled by the PREFECT_RUNNER_HEARTBEAT_FREQUENCY environment variable.
- When a worker dies and stops sending heartbeats, Prefect can automatically mark the flow run as crashed.
2. Configure Automated Recovery:
You can set up an automation to detect and handle zombie flow runs. Here's an example configuration:
```python
from datetime import datetime, timezone

from prefect.server.schemas.states import Pending, Crashed
from prefect.server.orchestration.rules import flow_run_should_be_marked_as_crashed


@flow_run_should_be_marked_as_crashed.override
async def mark_as_crashed(flow_run):
    # If the flow run is pending and hasn't had a heartbeat in 60 seconds
    if (
        isinstance(flow_run.state, Pending)
        and flow_run.state.timestamp
        and (datetime.now(timezone.utc) - flow_run.state.timestamp).total_seconds() > 60
    ):
        return True
    return False
```
3. Manual Cleanup:
If you need to clean up stuck flow runs manually, you can transition them to a failed state using the Prefect UI or API.
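For reference, a minimal sketch of that API-based cleanup with the Prefect 3.x Python client; the flow run ID below is a placeholder:

```python
# Minimal sketch: transition one stuck flow run to a terminal state via the
# Prefect 3.x Python client. The flow run ID below is a placeholder.
import asyncio
from uuid import UUID

from prefect import get_client
from prefect.states import Failed


async def fail_stuck_run(flow_run_id: UUID) -> None:
    async with get_client() as client:
        await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Failed(message="Cleaned up manually after worker death"),
            force=True,  # skip orchestration rules that might reject the transition
        )


if __name__ == "__main__":
    asyncio.run(fail_stuck_run(UUID("00000000-0000-0000-0000-000000000000")))
```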
To prevent this issue in the future:
- Ensure your workers are running with proper monitoring and restart policies
- Consider using container orchestration systems (like Kubernetes) that can automatically restart failed workers
- Monitor heartbeats (controlled by PREFECT_RUNNER_HEARTBEAT_FREQUENCY) and worker logs to detect worker health issues early
You can find more detailed information about detecting and handling zombie flows in the Prefect documentation.
Would you like me to provide more specific details about any of these approaches, particularly about setting up the automated recovery system?

Nate
07/16/2025, 8:06 PM

Marvin
07/16/2025, 8:06 PM

Marvin
07/16/2025, 8:08 PM
1. CANCEL_NEW with a concurrency limit of 1 ensures that if a worker dies while a flow run is pending, new attempts won't pile up behind it.
2. Resource Management: Concurrency limits help prevent resource exhaustion that might cause workers to fail in the first place.
3. Clean State Management: With CANCEL_PENDING, you can ensure that new runs will clear out any potentially stuck pending runs.
To implement this for your specific case, I'd recommend:
1. First, set appropriate concurrency limits on your deployment
2. Combine this with the heartbeat monitoring mentioned earlier
3. Consider using CANCEL_PENDING if you want new runs to take precedence over potentially stuck pending runs
Would you like me to elaborate on any specific aspect of concurrency policies or show more detailed examples of their implementation?

Nate
07/16/2025, 8:08 PM
Late runs, i didn't read carefully at first

Ryan Qiu
07/16/2025, 8:08 PM
CANCEL_NEW is the config that we are having trouble with 🙂
We observe zombie flow runs (stuck in Pending) taking up concurrency slots, which causes new flow runs from the same deployment to get CANCELLED.
@Marvin Can you describe CANCEL_PENDING more?
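For context, the setup Ryan describes (a deployment concurrency limit of 1 with the CANCEL_NEW collision strategy) is configured roughly like this in Prefect 3.x. The ConcurrencyLimitConfig / ConcurrencyLimitStrategy names are a best-effort recollection rather than something confirmed in the thread, and the work pool and image values are placeholders:

```python
# Sketch of the setup described above: a deployment concurrency limit of 1 using
# the CANCEL_NEW collision strategy. Class and import names are best-effort
# assumptions for Prefect 3.x; work pool and image names are placeholders.
from prefect import flow
from prefect.client.schemas.objects import ConcurrencyLimitConfig, ConcurrencyLimitStrategy


@flow
def my_flow() -> None:
    ...


if __name__ == "__main__":
    my_flow.deploy(
        name="single-slot",                          # placeholder deployment name
        work_pool_name="my-kubernetes-pool",         # placeholder work pool
        image="registry.example.com/flows:latest",   # placeholder image
        build=False,   # assume the image already exists
        push=False,
        # at most one active run; runs that collide with the limit are cancelled
        concurrency_limit=ConcurrencyLimitConfig(
            limit=1,
            collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW,
        ),
    )
```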
Marvin
07/16/2025, 8:08 PM

Nate
07/16/2025, 8:09 PM

Marvin
07/16/2025, 8:09 PM
CANCEL_PENDING differs from your current CANCEL_NEW setup:
1. CANCEL_NEW (Your Current Situation):
- When a new flow run is submitted and hits the concurrency limit
- The new run gets cancelled
- Existing pending runs (including zombies) keep their slots
- Result: Zombie pending runs block new executions
2. CANCEL_PENDING (Potential Solution):
- When a new flow run is submitted and hits the concurrency limit
- The new run proceeds
- Any existing pending runs get cancelled
- Result: Newer runs take precedence, clearing out stuck pending runs
For your specific zombie flow run situation, CANCEL_PENDING would be more appropriate because:
- It will automatically clear out those stuck pending runs when new runs come in
- It prioritizes newer runs over potentially dead ones
- It prevents resource slots from being held hostage by zombie runs
To implement this, you would modify your deployment configuration to use `CANCEL_PENDING`:
```python
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.policies import DeploymentPolicy


@flow
def my_flow():
    pass


deployment = my_flow.to_deployment(
    name="my-deployment",
    concurrency_policy=DeploymentPolicy.CANCEL_PENDING,
    concurrency_limit=1,
)
```
This approach, combined with the heartbeat monitoring mentioned earlier, should give you better handling of worker failures because:
1. If a worker dies leaving a pending run
2. When a new run comes in, the stuck pending run will be cancelled
3. The new run can proceed normally
Would you like me to provide more specific details about implementing this change or combining it with other resilience features?

Nate
07/16/2025, 8:10 PM

Nate
07/16/2025, 8:10 PM

Nate
07/16/2025, 8:10 PM

Ryan Qiu
07/16/2025, 8:11 PM
Nate
07/16/2025, 8:16 PM
Pending runs that stay in Pending too long (configurable amount of time, defaults to something like 30 seconds iirc) become Late so you can have an automation something like this
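As a sketch of the kind of automation Nate describes (not taken from the thread), a Custom trigger that fires when a flow run enters Late might look roughly like the dict below. The field names follow the event-trigger JSON shape as best I recall it, so compare against what the UI generates for a Flow run state trigger before using it:

```python
# Illustrative Custom trigger (the JSON you could paste into the Automations UI)
# that fires when any flow run enters the Late state. Pair it with an action such
# as "Change flow run's state" or a notification. Field names are best-effort and
# should be checked against the trigger the UI generates for "Flow run state".
late_flow_run_trigger = {
    "type": "event",
    "match": {"prefect.resource.id": "prefect.flow-run.*"},
    "match_related": {},                   # optionally scope to one deployment here
    "after": [],
    "expect": ["prefect.flow-run.Late"],   # the state-change event emitted on Late
    "for_each": ["prefect.resource.id"],   # evaluate the trigger per flow run
    "posture": "Reactive",
    "threshold": 1,
    "within": 0,
}
```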
Nate
07/16/2025, 8:17 PM
Flow run state trigger and tweak it to make a similar Custom trigger that has the specifics you want
Ryan Qiu
07/16/2025, 8:28 PM
Pending for several minutes without being transitioned to Late:

Ryan Qiu
07/16/2025, 8:28 PM

Ryan Qiu
07/16/2025, 8:29 PM
Late?

Nate
07/16/2025, 8:29 PM

Nate
07/16/2025, 8:29 PM

Nate
07/16/2025, 8:29 PM

Chris White

Ryan Qiu
07/16/2025, 8:30 PM

Ryan Qiu
07/16/2025, 8:30 PM

Chris White

Ryan Qiu
07/16/2025, 9:30 PM
Running, then this same problem would happen?
I dug around and found this similar github issue, but I see in the comments that the issue was resolved?

Chris White
Crashed state if the job itself crashes, and otherwise the job would run just fine even if the worker crashes / restarts multiple times)

Ryan Qiu
07/16/2025, 9:36 PM

Chris White
PREFECT_RUNNER_HEARTBEAT_FREQUENCY env var on the workflow run's environment)
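For reference, a sketch of one way to set that env var on the flow run's environment through a deployment's job variables, assuming Prefect 3.x and a Kubernetes-style work pool; the deployment, pool, and image names and the 30-second interval are placeholders:

```python
# Sketch: enable runner heartbeats for a deployment's flow runs by setting
# PREFECT_RUNNER_HEARTBEAT_FREQUENCY in the job's environment (Prefect 3.x).
# The deployment, work pool, and image names below are placeholders.
from prefect import flow


@flow
def my_flow() -> None:
    ...


if __name__ == "__main__":
    my_flow.deploy(
        name="heartbeat-enabled",                    # placeholder deployment name
        work_pool_name="my-kubernetes-pool",         # placeholder work pool
        image="registry.example.com/flows:latest",   # placeholder image
        build=False,   # assume the image already exists
        push=False,
        job_variables={
            # emit a heartbeat from the flow run process every 30 seconds
            "env": {"PREFECT_RUNNER_HEARTBEAT_FREQUENCY": "30"},
        },
    )
```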
Ryan Qiu
07/16/2025, 11:38 PM

Ryan Qiu
07/17/2025, 9:06 PM
Our plan is to run a script that periodically queries the flow_run table, looking for flow runs in a Pending or Running state whose infrastructure_pid col matches with rows in the worker table whose status col is OFFLINE. If it finds any matching flow runs, we'll use the Prefect REST api to set those flow runs to a CRASHED state.
Since the script container is running in the same k8s pod as the prefect-server container, it should be pretty easy to set up that network connection. Similarly, since the prefect-server k8s pod already has network access to the postgres k8s pod, it should be pretty easy for the script container to connect to the postgres pod as well.
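A rough sketch of the sidecar script described above, here using the Prefect Python client as a stand-in for the raw REST call Ryan mentions. The SQL mirrors Ryan's description of the join, while the DSN, polling interval, join condition, and column names are assumptions to verify against the actual Prefect schema:

```python
# Hypothetical sidecar sketch: mark flow runs whose worker has gone OFFLINE as
# Crashed. Assumes direct read access to the Prefect Postgres database and the
# Prefect 3.x Python client; the join condition mirrors Ryan's description and
# may need adjusting to how infrastructure_pid and worker rows actually relate.
import asyncio

import psycopg  # assumed Postgres driver; any driver works
from prefect import get_client
from prefect.states import Crashed

DB_DSN = "postgresql://prefect:prefect@postgres:5432/prefect"  # placeholder
POLL_SECONDS = 60

STUCK_RUNS_SQL = """
SELECT fr.id
FROM flow_run AS fr
JOIN worker AS w
  ON fr.infrastructure_pid = w.name          -- placeholder join condition
WHERE fr.state_type IN ('PENDING', 'RUNNING')
  AND w.status = 'OFFLINE';
"""


async def crash_orphaned_runs() -> None:
    with psycopg.connect(DB_DSN) as conn:
        flow_run_ids = [row[0] for row in conn.execute(STUCK_RUNS_SQL)]

    async with get_client() as client:
        for flow_run_id in flow_run_ids:
            await client.set_flow_run_state(
                flow_run_id=flow_run_id,
                state=Crashed(message="Worker is OFFLINE; marked crashed by sidecar"),
                force=True,  # skip orchestration rules that might reject the transition
            )


async def main() -> None:
    while True:
        await crash_orphaned_runs()
        await asyncio.sleep(POLL_SECONDS)


if __name__ == "__main__":
    asyncio.run(main())
```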