@Marvin Cloud run Jobs have a max timeout of ...
# marvin-ai
t
@Marvin Cloud run Jobs have a max timeout of 1 hour. I want my flow to finish and trigger a new flow when approaching timeout (instead of just crashing). How can I track the timeout inside of the job and do so?
m
thought for 217.5 seconds
n
you can do what you're asking (the same way you'd time out any python code), but you can also just use the cloud run v2 work pool, which should use the v2 cloud run api where jobs have a timeout of a day i believe
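For reference, a minimal sketch of overriding that job timeout when deploying to a Cloud Run v2 work pool. The deployment, pool, and image names are hypothetical, and it assumes the v2 base job template exposes a `timeout` job variable (in seconds) with a much higher cap than one hour, per the note above:
```python
from prefect import flow

@flow
def my_flow():
    ...

if __name__ == "__main__":
    # Hypothetical names; "timeout" assumes the Cloud Run v2 work pool's
    # base job template exposes that variable.
    my_flow.deploy(
        name="my-deployment",
        work_pool_name="my-cloud-run-v2-pool",
        image="us-docker.pkg.dev/my-project/my-repo/my-image:latest",
        job_variables={"timeout": 86400},  # 24h, well past the v1 1-hour cap
    )
```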
t
we are using v2, i think the base job template timeout string for cloud run is wrong then
n
can you link to what you mean?
t
yep looking
m
Great question. You can handle Cloud Run’s hard one-hour limit by giving your flow a “time budget,” checking the remaining time periodically, and handing off to a new run before you hit the limit. You can also catch the SIGTERM Cloud Run sends at timeout as a last-resort handoff. Below are complementary patterns you can combine.

Pattern A: Proactive handoff based on a deadline
- Start a timer at the beginning of the flow and compute seconds remaining.
- After each unit of work, if the remaining time is below a threshold (e.g., 5 minutes), trigger the next flow run and exit cleanly.
- Use `prefect.deployments.run_deployment` to kick off the next run of the same deployment (or another one) with continuation parameters.

Example:
```python
from datetime import datetime, timezone, timedelta

from prefect import flow, get_run_logger
from prefect.deployments import run_deployment

DEFAULT_BUDGET_SECONDS = 55 * 60  # stay under Cloud Run 1h; adjust as needed
HANDOFF_THRESHOLD = 5 * 60        # hand off with 5 minutes remaining

def make_deadline(budget_seconds: int):
    start = datetime.now(timezone.utc)
    deadline = start + timedelta(seconds=budget_seconds)
    def seconds_left() -> float:
        return (deadline - datetime.now(timezone.utc)).total_seconds()
    return deadline, seconds_left

def do_unit_of_work(cursor: int) -> int:
    # TODO: implement your work
    # return the next cursor/checkpoint position
    return cursor + 1

@flow
def my_flow(start_cursor: int = 0, budget_seconds: int = DEFAULT_BUDGET_SECONDS):
    log = get_run_logger()
    _, time_left = make_deadline(budget_seconds)

    cursor = start_cursor
    while True:
        # Optionally bound each unit of work by remaining time (see Pattern C)
        cursor = do_unit_of_work(cursor)

        if time_left() < HANDOFF_THRESHOLD:
            log.info(f"Time nearly up; handing off continuation from cursor={cursor}")
            # Trigger the next run; pass continuation params
            run_deployment(
                name="my-project/my-deployment",  # <flow-name/deployment-name>
                parameters={"start_cursor": cursor},
                as_subflow=False,  # ensure a new flow run, not a subflow
            )
            return  # exit gracefully
```

Pattern B: Catch SIGTERM from Cloud Run and hand off
Cloud Run sends SIGTERM shortly before it kills the container. Use Prefect’s `capture_sigterm` to translate it into a catchable signal and schedule the continuation before exiting.

```python
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.utilities.engine import capture_sigterm
from prefect.exceptions import TerminationSignal

@flow
def my_flow(...):
    log = get_run_logger()
    try:
        with capture_sigterm():
            # ... your main loop / work here ...
            pass
    except TerminationSignal:
        log.warning("SIGTERM received from Cloud Run; scheduling continuation")
        run_deployment(
            name="my-project/my-deployment",
            parameters={"start_cursor": ...},  # pass your checkpoint
            as_subflow=False,
        )
        # Re-raise or return; Prefect will mark appropriately
        raise
```

Pattern C: Bound long units of work with Prefect timeouts
If a single unit of work can overrun your budget, bound it so control returns to your “time-left” check.

Per-task timeouts via decorator:
- `@task(timeout_seconds=...)` is great if you can size the unit of work (see the short sketch at the end of this message).

Ad-hoc timeout around a block of code:
```python
from prefect.utilities.timeout import timeout

# Example inside your loop
remaining = max(1, int(time_left() - 30))  # 30s safety margin
try:
    with timeout(seconds=remaining):
        cursor = do_unit_of_work(cursor)
except TimeoutError:
    log.info("Unit of work timed out; handing off to next run")
    run_deployment(
        name="my-project/my-deployment",
        parameters={"start_cursor": cursor},
        as_subflow=False,
    )
    return
```

Notes and best practices
- Decide where to checkpoint progress: e.g., a GCS file, a DB row, or simply a parameter like `start_cursor` for the next run.
- Keep your unit of work small enough that you can check time frequently (and respond to SIGTERM quickly).
- Consider a conservative budget (e.g., 55 minutes) to leave time for handoff.
- As a safety net, you can also set a flow-level `timeout_seconds` to avoid “zombie” runs, but proactive handoff is preferable for a clean Completed state.

Relevant docs
- run_deployment API: prefect.deployments.run_deployment
- Flow runtime info: prefect.runtime.flow_run
- Timeout utilities: prefect.utilities.timeout

Questions to tailor this for you
- Are you running the flow directly in the Cloud Run Job container, or via a Prefect worker that launches the flow?
- What’s your “handoff” strategy: trigger the same deployment with continuation parameters, or a different deployment?
- How do you want to checkpoint progress between runs (so the next run knows where to resume)?
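A minimal sketch of the decorator-based timeouts mentioned in Pattern C and the notes above; the durations and names are illustrative, not prescribed:
```python
from prefect import flow, task

# Per-task timeout: bound one unit of work so control returns to the
# flow's time-left check instead of silently overrunning the budget.
@task(timeout_seconds=10 * 60)
def do_unit_of_work(cursor: int) -> int:
    # TODO: implement your work and return the next checkpoint
    return cursor + 1

# Flow-level timeout as a safety net against "zombie" runs; the proactive
# handoff in Pattern A should normally fire well before this is reached.
@flow(timeout_seconds=55 * 60)
def my_flow(start_cursor: int = 0):
    ...
```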
t
@Nate
```json
"timeout": {
          "anyOf": [
            {
              "type": "integer",
              "maximum": 3600,
              "exclusiveMinimum": 0
            },
            {
              "type": "null"
            }
          ],
          "title": "Job Timeout",
          "default": 600,
          "description": "Max allowed duration the Job may be active before Cloud Run will actively try to mark it failed and kill associated containers (maximum of 3600 seconds, 1 hour)."
        },
```
i believe we got the template from prefect, can't remember where exactly though
is there a different template for cloud run v2?
yeah I remember now, we copied it from the prefect UI and added it to IaC
oh wait, my fault we had a mismatch between something clickopsed vs iac, it is correct in the cloud run v2 base job template
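For checking that kind of clickops-vs-IaC drift later, a minimal sketch that reads the work pool as the API actually sees it and prints the schema of its `timeout` job variable; the pool name is hypothetical, and it assumes the template exposes `timeout` like the snippet above:
```python
import asyncio

from prefect.client.orchestration import get_client

async def show_timeout_schema(pool_name: str = "my-cloud-run-v2-pool"):
    # Read the work pool from the API (i.e., whatever was actually deployed)
    # and print the "timeout" variable's schema from its base job template.
    async with get_client() as client:
        pool = await client.read_work_pool(pool_name)
        print(pool.base_job_template["variables"]["properties"]["timeout"])

asyncio.run(show_timeout_schema())
```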
👍 1