# marvin-ai
@Marvin I’m trying to reproduce the case where a workflow gets shut down if it runs for too long. I took the example from the documentation here: https://docs.prefect.io/v3/how-to-guides/workflows/write-and-run#cancel-a-workflow-if-it-runs-for-too-long The script I’m running is this:
```python
from prefect import flow
import time

@flow(timeout_seconds=1, log_prints=True)
def show_timeouts():
    print("I will execute")
    time.sleep(5)
    print("I will not execute")


if __name__ == "__main__":
    show_timeouts()
```
I would expect the flow to start, print “I will execute,” and then fail because `timeout_seconds` is shorter than the sleep time. However, the flow finishes without any problems. I’m using Prefect 3.4. Could you tell me why?
Marvin
thought for 458.9 seconds
Great question, and thanks for sharing the exact snippet. What you’re seeing is due to how timeouts are enforced in Prefect 3.x: flow and task timeouts are cooperative. They don’t forcibly preempt blocking synchronous calls like `time.sleep(...)`. In your example, the flow body runs synchronously with no await points or task boundaries, so the engine can’t interrupt the `sleep`; it only regains control after your function returns, at which point it marks the flow as completed.

A few notes and ways to reproduce a timeout reliably:
- If you make the flow async and use cancel-friendly waits (e.g., `anyio.sleep`), the timeout will trigger as expected.
- Alternatively, move the blocking work into a task and set a task timeout. The task may continue to run in a background thread briefly, but Prefect will mark it as timed out and fail the flow.

Try one of these:

1) Async flow with cooperative sleep
```python
import asyncio

import anyio
from prefect import flow

@flow(timeout_seconds=1, log_prints=True)
async def show_timeouts():
    print("I will execute")
    await anyio.sleep(5)  # cooperative, cancellation-friendly
    print("I will not execute")

if __name__ == "__main__":
    asyncio.run(show_timeouts())  # async flows must be awaited or run with asyncio.run
```
2) Task-level timeout (works even in sync flows)
```python
import time
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task(timeout_seconds=1, log_prints=True)
def sleepy():
    print("I will execute")
    time.sleep(5)  # blocking call
    print("I will not execute")

@flow(log_prints=True, task_runner=ThreadPoolTaskRunner())
def show_timeouts():
    sleepy()

if __name__ == "__main__":
    show_timeouts()
```
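If you want to confirm the second option programmatically rather than just watching the traceback, you can ask Prefect for the flow’s final state instead of its return value. Here’s a minimal sketch of the same example with that check added; `return_state=True` is standard Prefect behavior for flow calls, and the check only asserts that the final state is a failed one, since the exact state name you see may vary:
```python
import time

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner


@task(timeout_seconds=1, log_prints=True)
def sleepy():
    print("I will execute")
    time.sleep(5)  # blocking call that exceeds the task's timeout
    print("I will not execute")


@flow(log_prints=True, task_runner=ThreadPoolTaskRunner())
def show_timeouts():
    sleepy()


if __name__ == "__main__":
    # return_state=True returns the flow's final State object instead of
    # re-raising the underlying timeout error in your terminal
    state = show_timeouts(return_state=True)
    print(f"final state: {state.name}, failed: {state.is_failed()}")
```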
Why this matters:
- For sync/blocking code, Python cannot reliably inject an exception into the middle of the call to preempt it. Prefect’s timeouts are enforced at safe “checkpoints”, e.g. between tasks, on awaits, or when control returns to the engine.
- If you’re on Windows, there are additional platform limitations around signal-based interruption, which further prevent preemption of blocking calls.

Docs for reference:
- Flow/task timeouts and timeout utilities: prefect.utilities.timeout

If your goal is to ensure “the workflow is considered failed if it exceeds X seconds,” either approach above will do that. If you specifically need to kill a long-running process that’s blocking, the typical pattern is to:
- run the blocking code in a subprocess you control and terminate the subprocess on timeout (see the sketch below), or
- use async/cancel-friendly libraries where possible.

If you can share your OS and how you’re running the script (plain Python, within an IDE, etc.), I can tailor the recommendation further.
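For completeness, here’s a rough sketch of the subprocess pattern mentioned above, using only the standard library’s `multiprocessing` module inside a flow. The `blocking_work` function, the flow name, and the one-second budget are illustrative placeholders; the point is that terminating a child process is the reliable way to stop truly blocking work:
```python
import multiprocessing
import time

from prefect import flow


def blocking_work():
    # stand-in for blocking code you cannot make async or cooperative
    time.sleep(5)


@flow(log_prints=True)
def run_with_hard_timeout(budget_seconds: float = 1.0):
    proc = multiprocessing.Process(target=blocking_work)
    proc.start()
    proc.join(timeout=budget_seconds)  # wait up to the budget for the work to finish
    if proc.is_alive():
        # the work exceeded its budget: terminate the child process and fail the flow
        proc.terminate()
        proc.join()
        raise TimeoutError(f"blocking work exceeded {budget_seconds}s and was terminated")
    print("work finished within budget")


if __name__ == "__main__":
    run_with_hard_timeout()
```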