Kartik
11/05/2025, 8:51 AM
Marvin
11/05/2025, 8:51 AM
Marvin
11/05/2025, 8:54 AM
flow decorator with your own.
Options:
- Flow-level timeout (recommended)
from prefect import flow

@flow(timeout_seconds=3600)  # 1 hour
def my_flow():
    ...
- Task-level timeout (for specific long tasks)
from prefect import task

@task(timeout_seconds=300)  # 5 minutes
def work():
    ...
- Granular timeout inside code
from prefect.utilities.timeout import timeout

with timeout(30):  # seconds
    do_something_blocking()
- Simulate a “global” default with a wrapper decorator
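import os
from prefect import flow as prefect_flow

# PREFECT_FLOW_TIMEOUT is a custom variable read by this wrapper, not a built-in Prefect setting.
DEFAULT_FLOW_TIMEOUT = int(os.getenv("PREFECT_FLOW_TIMEOUT", "3600"))

def org_flow(*args, **kwargs):
    # Apply the default only when the caller did not pass timeout_seconds.
    kwargs.setdefault("timeout_seconds", DEFAULT_FLOW_TIMEOUT)
    return prefect_flow(*args, **kwargs)

@org_flow
def my_flow():
    ...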
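To make the wrapper's behavior concrete without needing Prefect installed, here is a minimal sketch that swaps in a stand-in decorator. `fake_flow` is hypothetical and only mimics the calling convention of Prefect's `flow` decorator (bare or with keyword arguments). The point is just to show that `setdefault` fills in the default while leaving an explicit `timeout_seconds` untouched.

```python
import os
from functools import partial

# Same custom env var as in the wrapper above (not a built-in Prefect setting).
DEFAULT_FLOW_TIMEOUT = int(os.getenv("PREFECT_FLOW_TIMEOUT", "3600"))


def fake_flow(fn=None, *, timeout_seconds=None):
    """Hypothetical stand-in for prefect.flow: records the timeout on the function."""
    if fn is None:
        # Called with keyword arguments only, so return a decorator.
        return partial(fake_flow, timeout_seconds=timeout_seconds)
    fn.timeout_seconds = timeout_seconds
    return fn


def org_flow(*args, **kwargs):
    # setdefault only fills in the value when the caller did not pass one.
    kwargs.setdefault("timeout_seconds", DEFAULT_FLOW_TIMEOUT)
    return fake_flow(*args, **kwargs)


@org_flow
def uses_default():
    ...


@org_flow(timeout_seconds=60)
def overrides_default():
    ...


print(uses_default.timeout_seconds)       # the org-wide default
print(overrides_default.timeout_seconds)  # 60
```

Both the bare `@org_flow` form and the parameterized `@org_flow(...)` form go through the same path, so individual flows can still opt into a shorter or longer limit.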
What happens on timeout
- The flow run moves to a Failed state (named TimedOut) with a timeout error, and Prefect attempts to interrupt execution. If you need cleanup, add hooks:
@flow(timeout_seconds=3600, on_cancellation=[...], on_failure=[...])
def my_flow():
    ...
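As a rough sketch of what could go in those hook lists: Prefect calls flow state hooks with `(flow, flow_run, state)`, so a cleanup hook is just a plain function with that signature. The scratch directory and the name `cleanup_scratch` are hypothetical, chosen only for illustration. Since a timeout ends in a Failed-type state, `on_failure` should be the list that fires for it.

```python
import logging
import shutil
import tempfile
from pathlib import Path

logger = logging.getLogger("flow_cleanup")

# Hypothetical scratch location that the flow is assumed to write to.
SCRATCH_DIR = Path(tempfile.gettempdir()) / "my_flow_scratch"


def cleanup_scratch(flow, flow_run, state):
    """State hook: remove temporary files left behind by an interrupted run."""
    logger.warning(
        "Flow run %s ended in state %s; cleaning up", flow_run.name, state.name
    )
    # ignore_errors=True keeps the hook from raising if the directory is already gone.
    shutil.rmtree(SCRATCH_DIR, ignore_errors=True)


# Attaching it (requires prefect, so shown as a comment):
# @flow(
#     timeout_seconds=3600,
#     on_failure=[cleanup_scratch],
#     on_cancellation=[cleanup_scratch],
# )
# def my_flow():
#     ...
```

Keeping the hook idempotent (safe to call when there is nothing to clean) matters here, because the same function is registered for both failure and cancellation.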
Infrastructure-level notes
- There’s no built-in “kill after T” at the worker level. On Kubernetes, you can set activeDeadlineSeconds on the job or pod spec through your Kubernetes worker’s base job template, which hard-stops the container if you want an infra-enforced cap. See the Workers docs for how to customize the job template.
- For Docker/Process workers, rely on timeout_seconds or run under an external supervisor if you truly need a hard kill.
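Here is a minimal sketch of both infrastructure-level ideas, using only the standard library. The first helper assumes the Kubernetes worker's base job template keeps the Job manifest under `job_configuration` -> `job_manifest` -> `spec`; check your work pool's actual template before relying on that layout. The second is one way to act as an "external supervisor": it starts a command as a child process and kills it when the limit passes. Both function names are hypothetical.

```python
import copy
import subprocess
import sys


def with_active_deadline(base_job_template, seconds):
    """Return a copy of a job template dict with activeDeadlineSeconds on the Job spec.

    Assumption: the manifest lives at job_configuration -> job_manifest -> spec.
    """
    template = copy.deepcopy(base_job_template)
    manifest = template.setdefault("job_configuration", {}).setdefault("job_manifest", {})
    manifest.setdefault("spec", {})["activeDeadlineSeconds"] = seconds
    return template


def run_with_hard_timeout(cmd, seconds):
    """Run cmd as a child process and kill it if it exceeds the limit.

    Returns the exit code, or None if the process was killed on timeout.
    """
    try:
        return subprocess.run(cmd, timeout=seconds).returncode
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child and waits for it before re-raising.
        return None


if __name__ == "__main__":
    template = {"job_configuration": {"job_manifest": {"kind": "Job", "spec": {}}}}
    print(with_active_deadline(template, 3600))

    # A child that would sleep for 30 seconds is stopped after about half a second.
    sleeper = [sys.executable, "-c", "import time; time.sleep(30)"]
    print(run_with_hard_timeout(sleeper, 0.5))
```

Note that `subprocess.run` only kills the direct child; if that process spawns its own children, you would need process groups or a real supervisor to make sure everything stops.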
Docs and references
- Flow/task timeouts and timeout helpers: Timeout utilities
- Hooks for cleanup: State change hooks
- Cancellation concepts: Cancel workflows
- Workers overview: Workers
A couple of clarifying questions so I can tailor this:
- Are you on Prefect 3.x, and which worker/infrastructure are you using (Process, Docker, Kubernetes)?
- Do you want a default applied across all flows in a repo (code-level) or across your whole workspace (policy-level)?