Rajip Upreti
07/03/2025, 6:16 AM
Marvin
07/03/2025, 6:16 AM
Marvin
07/03/2025, 6:18 AM
```bash
prefect flow-run cancel <flow-run-id>
```
2. Preventive Measures:
- Implement health checks in your flow to detect worker disconnection
- Add timeout settings to your flow runs using the timeout_seconds parameter:
```python
from prefect import flow

@flow(timeout_seconds=3600)  # Flow run is marked as failed after 1 hour
def your_flow():
    ...
```
- Use a try-finally block in your flow to handle cleanup:
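```python
from prefect import flow

@flow
def your_flow():
    try:
        # Your flow logic here
        ...
    finally:
        # Cleanup logic; runs on success or exception, but not if the process is killed outright
        ...
```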
3. Infrastructure Configuration:
- Set appropriate resource requests and limits in your Kubernetes worker deployment
- Configure liveness and readiness probes for your worker pods
- Consider using pod disruption budgets to manage graceful termination
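The infrastructure settings above can be sketched as plain data. This is a minimal illustration rather than a tested manifest: the `app: prefect-worker` label, the resource values, the `prefect-worker-pdb` name, and the `/health` endpoint on port 8080 are all assumptions to replace with values from your own worker deployment. The script builds the container settings and a PodDisruptionBudget as Python dictionaries and prints them as JSON.

```python
import json

# Hypothetical label; use whatever labels your worker pods actually carry.
WORKER_LABELS = {"app": "prefect-worker"}


def worker_container_settings() -> dict:
    """Resource requests/limits and probes for a worker container (illustrative values)."""
    probe = {
        # Assumes the worker exposes an HTTP health endpoint at this path and port.
        "httpGet": {"path": "/health", "port": 8080},
        "periodSeconds": 30,
        "failureThreshold": 3,
    }
    return {
        "resources": {
            "requests": {"cpu": "250m", "memory": "512Mi"},
            "limits": {"cpu": "1", "memory": "1Gi"},
        },
        "livenessProbe": dict(probe),
        "readinessProbe": dict(probe),
    }


def pod_disruption_budget(min_available: int = 1) -> dict:
    """PodDisruptionBudget that keeps at least `min_available` worker pods during voluntary disruptions."""
    return {
        "apiVersion": "policy/v1",
        "kind": "PodDisruptionBudget",
        "metadata": {"name": "prefect-worker-pdb"},
        "spec": {
            "minAvailable": min_available,
            "selector": {"matchLabels": WORKER_LABELS},
        },
    }


if __name__ == "__main__":
    print(json.dumps(pod_disruption_budget(), indent=2))
    print(json.dumps(worker_container_settings(), indent=2))
```

A PodDisruptionBudget only protects against voluntary disruptions such as node drains. It does not help when a pod is OOM-killed, which is where the resource limits and timeouts matter.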
This is a known issue that's being tracked, particularly in Kubernetes environments. The core problem is that when a worker crashes unexpectedly, the flow run state can become desynchronized from the actual execution state.
For now, the most reliable approach is to:
1. Set appropriate timeouts on your flows
2. Implement monitoring to detect and handle stuck flows
3. Use the manual cancellation approach when needed
If you're experiencing this regularly, you might want to track the relevant GitHub issue (https://github.com/PrefectHQ/prefect/issues/17955) for updates on more permanent solutions.
Rajip Upreti
07/03/2025, 6:22 AM
Marvin
07/03/2025, 6:22 AM
Marvin
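07/03/2025, 6:23 AM
```python
from datetime import timedelta

from prefect.automations import Automation
from prefect.client.schemas.objects import StateType
from prefect.events.actions import ChangeFlowRunState
from prefect.events.schemas.automations import EventTrigger, Posture

# Create an automation to detect and handle stuck flows: after a run emits a
# heartbeat, expect another heartbeat or a terminal state within the window.
# Heartbeat events are only emitted when PREFECT_RUNNER_HEARTBEAT_FREQUENCY is set.
automation = Automation(
    name="Detect Stuck Flows",
    trigger=EventTrigger(
        after={"prefect.flow-run.heartbeat"},
        expect={
            "prefect.flow-run.heartbeat",
            "prefect.flow-run.Completed",
            "prefect.flow-run.Failed",
            "prefect.flow-run.Cancelled",
            "prefect.flow-run.Crashed",
        },
        match={"prefect.resource.id": "prefect.flow-run.*"},
        for_each={"prefect.resource.id"},
        posture=Posture.Proactive,
        threshold=1,
        within=timedelta(minutes=10),
    ),
    actions=[
        ChangeFlowRunState(
            state=StateType.CRASHED,
            message="Flow run marked as crashed due to missing heartbeats",
        )
    ],
).create()
```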
2. Custom Monitoring Flow
Here's a monitoring flow that you can run periodically to check for stuck flows:
```python
from datetime import datetime, timedelta, timezone
from uuid import UUID

from prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.states import Crashed


@task
async def get_stuck_flows(stuck_threshold_minutes: int = 30):
    async with get_client() as client:
        # Get flow runs that are currently in a Running state
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING])
                )
            ),
        )
    stuck_flows = []
    now = datetime.now(timezone.utc)  # timezone-aware, like the API timestamps
    for flow_run in flow_runs:
        # This measures time spent in the Running state, not heartbeats, so pick
        # a threshold longer than your longest healthy run
        if flow_run.state and flow_run.state.timestamp:
            time_running = now - flow_run.state.timestamp
            if time_running > timedelta(minutes=stuck_threshold_minutes):
                stuck_flows.append(flow_run)
    return stuck_flows


@task
async def handle_stuck_flow(flow_run_id: UUID):
    async with get_client() as client:
        await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Crashed(
                message="Flow run automatically marked as crashed after exceeding the stuck threshold"
            ),
        )


@flow(log_prints=True)
async def monitor_stuck_flows(stuck_threshold_minutes: int = 30):
    stuck_flows = await get_stuck_flows(stuck_threshold_minutes)
    print(f"Found {len(stuck_flows)} potentially stuck flows")
    for flow_run in stuck_flows:
        print(f"Handling stuck flow: {flow_run.id}")
        await handle_stuck_flow(flow_run.id)
    return len(stuck_flows)
```
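The threshold comparison in the monitoring flow above is the part most worth unit testing, and it can be isolated from the Prefect API. The sketch below is one way to do that; `is_stuck` is a hypothetical helper name, and treating naive timestamps as UTC is an assumption you may want to change.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stuck(
    state_timestamp: Optional[datetime],
    now: datetime,
    threshold_minutes: int = 30,
) -> bool:
    """Return True if a run entered its current state more than `threshold_minutes` ago."""
    if state_timestamp is None:
        # No timestamp to compare against, so do not flag the run.
        return False
    if state_timestamp.tzinfo is None:
        # Assumption: naive timestamps are UTC.
        state_timestamp = state_timestamp.replace(tzinfo=timezone.utc)
    return now - state_timestamp > timedelta(minutes=threshold_minutes)


if __name__ == "__main__":
    now = datetime(2025, 7, 3, 6, 0, tzinfo=timezone.utc)
    print(is_stuck(now - timedelta(minutes=45), now))  # older than 30 minutes
    print(is_stuck(now - timedelta(minutes=5), now))   # recent
```

Passing `now` in as an argument keeps the function deterministic, so tests do not depend on the wall clock.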
3. Schedule the Monitoring Flow
Deploy the monitoring flow to run on a schedule:
```python
from datetime import timedelta

if __name__ == "__main__":
    # flow.serve() runs the schedule from this long-lived process;
    # use flow.deploy() instead to target a work pool
    monitor_stuck_flows.serve(
        name="stuck-flow-monitor",
        interval=timedelta(minutes=15),
        parameters={"stuck_threshold_minutes": 30},
    )
```
4. Configure Heartbeat Settings
In your Prefect configuration, ensure appropriate heartbeat settings:
```bash
# Emit a heartbeat event for each running flow every 30 seconds
prefect config set PREFECT_RUNNER_HEARTBEAT_FREQUENCY=30
```
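The heartbeat frequency and the window used to declare a run stuck should be chosen together: a window shorter than the heartbeat interval flags healthy runs, while a very long one delays detection. As a rough sketch, the helper below derives a window that tolerates a few missed heartbeats. `detection_window` and the default of three missed heartbeats are a rule of thumb, not a Prefect setting.

```python
from datetime import timedelta


def detection_window(heartbeat_frequency_seconds: int, missed_heartbeats: int = 3) -> timedelta:
    """Window after which a run with no heartbeat is treated as stuck."""
    if heartbeat_frequency_seconds <= 0:
        raise ValueError("heartbeat_frequency_seconds must be positive")
    if missed_heartbeats < 1:
        raise ValueError("missed_heartbeats must be at least 1")
    return timedelta(seconds=heartbeat_frequency_seconds * missed_heartbeats)


if __name__ == "__main__":
    # With a 30-second heartbeat, tolerate three missed heartbeats.
    print(detection_window(30))
```

The resulting value is what you would pass as the time window of whatever detects missing heartbeats, for example an automation trigger.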
5. Additional Monitoring Best Practices
- Set appropriate timeouts on your flows:
```python
from prefect import flow

@flow(timeout_seconds=3600)  # 1 hour timeout
def your_flow():
    ...
```
- Add health check tasks to your flows:
```python
from prefect import flow, task

@task
def health_check():
    # Implement your health check logic
    # e.g., check connections, resources, etc.
    pass

@flow
def your_flow():
    health_check()
    # Rest of your flow logic
```
- Log important events for external monitoring:
```python
from prefect import flow, get_run_logger

@flow
def your_flow():
    logger = get_run_logger()
    logger.info("Flow started")
    try:
        # Your flow logic
        pass
    except Exception as e:
        logger.error(f"Flow failed: