Kartik
11/06/2025, 11:06 AM
🕒 Timer: 90 seconds elapsed
04:34:13 PM
prefect.flow_runs
✅ Flow finished successfully (this should NOT happen if timeout works)
04:34:14 PM
prefect.flow_runs
🧹 Flow ended (either due to timeout or completion).
04:34:14 PM
prefect.flow_runs
Finished in state Completed()
04:34:14 PM
prefect.flow_runs
Process for flow run 'imaginary-dog' exited cleanly.
Marvin
11/06/2025, 11:06 AM
Marvin
11/06/2025, 11:09 AM
Your try/except is catching and swallowing the timeout exception, so Prefect can’t mark the flow as timed out. In Prefect 3.x, timeout_seconds works by raising a timeout/cancellation exception inside the flow. If you catch Exception and don’t re-raise, the flow returns normally and ends in Completed.
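The mechanism is easy to reproduce without Prefect. In this minimal sketch, FakeFlowTimeout is a hypothetical stand-in for the exception Prefect raises when timeout_seconds elapses, and terminal_state is a deliberately rough model of how an orchestrator maps a flow's outcome to a state (not Prefect's actual logic). The only difference between the two flows is whether the broad handler re-raises.

```python
class FakeFlowTimeout(TimeoutError):
    """Hypothetical stand-in for the timeout exception Prefect raises inside the flow."""


def long_running(steps: int, timeout_at: int) -> None:
    # Simulate the timeout firing partway through the loop
    for i in range(steps):
        if i == timeout_at:
            raise FakeFlowTimeout(f"timed out at step {i}")


def swallowing_flow() -> str:
    try:
        long_running(steps=90, timeout_at=60)
    except Exception:
        pass  # broad except swallows the timeout, so the caller sees a normal return
    return "completed"


def reraising_flow() -> str:
    try:
        long_running(steps=90, timeout_at=60)
    except Exception:
        raise  # log here if needed, but let the timeout propagate
    return "completed"


def terminal_state(flow_fn) -> str:
    """Rough model (an assumption, not Prefect's code) of mapping an outcome to a state name."""
    try:
        flow_fn()
    except TimeoutError:
        return "TimedOut"
    except Exception:
        return "Failed"
    return "Completed"


if __name__ == "__main__":
    print(terminal_state(swallowing_flow))  # Completed
    print(terminal_state(reraising_flow))   # TimedOut
```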
What to change
- Either remove the broad try/except, or re-raise on timeout/cancellation so Prefect can set the correct terminal state.
Example fix (sync flow)
from prefect import flow, get_run_logger
import time

@flow(timeout_seconds=60)
def test_timeout_of_workflow():
    logger = get_run_logger()
    logger.info(":rocket: Starting the timeout test flow...")
    try:
        logger.info(":hourglass_flowing_sand: Starting simulated long-running process...")
        for i in range(1, 91):  # Run 90 seconds: should exceed the 60-second timeout
            logger.info(f":clock3: Timer: {i} seconds elapsed")
            time.sleep(1)
        logger.info(":white_check_mark: Flow finished successfully (this should NOT happen if timeout works)")
    except TimeoutError:
        # Let Prefect handle the timeout state
        logger.error(":x: Flow timed out")
        raise
    except Exception as e:
        # Re-raise other errors too so Prefect sets the state appropriately
        logger.exception(f":x: Flow failed: {e}")
        raise
    finally:
        logger.info(":broom: Flow ended (either due to timeout or completion).")
If you prefer async (more reliable cancellation)
Using async gives stronger cancellation semantics because Prefect relies on anyio’s cancellation under the hood:
from prefect import flow, get_run_logger
import anyio

@flow(timeout_seconds=60)
async def test_timeout_of_workflow():
    logger = get_run_logger()
    logger.info(":rocket: Starting the timeout test flow...")
    try:
        logger.info(":hourglass_flowing_sand: Starting simulated long-running process...")
        for i in range(1, 91):
            logger.info(f":clock3: Timer: {i} seconds elapsed")
            await anyio.sleep(1)
        logger.info(":white_check_mark: Flow finished successfully (this should NOT happen if timeout works)")
    except Exception:
        # Re-raise so Prefect can set the terminal state
        logger.exception(":x: Flow interrupted or failed")
        raise
    finally:
        logger.info(":broom: Flow ended (either due to timeout or completion).")
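To see why awaiting makes cancellation prompt, here is a standard-library sketch. It swaps in asyncio.wait_for as an analogue of the anyio cancel scope Prefect uses, so it illustrates the idea rather than reproducing Prefect's code: the cancellation is delivered at the pending await, so a 120-second sleep ends as soon as the deadline passes. long_job and run_with_deadline are hypothetical names.

```python
import asyncio
import time


async def long_job(seconds: float) -> str:
    # The await is a cancellation point: a cancelled sleep stops immediately
    await asyncio.sleep(seconds)
    return "finished"


async def run_with_deadline(job_seconds: float, deadline: float) -> tuple:
    start = time.monotonic()
    try:
        result = await asyncio.wait_for(long_job(job_seconds), timeout=deadline)
    except asyncio.TimeoutError:  # same class as built-in TimeoutError on Python 3.11+
        result = "timed out"  # reported only for the demo; inside a flow, re-raise instead
    return result, time.monotonic() - start


if __name__ == "__main__":
    outcome, elapsed = asyncio.run(run_with_deadline(job_seconds=120, deadline=0.2))
    print(outcome, f"after {elapsed:.2f}s")  # ends near the 0.2 s deadline, not after 120 s
```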
Notes and caveats
- Timeouts are best-effort for synchronous, blocking code. They raise an exception, but can’t preempt some blocking C calls while they’re running. With your 1-second sleeps, cancellation should still be observed as long as you don’t swallow the exception.
- Don’t catch-and-continue on timeouts. If you need to log something, log and re-raise.
- Deployments/workers don’t change the timeout semantics — the exception still needs to propagate for the run to end as Cancelled/TimedOut.
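The "best-effort" caveat can be observed directly. This sketch mimics a thread-based soft timeout with CPython's PyThreadState_SetAsyncExc; that mechanism is an assumption chosen for illustration, and Prefect's real implementation in prefect.utilities.timeout may work differently. The exception is queued as soon as the deadline passes, but the interpreter only raises it once the current blocking call returns, so many short sleeps are interrupted near the deadline while one long sleep runs to completion first.

```python
import ctypes
import threading
import time


def _raise_in_thread(thread_id: int, exc_type: type) -> None:
    # CPython-only: queue exc_type to be raised in the target thread at its next bytecode boundary
    ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_id), ctypes.py_object(exc_type)
    )


def seconds_until_timeout_observed(chunk: float, chunks: int, timeout: float) -> float:
    """Sleep `chunks` times for `chunk` seconds each; report when the injected TimeoutError surfaced."""
    timer = threading.Timer(timeout, _raise_in_thread, args=(threading.get_ident(), TimeoutError))
    start = time.monotonic()
    timer.start()
    try:
        for _ in range(chunks):
            time.sleep(chunk)  # blocking C call: the pending exception waits until it returns
    except TimeoutError:
        return time.monotonic() - start
    finally:
        timer.cancel()  # no-op if the timer already fired
    raise RuntimeError("the work finished before the timeout fired")


if __name__ == "__main__":
    # Many short sleeps: the timeout is observed close to the 0.2 s deadline
    print(f"{seconds_until_timeout_observed(chunk=0.05, chunks=40, timeout=0.2):.2f}s")
    # One long sleep: the timeout is only observed after the 1 s call returns
    print(f"{seconds_until_timeout_observed(chunk=1.0, chunks=3, timeout=0.2):.2f}s")
```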
Related docs
- prefect.utilities.timeout (how timeouts are implemented)
- How anyio powers Prefect’s async architecture (why async is more cancellation-friendly)
If you try the minimal test below (no try/except), the run should not finish as Completed:
from prefect import flow
import time

@flow(timeout_seconds=5)
def f():
    time.sleep(10)

if __name__ == "__main__":
    f()
If you’re still seeing Completed after removing the broad except, let me know your OS, Python version, and Prefect version; there are a few platform-specific nuances we can dig into.
Kartik
11/06/2025, 11:33 AM
Marvin
11/06/2025, 11:33 AM
Marvin
11/06/2025, 11:35 AM
timeout_seconds raises a cancellation/timeout exception in the Python runtime. If your flow is making a blocking call into an external process (like dbt via PrefectDbtRunner().invoke()), Prefect’s timeout will not forcibly kill that subprocess; the exception will only be raised once the call returns. To ensure the run cancels on time, you need to make the dbt call cancellation-aware and terminate the subprocess when cancellation happens.
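A minimal sketch of why the child survives: an exception in the parent Python process never reaches a subprocess on its own. Here a 30-second Python sleep stands in for dbt, interrupted_wait and child_alive_after_timeout are hypothetical helpers, and the timeout exception is raised by hand to simulate Prefect's. Only the variant that explicitly kills the child in its handler leaves nothing running.

```python
import subprocess
import sys

CHILD_CMD = [sys.executable, "-c", "import time; time.sleep(30)"]  # stand-in for a long dbt run


def interrupted_wait(proc: subprocess.Popen, cleanup: bool) -> None:
    """Simulate a flow that is waiting on a child when the timeout exception arrives."""
    try:
        raise TimeoutError("simulated flow timeout")  # stands in for Prefect's timeout exception
    except BaseException:
        if cleanup:
            proc.kill()  # terminate the child before the exception propagates
            proc.wait()
        raise


def child_alive_after_timeout(cleanup: bool) -> bool:
    proc = subprocess.Popen(CHILD_CMD)
    try:
        interrupted_wait(proc, cleanup=cleanup)
    except TimeoutError:
        pass
    alive = proc.poll() is None
    if alive:
        proc.kill()  # tidy up the demo process
        proc.wait()
    return alive


if __name__ == "__main__":
    print(child_alive_after_timeout(cleanup=False))  # True: the exception never reached the child
    print(child_alive_after_timeout(cleanup=True))   # False: the handler killed it first
```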
Two reliable patterns you can use:
Option A (recommended): Use an async flow and run dbt via the CLI with anyio
- anyio’s run_process terminates the child process when the cancellation/timeout triggers, so you get timely cancellation.
- Keep the flow-level timeout_seconds, and also wrap just the dbt command with fail_after() for extra safety.
from prefect import flow, get_run_logger
import anyio

@flow(timeout_seconds=60)
async def clickstream_dbt_run(
    database: str,
    models: str = "",
    full_refresh: bool = False,
    store_timezone_offset_minutes: str = "330",
):
    logger = get_run_logger()
    # Load env vars for this store (your function)
    # load_env_vars_dynamic(database=database, store_timezone_offset_minutes=store_timezone_offset_minutes)
    cmd = ["dbt", "run", "--target", "dynamic-target"]
    if models:
        cmd += ["-s", models]
    if full_refresh:
        cmd += ["--full-refresh"]
    logger.info(f"Executing dbt command: {' '.join(cmd)}")
    try:
        # Explicit subprocess timeout in addition to the flow-level timeout
        with anyio.fail_after(60):
            result = await anyio.run_process(cmd, check=True)
        logger.info("dbt run completed successfully")
        return result.returncode
    except TimeoutError:
        logger.error(":x: dbt run timed out; subprocess was terminated")
        raise
    except Exception as e:
        logger.exception(f"dbt run failed: {e}")
        raise
Notes:
- Ensure dbt is available in the worker’s environment.
- On cancellation, anyio.run_process will terminate the process; fail_after ensures we don’t rely solely on the flow-level timeout.
- Docs: anyio.run_process, Prefect timeout utilities
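anyio isn't needed to see the shape of Option A. This sketch swaps in the standard library's asyncio.create_subprocess_exec and asyncio.wait_for as an analogue of anyio.fail_after plus anyio.run_process (it is not anyio's implementation): whether the local deadline expires or an outer cancellation arrives, the child is killed and reaped before the exception propagates. run_with_timeout and _kill are hypothetical names.

```python
import asyncio
import sys


async def _kill(proc: asyncio.subprocess.Process) -> None:
    if proc.returncode is None:
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # exited between the check and the kill
    await proc.wait()  # reap the child so no zombie is left behind


async def run_with_timeout(cmd: list, seconds: float) -> int:
    """Run cmd; kill it and raise TimeoutError if it exceeds `seconds`."""
    proc = await asyncio.create_subprocess_exec(*cmd)
    try:
        return await asyncio.wait_for(proc.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        await _kill(proc)
        raise TimeoutError(f"{cmd[0]} exceeded {seconds}s") from None
    except BaseException:
        await _kill(proc)  # outer cancellation, e.g. a flow-level timeout
        raise


if __name__ == "__main__":
    slow = [sys.executable, "-c", "import time; time.sleep(30)"]  # stand-in for dbt
    try:
        asyncio.run(run_with_timeout(slow, seconds=0.5))
    except TimeoutError as exc:
        print("killed:", exc)
```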
Option B: Stay synchronous and manage the subprocess yourself
- If you prefer sync code, call the dbt CLI via subprocess.Popen and kill the process group on timeout/cancellation.
import os
import signal
import subprocess

from prefect import flow, get_run_logger


def _kill_process_group(proc: subprocess.Popen, grace_seconds: float = 10) -> None:
    # SIGTERM the whole group first, then SIGKILL if dbt doesn't exit within the grace period (POSIX only)
    try:
        os.killpg(proc.pid, signal.SIGTERM)
        try:
            proc.wait(timeout=grace_seconds)
        except subprocess.TimeoutExpired:
            os.killpg(proc.pid, signal.SIGKILL)
            proc.wait()
    except ProcessLookupError:
        pass  # the group has already exited


def run_dbt_with_timeout(cmd: list[str], seconds: int) -> int:
    # Start dbt in its own process group so we can terminate all children
    proc = subprocess.Popen(cmd, start_new_session=True)
    try:
        return proc.wait(timeout=seconds)
    except subprocess.TimeoutExpired:
        _kill_process_group(proc)
        raise TimeoutError("dbt process timed out")
    except BaseException:
        # Prefect's own timeout/cancellation (or Ctrl+C) can interrupt the wait; don't leave dbt running
        _kill_process_group(proc)
        raise


@flow(timeout_seconds=60)
def clickstream_dbt_run(database: str, models: str = "", full_refresh: bool = False, store_timezone_offset_minutes: str = "330"):
    logger = get_run_logger()
    cmd = ["dbt", "run", "--target", "dynamic-target"]
    if models:
        cmd += ["-s", models]
    if full_refresh:
        cmd += ["--full-refresh"]
    logger.info(f"Executing dbt command: {' '.join(cmd)}")
    try:
        rc = run_dbt_with_timeout(cmd, seconds=60)
        if rc != 0:
            # A non-zero exit code means dbt itself failed; surface it instead of logging success
            raise RuntimeError(f"dbt exited with return code {rc}")
        logger.info("dbt run completed successfully")
        return rc
    except TimeoutError:
        logger.error(":x: dbt run timed out")
        raise
    except Exception as e:
        logger.exception(f"dbt run failed: {e}")
        raise
Why not rely on PrefectDbtRunner().invoke() for cancellation?
Marvin
11/06/2025, 11:35 AM
PrefectDbtRunner.invoke() is a synchronous, blocking call. When the flow times out, Prefect raises a timeout exception in Python, but it doesn’t automatically stop the dbt run started by the runner. That means the exception may not be observed until dbt exits on its own. Managing the subprocess directly (as above) lets you terminate dbt immediately on timeout.
General guidance
- Keep catching TimeoutError and re-raise it so Prefect can mark the run as cancelled/timed out.
- Prefer async + anyio for the cleanest cancellation semantics when running external commands.
- If you want to keep your PrefectDbtRunner-based setup, consider switching only the execution step to the dbt CLI for predictable termination behavior.
References
- prefect.utilities.timeout (how flow/task timeouts work)
- prefect-dbt docs (integration overview)
- anyio.run_process (subprocess cancellation behavior)
If you’re open to the async/CLI route, I recommend Option A. If you want to stick with PrefectDbtRunner, I can help adapt your flow to use the CLI just for execution while keeping your current environment setup. What environment/OS are your workers running on? That can influence the best pattern.
Kartik
11/06/2025, 12:50 PM
Marvin
11/06/2025, 12:51 PM
Marvin
11/06/2025, 12:51 PM
Why timeout_seconds can appear to “not work”
- It’s a soft timeout. In Prefect 3.x, timeout_seconds raises a timeout/cancellation exception inside your flow. It does not forcibly preempt blocking C calls or kill child processes. If your flow is dominated by time.sleep(...) or a blocking external process (like dbt via PrefectDbtRunner().invoke()), the exception can only be delivered once control returns to Python.
- Broad try/except can mask it. If you catch Exception (or TimeoutError) and don’t re-raise, Prefect sees a normal return and marks the run Completed.
- Platform matters. On Windows in particular, synchronous timeouts cannot interrupt many blocking calls and can be unreliable. Async flows cancel much more reliably.
- External subprocesses aren’t killed. Flow cancellation won’t automatically terminate a dbt subprocess created inside the flow unless you run it in a cancellation-aware way.
How to make timeouts reliable in your dbt flow
Pick one of these:
1) Recommended: Async flow + anyio, which cancels and terminates the subprocess on timeout
from prefect import flow, get_run_logger
import anyio

@flow(timeout_seconds=60)
async def clickstream_dbt_run(database: str, models: str = "", full_refresh: bool = False, store_timezone_offset_minutes: str = "330"):
    logger = get_run_logger()
    # load_env_vars_dynamic(database=database, store_timezone_offset_minutes=store_timezone_offset_minutes)
    cmd = ["dbt", "run", "--target", "dynamic-target"]
    if models:
        cmd += ["-s", models]
    if full_refresh:
        cmd += ["--full-refresh"]
    logger.info(f"Executing dbt command: {' '.join(cmd)}")
    try:
        # Fail fast if the dbt subprocess exceeds 60s; anyio will terminate it on cancellation
        with anyio.fail_after(60):
            result = await anyio.run_process(cmd, check=True)
        logger.info("dbt run completed successfully")
        return result.returncode
    except TimeoutError:
        logger.error(":x: dbt run timed out; subprocess was terminated")
        raise
    except Exception as e:
        logger.exception(f"dbt run failed: {e}")
        raise
2) If you must stay synchronous: manage the subprocess and kill it on timeout
import os
import signal
import subprocess

from prefect import flow, get_run_logger


def _kill_process_group(proc: subprocess.Popen, grace_seconds: float = 10) -> None:
    # SIGTERM the whole group first, then SIGKILL if dbt doesn't exit within the grace period (POSIX only)
    try:
        os.killpg(proc.pid, signal.SIGTERM)
        try:
            proc.wait(timeout=grace_seconds)
        except subprocess.TimeoutExpired:
            os.killpg(proc.pid, signal.SIGKILL)
            proc.wait()
    except ProcessLookupError:
        pass  # the group has already exited


def run_dbt_with_timeout(cmd: list[str], seconds: int) -> int:
    # Start dbt in its own process group so we can terminate all children
    proc = subprocess.Popen(cmd, start_new_session=True)
    try:
        return proc.wait(timeout=seconds)
    except subprocess.TimeoutExpired:
        _kill_process_group(proc)
        raise TimeoutError("dbt process timed out")
    except BaseException:
        # Prefect's own timeout/cancellation (or Ctrl+C) can interrupt the wait; don't leave dbt running
        _kill_process_group(proc)
        raise


@flow(timeout_seconds=60)
def clickstream_dbt_run(database: str, models: str = "", full_refresh: bool = False, store_timezone_offset_minutes: str = "330"):
    logger = get_run_logger()
    cmd = ["dbt", "run", "--target", "dynamic-target"]
    if models:
        cmd += ["-s", models]
    if full_refresh:
        cmd += ["--full-refresh"]
    logger.info(f"Executing dbt command: {' '.join(cmd)}")
    try:
        rc = run_dbt_with_timeout(cmd, seconds=60)
        if rc != 0:
            # A non-zero exit code means dbt itself failed; surface it instead of logging success
            raise RuntimeError(f"dbt exited with return code {rc}")
        logger.info("dbt run completed successfully")
        return rc
    except TimeoutError:
        logger.error(":x: dbt run timed out")
        raise
    except Exception as e:
        logger.exception(f"dbt run failed: {e}")
        raise
If you keep using PrefectDbtRunner().invoke(...)
- It’s a blocking call. Prefect’s timeout will not kill the dbt process started by the runner; the flow will only observe the timeout once the runner returns.
- If timeout enforcement is critical, prefer the async/CLI approach above.
Quick sanity checks
Marvin
11/06/2025, 12:51 PM
- Test an async flow with await anyio.sleep(120) and @flow(timeout_seconds=60). It should cancel around 60s.
- Test a sync flow without any try/except:
from prefect import flow
import time

@flow(timeout_seconds=5)
def f():
    time.sleep(10)

if __name__ == "__main__":
    f()
If this still completes on your machine, share your OS, Python, and Prefect versions — platform specifics (especially Windows) can affect sync timeout behavior.
References
- prefect.utilities.timeout (how timeouts work)
- prefect-dbt docs
- anyio.run_process (cancellation-aware subprocess)
If you can share:
- OS and Python version
- Prefect version
- How/where the worker runs (local, Docker, Kubernetes)
I can tailor the recommendation to your environment.