Kiran
09/03/2025, 12:11 PM
from prefect import flow, get_run_context
from datetime import date, timedelta, datetime

def resolve_date(date_param: str, run_time: datetime) -> str:
    if date_param == "today":
        return str(run_time.date())
    elif date_param == "yesterday":
        return str((run_time - timedelta(days=1)).date())
    else:
        # assume it's already YYYY-MM-DD
        return date_param

@flow
def actual_flow(run_date: str):
    ctx = get_run_context()
    # resolve only once, based on expected_start_time (not retry time!)
    resolved_date = resolve_date(run_date, ctx.flow_run.expected_start_time)
    print(f"Running for resolved date: {resolved_date}")
Marvin
09/03/2025, 12:13 PM
ctx.flow_run.expected_start_time is not valid. Use the runtime API’s prefect.runtime.flow_run.scheduled_start_time instead.
Assuming Prefect 3.x, here’s a working version that resolves the date once based on the scheduled start time (not the retry time), with a safe fallback for ad‑hoc runs:
from datetime import timedelta, datetime, timezone
from prefect import flow
from prefect.runtime import flow_run

def resolve_date(date_param: str, run_time: datetime) -> str:
    if date_param == "today":
        return run_time.date().isoformat()
    elif date_param == "yesterday":
        return (run_time - timedelta(days=1)).date().isoformat()
    else:
        # assume it's already YYYY-MM-DD
        return date_param

@flow
def actual_flow(run_date: str):
    # Use the deployment's scheduled time; fall back to "now" for manual runs
    run_time = flow_run.scheduled_start_time or datetime.now(timezone.utc)
    resolved_date = resolve_date(run_date, run_time)
    print(f"Running for resolved date: {resolved_date}")
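The else branch passes through anything that is not "today" or "yesterday" on the assumption that it is already YYYY-MM-DD. If you would rather fail fast on a malformed parameter, one option is to validate it with the standard library. This is only a sketch: resolve_date_strict is a hypothetical name, not part of Prefect or of the code above.

```python
from datetime import date, datetime, timedelta, timezone


def resolve_date_strict(date_param: str, run_time: datetime) -> str:
    """Hypothetical variant of resolve_date that rejects invalid dates."""
    if date_param == "today":
        return run_time.date().isoformat()
    if date_param == "yesterday":
        return (run_time - timedelta(days=1)).date().isoformat()
    # date.fromisoformat raises ValueError for malformed or impossible dates
    return date.fromisoformat(date_param).isoformat()


# Illustrative run time; in a flow this would be the scheduled start time
run_time = datetime(2025, 9, 3, 12, 0, tzinfo=timezone.utc)
print(resolve_date_strict("yesterday", run_time))
print(resolve_date_strict("2025-09-01", run_time))
```

With this variant a value such as "not-a-date" raises ValueError instead of being passed downstream unchanged.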
Notes:
- get_run_context() exists in 3.x, but the expected_start_time attribute you used is from 2.x. In 3.x, prefer prefect.runtime.flow_run.scheduled_start_time.
- scheduled_start_time is the time the run was scheduled to begin, so it won’t change across retries.
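To see why anchoring to the scheduled time matters, here is a small Prefect-free sketch. It assumes a hypothetical run scheduled a few minutes before midnight UTC and a retry that starts after midnight. The timestamps are made up for illustration.

```python
from datetime import datetime, timedelta, timezone


def resolve_date(date_param: str, run_time: datetime) -> str:
    """Resolve "today"/"yesterday" relative to run_time; pass other values through."""
    if date_param == "today":
        return run_time.date().isoformat()
    elif date_param == "yesterday":
        return (run_time - timedelta(days=1)).date().isoformat()
    return date_param


# Hypothetical run scheduled shortly before midnight UTC
scheduled = datetime(2025, 9, 3, 23, 55, tzinfo=timezone.utc)
# Hypothetical retry that starts ten minutes later, after midnight
retry_clock = scheduled + timedelta(minutes=10)

# Anchored to the scheduled time: same result on the first attempt and the retry
first_attempt = resolve_date("today", scheduled)
retry_attempt = resolve_date("today", scheduled)

# Anchored to the wall clock at retry time: drifts to the next day
drifted = resolve_date("today", retry_clock)

print(first_attempt, retry_attempt, drifted)
# prints: 2025-09-03 2025-09-03 2025-09-04
```

Both attempts resolve to the same date only because they share the scheduled timestamp. Using the clock at retry time would silently shift the run to the next day.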
Refs:
- Runtime context (Prefect 3.x)
- Schedules and scheduled start time