Giovanni Furetti
10/16/2025, 9:30 AM
from prefect import flow
import time

@flow(timeout_seconds=1, log_prints=True)
def show_timeouts():
    print("I will execute")
    time.sleep(5)
    print("I will not execute")

if __name__ == "__main__":
    show_timeouts()
I would expect the flow to start, print “I will execute,” and then fail because timeout_seconds is shorter than the sleep time.
However, the flow finishes without any problems. I’m using Prefect 3.4.
Could you tell me why?
Marvin
10/16/2025, 9:37 AM
In your example, the flow body runs synchronously with no await points or task boundaries, so the engine can’t interrupt the blocking `time.sleep(...)` call; it only regains control after your function returns, at which point it marks the flow as completed.
A few notes and ways to reproduce a timeout reliably:
- If you make the flow async and use cancel-friendly waits (e.g., anyio.sleep), the timeout will trigger as expected.
- Alternatively, move the blocking work into a task and set a task timeout. The task may continue to run in a background thread briefly, but Prefect will mark it as timed out and fail the flow.
Try one of these:
1) Async flow with cooperative sleep
import asyncio
from prefect import flow
import anyio

@flow(timeout_seconds=1, log_prints=True)
async def show_timeouts():
    print("I will execute")
    await anyio.sleep(5)  # cooperative, cancellation-friendly
    print("I will not execute")

if __name__ == "__main__":
    # Calling an async flow returns a coroutine, so run it in an event loop
    asyncio.run(show_timeouts())
2) Task-level timeout (works even in sync flows)
import time
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task(timeout_seconds=1, log_prints=True)
def sleepy():
    print("I will execute")
    time.sleep(5)  # blocking call
    print("I will not execute")

@flow(log_prints=True, task_runner=ThreadPoolTaskRunner())
def show_timeouts():
    sleepy()

if __name__ == "__main__":
    show_timeouts()
Why this matters:
- For sync/blocking code, Python cannot reliably inject an exception into the middle of the call to preempt it. Prefect’s timeouts are enforced at safe “checkpoints” — e.g., between tasks, on awaits, or when control returns to the engine.
- If you’re on Windows, there are additional platform limitations around signal-based interruption that further prevent preemption of blocking calls.
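To see the "checkpoint" idea in isolation, here is a minimal sketch that uses only the standard library's `asyncio`, not Prefect's engine, so it illustrates the general principle rather than Prefect's internals. The helper names (`cooperative_wait`, `blocking_wait`, `run_with_timeout`) are made up for this example.

```python
import asyncio
import time


async def cooperative_wait(seconds: float) -> str:
    # Yields to the event loop, so a pending timeout can cancel it.
    await asyncio.sleep(seconds)
    return "finished"


async def blocking_wait(seconds: float) -> str:
    # Never yields: the event loop cannot act on the timeout until this returns.
    time.sleep(seconds)
    return "finished"


async def run_with_timeout(wait_fn, seconds: float, timeout: float):
    """Return (outcome, elapsed_seconds) for wait_fn(seconds) under a timeout."""
    start = time.monotonic()
    try:
        outcome = await asyncio.wait_for(wait_fn(seconds), timeout=timeout)
    except asyncio.TimeoutError:
        outcome = "timed out"
    return outcome, time.monotonic() - start


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(cooperative_wait, 0.5, 0.1)))
    print(asyncio.run(run_with_timeout(blocking_wait, 0.5, 0.1)))
```

The cooperative version reports "timed out" shortly after the 0.1 s limit, while the blocking version runs for the full 0.5 s and reports "finished", because the timeout never gets a chance to fire while `time.sleep` holds the thread.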
Docs for reference:
- Flow/task timeouts and timeout utilities: prefect.utilities.timeout
If your goal is to ensure “the workflow is considered failed if it exceeds X seconds,” either approach above will do that. If you specifically need to kill a long-running process that’s blocking, the typical pattern is to:
- Run the blocking code in a subprocess you control and terminate the subprocess on timeout, or
- Use async/cancel-friendly libraries where possible.
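The subprocess pattern can be sketched with the standard library alone. This is an illustration under assumptions, not a Prefect API: `run_with_deadline` is a hypothetical helper, and the sleeping child process stands in for whatever blocking work you need to bound.

```python
import subprocess
import sys
from typing import Optional, Sequence


def run_with_deadline(cmd: Sequence[str], timeout: float) -> Optional[int]:
    """Run cmd; return its exit code, or None if it was killed at the deadline."""
    try:
        completed = subprocess.run(cmd, timeout=timeout)
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child and waits for it before re-raising.
        return None
    return completed.returncode


if __name__ == "__main__":
    # Stand-in for long-running blocking work (assumption for this example).
    slow = [sys.executable, "-c", "import time; time.sleep(5)"]
    print(run_with_deadline(slow, timeout=1))
```

Inside a flow or task, you could raise an exception when the helper returns `None` so that the run is marked as failed; the blocking work itself is really stopped because it lives in a separate process.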
If you can share your OS and how you’re running the script (plain Python, within an IDE, etc.), I can tailor the recommendation further.