Tejaswi Bhangi
10/01/2025, 7:42 PM
Marvin
10/01/2025, 7:42 PM
Nate
10/01/2025, 7:44 PM
Tejaswi Bhangi
10/01/2025, 7:44 PM
Nate
10/01/2025, 7:45 PM
Tejaswi Bhangi
10/01/2025, 7:45 PM
Marvin
10/01/2025, 7:46 PM
prefect.deployments.run_deployment to kick off the next run of the same deployment (or another one) with continuation parameters.
Example
from datetime import datetime, timezone, timedelta

from prefect import flow, get_run_logger
from prefect.deployments import run_deployment

DEFAULT_BUDGET_SECONDS = 55 * 60  # stay under Cloud Run 1h; adjust as needed
HANDOFF_THRESHOLD = 5 * 60  # hand off with 5 minutes remaining


def make_deadline(budget_seconds: int):
    start = datetime.now(timezone.utc)
    deadline = start + timedelta(seconds=budget_seconds)

    def seconds_left() -> float:
        return (deadline - datetime.now(timezone.utc)).total_seconds()

    return deadline, seconds_left


def do_unit_of_work(cursor: int) -> int:
    # TODO: implement your work
    # return the next cursor/checkpoint position
    return cursor + 1


@flow
def my_flow(start_cursor: int = 0, budget_seconds: int = DEFAULT_BUDGET_SECONDS):
    log = get_run_logger()
    _, time_left = make_deadline(budget_seconds)
    cursor = start_cursor
    while True:
        # Optionally bound each unit of work by remaining time (see Pattern C)
        cursor = do_unit_of_work(cursor)
        if time_left() < HANDOFF_THRESHOLD:
            log.info(f"Time nearly up; handing off continuation from cursor={cursor}")
            # Trigger the next run; pass continuation params
            run_deployment(
                name="my-project/my-deployment",  # <flow-name/deployment-name>
                parameters={"start_cursor": cursor},
                as_subflow=False,  # ensure a new flow run, not a subflow
                timeout=0,  # return immediately instead of waiting for the new run to finish
            )
            return  # exit gracefully
Pattern B: Catch SIGTERM from Cloud Run and hand off
Cloud Run sends SIGTERM shortly before it kills the container. Use Prefect’s capture_sigterm to translate it into a catchable signal and schedule the continuation before exiting.
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from prefect.utilities.engine import capture_sigterm
from prefect.exceptions import TerminationSignal


@flow
def my_flow(start_cursor: int = 0):
    log = get_run_logger()
    cursor = start_cursor  # update this as your work makes progress
    try:
        with capture_sigterm():
            # ... your main loop / work here ...
            pass
    except TerminationSignal:
        log.warning("SIGTERM received from Cloud Run; scheduling continuation")
        run_deployment(
            name="my-project/my-deployment",
            parameters={"start_cursor": cursor},  # pass your checkpoint
            as_subflow=False,
            timeout=0,  # return immediately; do not wait for the new run
        )
        # Re-raise or return; Prefect will mark appropriately
        raise
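To show the idea behind Pattern B without Prefect, here is a minimal standard-library sketch, assuming the work runs in the main thread of the process. The names Terminated, sigterm_as_exception, and run_until_terminated are hypothetical and only for illustration; they are not Prefect APIs, and Prefect's capture_sigterm may differ in its details.

```python
import signal
from contextlib import contextmanager
from typing import Callable, Iterator


class Terminated(Exception):
    """Hypothetical exception raised when SIGTERM arrives."""


@contextmanager
def sigterm_as_exception() -> Iterator[None]:
    """Temporarily turn SIGTERM into a catchable Python exception."""

    def handler(signum, frame):
        raise Terminated()

    # signal.signal may only be called from the main thread.
    previous = signal.signal(signal.SIGTERM, handler)
    try:
        yield
    finally:
        # Restore the previous handler (fall back to the default if it was unset).
        signal.signal(
            signal.SIGTERM,
            previous if previous is not None else signal.SIG_DFL,
        )


def run_until_terminated(
    start_cursor: int,
    work: Callable[[int], int],
    on_handoff: Callable[[int], None],
    max_units: int,
) -> int:
    """Run up to max_units units of work; hand off the last checkpoint on SIGTERM."""
    cursor = start_cursor
    try:
        with sigterm_as_exception():
            for _ in range(max_units):
                cursor = work(cursor)  # cursor only advances when a unit completes
    except Terminated:
        on_handoff(cursor)  # e.g. schedule the continuation run here
    return cursor
```

In a flow, on_handoff is where the run_deployment(...) call with the checkpoint would go. Because the cursor is only updated after a unit of work returns, an interrupted unit is simply redone by the next run.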
Pattern C: Bound long units of work with Prefect timeouts
If a single unit of work can overrun your budget, bound it so control returns to your “time-left” check.
Per-task timeouts via decorator: @task(timeout_seconds=...) is great if you can size the unit of work.
Ad-hoc timeout around a block of code:
```
from prefect.utilities.timeout import timeout

# Example inside your loop
remaining = max(1, int(time_left() - 30))  # 30s safety margin
try:
    with timeout(seconds=remaining):
        cursor = do_unit_of_work(cursor)
except TimeoutError:
    log.info("Unit of work timed out; handing off to next run")
    run_deployment(
        name="my-project/my-deployment",
        parameters={"start_cursor": cursor},
        as_subflow=False,
        timeout=0,  # return immediately; do not wait for the new run
    )
    return  # exit the flow after handing off
```
Marvin
10/01/2025, 7:46 PM
start_cursor for the next run.
- Keep your unit of work small enough that you can check time frequently (and respond to SIGTERM quickly).
- Consider a conservative budget (e.g., 55 minutes) to leave time for handoff.
- As a safety net, you can also set a flow-level timeout_seconds to avoid “zombie” runs, but proactive handoff is preferable for a clean Completed state.
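The tips above can be sketched as a plain-Python loop that checks the remaining budget after every small unit of work. This is only an illustration under stated assumptions: run_with_budget, end_cursor, and the injectable clock are hypothetical names, not part of Prefect, and the work is assumed to finish once the cursor reaches end_cursor.

```python
import time
from typing import Callable, Tuple


def run_with_budget(
    start_cursor: int,
    end_cursor: int,
    work: Callable[[int], int],
    budget_seconds: float,
    handoff_threshold: float,
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[int, bool]:
    """Process units of work until done or until the time budget is nearly spent.

    Returns (cursor, needs_handoff). When needs_handoff is True, the caller
    should start a continuation run from the returned cursor.
    """
    deadline = clock() + budget_seconds
    cursor = start_cursor
    while cursor < end_cursor:
        cursor = work(cursor)  # keep each unit small so this check runs often
        remaining = deadline - clock()
        if cursor < end_cursor and remaining < handoff_threshold:
            return cursor, True  # work remains, but time is nearly up
    return cursor, False  # all work finished within the budget
```

With a 55-minute budget and a 5-minute threshold, the loop stops taking new units once fewer than 5 minutes remain, which leaves time to trigger the next run before the 1-hour limit.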
Relevant docs
- run_deployment API: prefect.deployments.run_deployment
- Flow runtime info: prefect.runtime.flow_run
- Timeout utilities: prefect.utilities.timeout
Questions to tailor this for you
- Are you running the flow directly in the Cloud Run Job container, or via a Prefect worker that launches the flow?
- What’s your “handoff” strategy: trigger the same deployment with continuation parameters, or a different deployment?
- How do you want to checkpoint progress between runs (so the next run knows where to resume)?
Tejaswi Bhangi
10/01/2025, 7:47 PM
"timeout": {
  "anyOf": [
    {
      "type": "integer",
      "maximum": 3600,
      "exclusiveMinimum": 0
    },
    {
      "type": "null"
    }
  ],
  "title": "Job Timeout",
  "default": 600,
  "description": "Max allowed duration the Job may be active before Cloud Run will actively try to mark it failed and kill associated containers (maximum of 3600 seconds, 1 hour)."
},
I believe we got the template from Prefect; can't remember where exactly, though.
Tejaswi Bhangi
10/01/2025, 7:47 PM
Tejaswi Bhangi
10/01/2025, 7:47 PM
Tejaswi Bhangi
10/01/2025, 7:58 PM
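For reference, the constraints in the "timeout" schema above (an integer greater than 0 and at most 3600, or null, with a default of 600) can be checked in a few lines of Python. This is a rough sketch, not part of the work pool template: validate_job_timeout and flow_budget_seconds are hypothetical helpers, the 300-second margin mirrors the 55-minute suggestion earlier in the thread, and treating null as "no limit configured" is an assumption.

```python
from typing import Optional

MAX_JOB_TIMEOUT = 3600  # "maximum" in the schema
DEFAULT_JOB_TIMEOUT = 600  # "default" in the schema


def validate_job_timeout(value: Optional[int]) -> Optional[int]:
    """Accept None or an integer with 0 < value <= 3600, as the schema describes."""
    if value is None:
        return None
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError("timeout must be an integer or None")
    if not 0 < value <= MAX_JOB_TIMEOUT:
        raise ValueError(f"timeout must be in (0, {MAX_JOB_TIMEOUT}] seconds")
    return value


def flow_budget_seconds(
    job_timeout: Optional[int] = DEFAULT_JOB_TIMEOUT,
    margin_seconds: int = 300,
) -> Optional[int]:
    """Derive an in-flow time budget that leaves a margin for the handoff."""
    timeout = validate_job_timeout(job_timeout)
    if timeout is None:
        return None  # assumption: no limit configured, so no budget to enforce
    if timeout <= margin_seconds:
        raise ValueError("job timeout is too short to leave a handoff margin")
    return timeout - margin_seconds
```

With the maximum of 3600 seconds this gives a 3300-second (55-minute) budget; with the default of 600 seconds it gives 300 seconds.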