<@ULVA73B9P> i have defined this workflow to test ...
# marvin-ai
k
@Marvin I have defined this workflow to test the timeout_seconds functionality, which should cancel the workflow if it runs for more than 1 minute:

from prefect import flow, get_run_logger
import time
from dotenv import load_dotenv
from helpers.db_utils import load_env_vars_dynamic

@flow(timeout_seconds=60)  # Flow will be cancelled automatically after 60 seconds
def test_timeout_of_workflow():
    """
    A simple Prefect flow to test timeout_seconds functionality.
    It runs a timer loop longer than the timeout duration to trigger flow cancellation.
    """
    logger = get_run_logger()
    try:
        logger.info("🚀 Starting the timeout test flow...")
        logger.info("🔄 Loading environment variables...")
        # load_dotenv()
        # load_env_vars_dynamic()
        logger.info("✅ Environment variables loaded successfully.")
        logger.info("⏳ Starting simulated long-running process...")
        for i in range(1, 91):  # Run 90 seconds; should exceed timeout
            logger.info(f"🕒 Timer: {i} seconds elapsed")
            time.sleep(1)
        logger.info("✅ Flow finished successfully (this should NOT happen if timeout works)")
    except Exception as e:
        logger.error(f"❌ Flow interrupted or failed: {str(e)}")
    finally:
        logger.info("🧹 Flow ended (either due to timeout or completion).")

if __name__ == "__main__":
    test_timeout_of_workflow()

I execute the run by creating a deployment from the prefect.yaml:

- name: 'Test Timeout'
  version: null
  tags: ['test']
  concurrency_limit:
    limit: 1
    collision_strategy: CANCEL_NEW
  description: |-
    Testing the timeout functionality
  entrypoint: flows/test_timeout.py:test_timeout_of_workflow
  work_pool:
    name: local-work-pool
    work_queue_name: null
    job_variables: {}
  schedules:
    - interval: 1800.0
      anchor_date: '2025-09-09T083020.631532+00:00'
      timezone: UTC
      active: false

This is not cancelling the flow run even when the execution time exceeds the configured timeout, as confirmed by the logs:
🕒 Timer: 90 seconds elapsed
04:34:13 PM
prefect.flow_runs
✅ Flow finished successfully (this should NOT happen if timeout works)
04:34:14 PM
prefect.flow_runs
🧹 Flow ended (either due to timeout or completion).
04:34:14 PM
prefect.flow_runs
Finished in state Completed()
04:34:14 PM
prefect.flow_runs
Process for flow run 'imaginary-dog' exited cleanly.
m
Thanks for sharing the details. I see what’s happening.

Short version: your `try/except` is catching and swallowing the timeout exception, so Prefect can’t mark the flow as timed out. In Prefect 3.x, `timeout_seconds` works by raising a timeout/cancellation exception inside the flow. If you catch `Exception` and don’t re-raise, the flow returns normally and ends in Completed.

What to change
- Either remove the broad `try/except`, or re-raise on timeout/cancellation so Prefect can set the correct terminal state.

Example fix (sync flow)
from prefect import flow, get_run_logger
import time

@flow(timeout_seconds=60)
def test_timeout_of_workflow():
    logger = get_run_logger()
    logger.info("🚀 Starting the timeout test flow...")

    try:
        logger.info("⏳ Starting simulated long-running process...")
        for i in range(1, 91):  # Run 90 seconds; should exceed timeout
            logger.info(f"🕒 Timer: {i} seconds elapsed")
            time.sleep(1)

        logger.info("✅ Flow finished successfully (this should NOT happen if timeout works)")

    except TimeoutError:
        # Let Prefect handle the timeout state
        logger.error("❌ Flow timed out")
        raise
    except Exception as e:
        # Re-raise other errors too so Prefect sets the state appropriately
        logger.exception(f"❌ Flow failed: {e}")
        raise
    finally:
        logger.info("🧹 Flow ended (either due to timeout or completion).")
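To see why the re-raise matters, here is a minimal sketch in plain Python with no Prefect involved. All three function names are hypothetical and exist only for illustration. In particular, `outcome` is an assumed stand-in for whatever wraps the flow body and maps exceptions to a final state; it is not Prefect's engine code.

```python
def swallowing_body():
    """Mimics a flow body whose broad `except Exception` hides the timeout."""
    try:
        raise TimeoutError("simulated timeout")
    except Exception as exc:  # TimeoutError is a subclass of Exception
        print(f"logged and swallowed: {exc}")
    return "finished"


def reraising_body():
    """Mimics a flow body that logs the timeout and lets it propagate."""
    try:
        raise TimeoutError("simulated timeout")
    except TimeoutError as exc:
        print(f"logged and re-raised: {exc}")
        raise


def outcome(body):
    """Hypothetical caller that turns a propagated timeout into a final state."""
    try:
        body()
    except TimeoutError:
        return "TimedOut"
    return "Completed"


if __name__ == "__main__":
    print(outcome(swallowing_body))
    print(outcome(reraising_body))
```

The caller can only report a timeout it actually sees: once the body returns normally, nothing distinguishes it from a successful run.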
If you prefer async (more reliable cancellation)
Using async gives stronger cancellation semantics because Prefect relies on anyio’s cancellation under the hood:
from prefect import flow, get_run_logger
import anyio

@flow(timeout_seconds=60)
async def test_timeout_of_workflow():
    logger = get_run_logger()
    logger.info("🚀 Starting the timeout test flow...")

    try:
        logger.info("⏳ Starting simulated long-running process...")
        for i in range(1, 91):
            logger.info(f"🕒 Timer: {i} seconds elapsed")
            await anyio.sleep(1)

        logger.info("✅ Flow finished successfully (this should NOT happen if timeout works)")
    except Exception:
        # Re-raise so Prefect can set the terminal state
        logger.exception("❌ Flow interrupted or failed")
        raise
    finally:
        logger.info("🧹 Flow ended (either due to timeout or completion).")
Notes and caveats
- Timeouts are best-effort for synchronous, blocking code. They raise an exception, but can’t preempt some blocking C calls while they’re running. With your 1-second sleeps, cancellation should still be observed as long as you don’t swallow the exception.
- Don’t catch-and-continue on timeouts. If you need to log something, log and re-raise.
- Deployments/workers don’t change the timeout semantics: the exception still needs to propagate for the run to end as Cancelled/TimedOut.

Related docs
- prefect.utilities.timeout (how timeouts are implemented)
- How anyio powers Prefect’s async architecture (why async is more cancellation-friendly)

If you try the minimal test below (no try/except), the run should not finish as Completed:
from prefect import flow
import time

@flow(timeout_seconds=5)
def f():
    time.sleep(10)

if __name__ == "__main__":
    f()
If you’re still seeing Completed after removing the broad `except`, let me know your OS/Python version and Prefect version. There are a few platform-specific nuances we can dig into.
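The "best-effort" caveat in the notes above is easier to reason about with a concrete mechanism in hand. The sketch below shows one common way to impose a timeout on synchronous code: a POSIX alarm signal whose handler raises `TimeoutError`. It is an illustration only and makes no claim about how Prefect implements `timeout_seconds`. The helper name `call_with_alarm` is hypothetical, and the approach only works on POSIX systems and only from the main thread.

```python
import signal
import time


def call_with_alarm(fn, seconds):
    """Run fn(); raise TimeoutError in the main thread if it exceeds `seconds`.

    POSIX only, and only valid when called from the main thread.
    """
    def _on_alarm(signum, frame):
        raise TimeoutError(f"call exceeded {seconds}s")

    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        return fn()
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous)  # restore the old handler


if __name__ == "__main__":
    start = time.monotonic()
    try:
        call_with_alarm(lambda: time.sleep(5), seconds=0.5)
    except TimeoutError as exc:
        print(f"interrupted after {time.monotonic() - start:.1f}s: {exc}")
```

Signal handlers run between Python bytecodes in the main thread, which is why this kind of timeout is platform- and thread-dependent, and why a long call that never hands control back to the interpreter can delay the exception.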
k
@Marvin here is my flow:

from prefect import flow, get_run_logger
from prefect_dbt import PrefectDbtRunner
import os
from prefect.blocks.system import Secret
from dotenv import load_dotenv
from helpers.db_utils import load_env_vars_dynamic

@flow(timeout_seconds=60)
def clickstream_dbt_run(
    database: str,
    models: str = "",  # optional
    full_refresh: bool = False,
    store_timezone_offset_minutes: str = "330"
):
    """
    This runs the dbt aggregation for all the clickstream models

    Args:
        database: Database name
        models: Specific models to run (default: 'constants')
        full_refresh: Whether to do full refresh (default: True)
        store_timezone_offset_minutes: Timezone offset (default: "330")
    """
    logger = get_run_logger()

    # Load the environment variables for the deployment
    load_env_vars_dynamic(
        database=database,
        store_timezone_offset_minutes=store_timezone_offset_minutes
    )
    logger.info(f"Prefect-Dbt Environment is Ready For Database: {database}")
    logger.info(f"Running For Database: {database}")
    logger.info(f"Models: {models}")
    logger.info(f"Full refresh: {full_refresh}")

    # Build the dbt command
    dbt_command = ["run", "--target", 'dynamic-target']

    # Add model selection
    if models:
        dbt_command.extend(["-s", models])

    # Add full refresh if specified
    if full_refresh:
        dbt_command.append("--full-refresh")

    logger.info(f"Executing dbt command: {' '.join(dbt_command)}")

    # Run dbt
    try:
        result = PrefectDbtRunner().invoke(dbt_command)
        logger.info("dbt run completed successfully")
        return result
    except TimeoutError:
        # Let Prefect handle the timeout state
        logger.error("❌ Flow timed out")
        raise
    except Exception as e:
        logger.error(f"dbt run failed: {str(e)}")
        raise

It runs the dbt model for a specific store. How can I use the timeout in this flow so that, if the flow stays in the Running state longer than the configured timeout in seconds, it gets cancelled?
m
Great question, and your current flow is very close.

Key point: `timeout_seconds` raises a cancellation/timeout exception in the Python runtime. If your flow is making a blocking call into an external process (like dbt via `PrefectDbtRunner().invoke()`), Prefect’s timeout will not forcibly kill that subprocess; the exception will only be raised once the call returns. To ensure the run cancels on time, you need to make the dbt call cancellation-aware and terminate the subprocess when cancellation happens.

Two reliable patterns you can use:

Option A (recommended): Use an async flow and run dbt via the CLI with anyio
- anyio’s `run_process` terminates the child process when the cancellation/timeout triggers, so you get timely cancellation.
- Keep the flow-level `timeout_seconds`, and also wrap just the dbt command with `fail_after()` for extra safety.
from prefect import flow, get_run_logger
import anyio

@flow(timeout_seconds=60)
async def clickstream_dbt_run(
    database: str,
    models: str = "",
    full_refresh: bool = False,
    store_timezone_offset_minutes: str = "330",
):
    logger = get_run_logger()

    # Load env vars for this store (your function)
    # load_env_vars_dynamic(database=database, store_timezone_offset_minutes=store_timezone_offset_minutes)

    cmd = ["dbt", "run", "--target", "dynamic-target"]
    if models:
        cmd += ["-s", models]
    if full_refresh:
        cmd += ["--full-refresh"]

    logger.info(f"Executing dbt command: {' '.join(cmd)}")

    try:
        # Explicit subprocess timeout in addition to the flow-level timeout
        with anyio.fail_after(60):
            result = await anyio.run_process(cmd, check=True)
        logger.info("dbt run completed successfully")
        return result.returncode
    except TimeoutError:
        logger.error("❌ dbt run timed out; subprocess was terminated")
        raise
    except Exception as e:
        logger.exception(f"dbt run failed: {e}")
        raise
Notes:
- Ensure `dbt` is available in the worker’s environment.
- On cancellation, `anyio.run_process` will terminate the process; `fail_after` ensures we don’t rely solely on the flow-level timeout.
- Docs: anyio.run_process, Prefect timeout utilities

Option B: Stay synchronous and manage the subprocess yourself
- If you prefer sync code, call the dbt CLI via `subprocess.Popen` and kill the process group on timeout/cancellation.
import os
import signal
import subprocess

from prefect import flow, get_run_logger

def run_dbt_with_timeout(cmd: list[str], seconds: int):
    # Start dbt in its own process group so we can terminate all children
    proc = subprocess.Popen(cmd, start_new_session=True)
    try:
        proc.wait(timeout=seconds)
        return proc.returncode
    except subprocess.TimeoutExpired:
        # Terminate the whole group
        os.killpg(proc.pid, signal.SIGTERM)
        try:
            proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            os.killpg(proc.pid, signal.SIGKILL)
        raise TimeoutError("dbt process timed out")

@flow(timeout_seconds=60)
def clickstream_dbt_run(database: str, models: str = "", full_refresh: bool = False, store_timezone_offset_minutes: str = "330"):
    logger = get_run_logger()

    cmd = ["dbt", "run", "--target", "dynamic-target"]
    if models:
        cmd += ["-s", models]
    if full_refresh:
        cmd += ["--full-refresh"]

    logger.info(f"Executing dbt command: {' '.join(cmd)}")

    try:
        rc = run_dbt_with_timeout(cmd, seconds=60)
        logger.info("dbt run completed successfully")
        return rc
    except TimeoutError:
        logger.error("❌ dbt run timed out")
        raise
    except Exception as e:
        logger.exception(f"dbt run failed: {e}")
        raise
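The process-group detail in Option B is what makes `os.killpg(proc.pid, ...)` valid, so it may help to check it in isolation. The sketch below is POSIX-only and uses a sleeping Python child as a stand-in for a long dbt run. The helper name `spawn_in_new_session` is hypothetical.

```python
import os
import signal
import subprocess
import sys


def spawn_in_new_session(cmd):
    """Start cmd as the leader of a new session (and so a new process group)."""
    return subprocess.Popen(cmd, start_new_session=True)


if __name__ == "__main__":
    # Stand-in for a long-running dbt invocation
    proc = spawn_in_new_session([sys.executable, "-c", "import time; time.sleep(30)"])

    # As session leader, the child's process group id equals its own pid,
    # so proc.pid can be passed to os.killpg.
    print("leads its own group:", os.getpgid(proc.pid) == proc.pid)

    os.killpg(proc.pid, signal.SIGTERM)  # signals the child and anything it spawned
    proc.wait(timeout=10)
    print("return code:", proc.returncode)  # negative signal number on POSIX
```

Without `start_new_session=True` the child would share the parent's process group, and `os.killpg` on that group would also signal the process running the flow.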
Why not rely on PrefectDbtRunner().invoke() for cancellation?
- `PrefectDbtRunner.invoke()` is a synchronous, blocking call. When the flow times out, Prefect raises a timeout exception in Python, but it doesn’t automatically kill the dbt process run by the runner. That means the exception may not be observed until dbt exits on its own. Managing the subprocess directly (as above) lets you terminate dbt immediately on timeout.

General guidance
- Keep catching `TimeoutError` and re-raise it so Prefect can mark the run as cancelled/timed out.
- Prefer async + anyio for the cleanest cancellation semantics when running external commands.
- If you must keep using `PrefectDbtRunner`, consider running dbt via the CLI instead in your flow for predictable termination behavior.

References
- prefect.utilities.timeout (how flow/task timeouts work)
- prefect-dbt docs (integration overview)
- anyio.run_process (subprocess cancellation behavior)

If you’re open to the async/CLI route, I recommend Option A. If you want to stick with `PrefectDbtRunner`, I can help adapt your flow to use the CLI just for execution while keeping your current environment setup. What environment/OS are your workers running on? That can influence the best pattern.
k
why isn't the timeout_seconds option working @Marvin for any flow (with or without try/catch, with Prefect
m
Great question. Here’s the “why” and what to do about it.

Why `timeout_seconds` can appear to “not work”
- It’s a soft timeout. In Prefect 3.x, `timeout_seconds` raises a timeout/cancellation exception inside your flow. It does not forcibly preempt blocking C calls or kill child processes. If your flow is dominated by `time.sleep(...)` or a blocking external process (like dbt via `PrefectDbtRunner().invoke()`), the exception can only be delivered once control returns to Python.
- Broad `try/except` can mask it. If you catch `Exception` (or `TimeoutError`) and don’t re-raise, Prefect sees a normal return and marks the run Completed.
- Platform matters. On Windows in particular, synchronous timeouts cannot interrupt many blocking calls and can be unreliable. Async flows cancel much more reliably.
- External subprocesses aren’t killed. Flow cancellation won’t automatically terminate a dbt subprocess created inside the flow unless you run it in a cancellation-aware way.

How to make timeouts reliable in your dbt flow
Pick one of these:

1) Recommended: Async flow + anyio, which cancels and terminates the subprocess on timeout
from prefect import flow, get_run_logger
import anyio

@flow(timeout_seconds=60)
async def clickstream_dbt_run(database: str, models: str = "", full_refresh: bool = False, store_timezone_offset_minutes: str = "330"):
    logger = get_run_logger()

    # load_env_vars_dynamic(database=database, store_timezone_offset_minutes=store_timezone_offset_minutes)

    cmd = ["dbt", "run", "--target", "dynamic-target"]
    if models:
        cmd += ["-s", models]
    if full_refresh:
        cmd += ["--full-refresh"]

    logger.info(f"Executing dbt command: {' '.join(cmd)}")

    try:
        # Fail fast if the dbt subprocess exceeds 60s; anyio will terminate it on cancellation
        with anyio.fail_after(60):
            result = await anyio.run_process(cmd, check=True)
        logger.info("dbt run completed successfully")
        return result.returncode
    except TimeoutError:
        logger.error("❌ dbt run timed out; subprocess was terminated")
        raise
    except Exception as e:
        logger.exception(f"dbt run failed: {e}")
        raise
2) If you must stay synchronous: manage the subprocess and kill it on timeout
import subprocess, os, signal
from prefect import flow, get_run_logger

def run_dbt_with_timeout(cmd: list[str], seconds: int):
    # Start dbt in its own process group so we can terminate all children
    proc = subprocess.Popen(cmd, start_new_session=True)
    try:
        proc.wait(timeout=seconds)
        return proc.returncode
    except subprocess.TimeoutExpired:
        os.killpg(proc.pid, signal.SIGTERM)
        try:
            proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            os.killpg(proc.pid, signal.SIGKILL)
        raise TimeoutError("dbt process timed out")

@flow(timeout_seconds=60)
def clickstream_dbt_run(database: str, models: str = "", full_refresh: bool = False, store_timezone_offset_minutes: str = "330"):
    logger = get_run_logger()

    cmd = ["dbt", "run", "--target", "dynamic-target"]
    if models:
        cmd += ["-s", models]
    if full_refresh:
        cmd += ["--full-refresh"]

    logger.info(f"Executing dbt command: {' '.join(cmd)}")

    try:
        rc = run_dbt_with_timeout(cmd, seconds=60)
        logger.info("dbt run completed successfully")
        return rc
    except TimeoutError:
        logger.error("❌ dbt run timed out")
        raise
    except Exception as e:
        logger.exception(f"dbt run failed: {e}")
        raise
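Both options above rest on the same idea: whoever starts the subprocess is responsible for killing it when the deadline passes. As a dependency-free illustration of that idea, here is a sketch that uses the standard library's `asyncio` rather than anyio (a deliberate substitution, so it runs without third-party packages). The helper name `run_with_deadline` is hypothetical, and a sleeping Python child stands in for dbt.

```python
import asyncio
import sys


async def run_with_deadline(cmd, seconds):
    """Run cmd and return its exit code.

    If the deadline passes (or this coroutine is cancelled), the child is
    killed and reaped before the exception propagates.
    """
    proc = await asyncio.create_subprocess_exec(*cmd)
    try:
        return await asyncio.wait_for(proc.wait(), timeout=seconds)
    finally:
        if proc.returncode is None:  # still running: timeout or cancellation
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # the child exited on its own in the meantime
            await proc.wait()  # reap it so no zombie process is left behind


if __name__ == "__main__":
    slow = [sys.executable, "-c", "import time; time.sleep(30)"]  # stand-in for dbt
    try:
        asyncio.run(run_with_deadline(slow, seconds=1))
    except asyncio.TimeoutError:
        print("timed out; child was killed")
```

Putting the cleanup in `finally` means the same path handles both an expired deadline and an outer cancellation, which is the behavior the flow-level timeout depends on.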
If you keep using `PrefectDbtRunner().invoke(...)`
- It’s a blocking call. Prefect’s timeout will not kill the dbt process started by the runner; the flow will only observe the timeout once the runner returns.
- If timeout enforcement is critical, prefer the async/CLI approach above.

Quick sanity checks
- Test an async flow with `await anyio.sleep(120)` and `@flow(timeout_seconds=60)`. It should cancel around 60s.
- Test a sync flow without any try/except:
from prefect import flow
import time

@flow(timeout_seconds=5)
def f():
    time.sleep(10)

if __name__ == "__main__":
    f()
If this still completes on your machine, share your OS, Python, and Prefect versions. Platform specifics (especially Windows) can affect sync timeout behavior.

References
- prefect.utilities.timeout (how timeouts work)
- prefect-dbt docs
- anyio.run_process (cancellation-aware subprocess)

If you can share:
- OS and Python version
- Prefect version
- How/where the worker runs (local, Docker, Kubernetes)

I can tailor the recommendation to your environment.