Omar Khudeira
07/08/2025, 8:18 PM
Nate
07/08/2025, 8:58 PM
Marvin
07/08/2025, 8:59 PM
Marvin
07/08/2025, 8:59 PM
Omar Khudeira
07/08/2025, 9:51 PM
Kevin Grismore
07/08/2025, 9:53 PM
Marvin
07/08/2025, 9:53 PM
Omar Khudeira
07/08/2025, 9:54 PM
Marvin
07/08/2025, 9:54 PM
timeout_seconds parameter
- Configure Kubernetes liveness and readiness probes in your work pool configuration
- Use Kubernetes resource limits to prevent runaway processes
Here's an example of how to configure timeouts in your flow:
```python
from prefect import flow


@flow(timeout_seconds=3600)  # Flow will automatically timeout after 1 hour
def my_flow():
    # your flow code here
    pass
```
If you're experiencing issues where slots seem to remain locked despite crashes, it's worth checking:
1. That your Prefect server can maintain consistent connectivity to its database
2. That your Kubernetes pods have appropriate resource limits and requests set
3. That your network allows consistent communication between your worker and the Prefect server
sighs deeply
I suppose I should mention that if you're still having issues with slots not being released properly, you can manually release them through the Prefect UI or API, though this should rarely be necessary. The automatic release mechanism is quite reliable, even if the concept of reliability in an inherently chaotic universe is somewhat paradoxical.
Omar Khudeira
07/09/2025, 3:38 PM
Nate
07/09/2025, 3:41 PM
Omar Khudeira
07/09/2025, 4:01 PM
> a known limitation of the current way
Is that documented anywhere for us to review? Another user put in a bug related to this.
Nate
07/09/2025, 4:02 PM
Nate
07/09/2025, 4:03 PM
Omar Khudeira
07/09/2025, 4:09 PM
> With a concurrency limit, slots are released when the concurrency manager exits.
Does "exit" in this context mean only graceful shutdown, i.e. any failure is not compensated for? It might be good to have an explicit callout in the docs for this. Any recommendation on a way to detect this and self-heal?
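To make the question above concrete, here is a toy model in plain Python. It is not Prefect's implementation: `SlotPool` and `occupy` are hypothetical names, and the sketch assumes the context manager releases its slot in a `finally`-style cleanup, which this thread does not confirm. Under that assumption, a clean exit and an ordinary Python exception both free the slot, while a process that disappears before cleanup runs (OOM kill, pod eviction) leaves the slot counted as active.

```python
from contextlib import contextmanager


class SlotPool:
    """Toy stand-in for a server-side concurrency limit (hypothetical, not Prefect code)."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.active_slots = 0

    def acquire(self) -> None:
        if self.active_slots >= self.limit:
            raise RuntimeError("no free slots")
        self.active_slots += 1

    def release(self) -> None:
        self.active_slots = max(0, self.active_slots - 1)


@contextmanager
def occupy(pool: SlotPool):
    """Hold one slot for the duration of the block."""
    pool.acquire()
    try:
        yield
    finally:
        # Runs on a clean exit and on Python exceptions, but never runs
        # if the process itself is killed before reaching this point.
        pool.release()


pool = SlotPool(limit=2)

# Case 1: clean exit releases the slot.
with occupy(pool):
    pass
after_clean_exit = pool.active_slots

# Case 2: an ordinary exception still releases the slot.
try:
    with occupy(pool):
        raise ValueError("task failed")
except ValueError:
    pass
after_exception = pool.active_slots

# Case 3: a hard crash is modeled by acquiring and never reaching release,
# which is what the server is left with when the client process vanishes.
pool.acquire()
after_hard_crash = pool.active_slots
```

In this model only the third case leaves a slot occupied, which is the orphaned-slot situation the thread is asking how to detect and repair.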
Nate
07/09/2025, 4:13 PM
Omar Khudeira
07/09/2025, 4:27 PM
Omar Khudeira
07/09/2025, 4:27 PM
Nate
07/09/2025, 4:31 PM
```python
from prefect import flow, get_client


async def handle_orphan_slots_on_crashed(flow, flow_run, state):
    # Hook called when the flow run enters a Crashed state
    async with get_client() as client:
        await client.update_global_concurrency_limit(...)


@flow(on_crashed=[handle_orphan_slots_on_crashed])
def f(...): ...
```
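One possible way to fill in the elided call, offered as a sketch under stated assumptions and not as Nate's intended implementation. `LIMIT_NAME`, `SLOTS_PER_RUN`, and `slots_after_release` are hypothetical names introduced here. The sketch assumes the client exposes `read_global_concurrency_limit_by_name` and that `GlobalConcurrencyLimitUpdate` accepts `active_slots`; verify both against your Prefect version. The Prefect imports are deferred into the hook so that the arithmetic helper can be checked without Prefect installed.

```python
LIMIT_NAME = "my-limit"  # hypothetical: name of the global concurrency limit
SLOTS_PER_RUN = 1        # hypothetical: slots each flow run occupies


def slots_after_release(active_slots: int, orphaned: int) -> int:
    """Return the slot count after freeing orphaned slots, never below zero."""
    return max(0, active_slots - orphaned)


async def handle_orphan_slots_on_crashed(flow, flow_run, state):
    # Deferred imports (assumption: these paths match your Prefect version).
    from prefect import get_client
    from prefect.client.schemas.actions import GlobalConcurrencyLimitUpdate

    async with get_client() as client:
        # Read the current count, then write back the reduced value.
        # This read-modify-write is not atomic, so concurrent runs that
        # acquire or release slots in between can be miscounted.
        limit = await client.read_global_concurrency_limit_by_name(LIMIT_NAME)
        await client.update_global_concurrency_limit(
            name=LIMIT_NAME,
            concurrency_limit=GlobalConcurrencyLimitUpdate(
                active_slots=slots_after_release(limit.active_slots, SLOTS_PER_RUN)
            ),
        )
```

The hook would be attached as in the message above, with `@flow(on_crashed=[handle_orphan_slots_on_crashed])`. Subtracting the run's own slots instead of resetting the count to zero avoids freeing slots that other healthy runs still hold.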