Didier Marin
07/29/2021, 1:28 PM
Kevin Kho
Zach Schumacher
07/29/2021, 2:31 PM
@task(name="set-date-params", nout=2)
def set_date_params(start_date: typing.Optional[str], end_date: typing.Optional[str]) -> typing.Tuple[date, date]:
    """Resolve the flow's start/end date parameters into ``date`` objects.

    Explicitly supplied values are parsed as ISO-format date strings.
    Missing (falsy) values fall back to dates derived from the run's
    ``scheduled_start_time`` in the context, expressed in the ``et``
    timezone:

    * ``start_date`` defaults to 7 days before that local date.
    * ``end_date`` defaults to that local date.

    Args:
        start_date: ISO-format date string, or ``None``/empty for the default.
        end_date: ISO-format date string, or ``None``/empty for the default.

    Returns:
        A ``(start_date, end_date)`` tuple of ``date`` objects.

    Raises:
        ValueError: If a supplied string is not a valid ISO-format date
            (raised by ``date.fromisoformat``).
    """
    context_run_datetime = context.get("scheduled_start_time")
    # replace() only stamps the tzinfo (the clock time is not shifted), so
    # the context timestamp is treated as already being in UTC here.
    aware_utc_run_datetime = context_run_datetime.replace(tzinfo=utc)
    # NOTE(review): `et` is presumably US Eastern time - confirm where it is defined.
    aware_et_run_datetime = aware_utc_run_datetime.astimezone(et)

    if not start_date:
        seven_days_ago = aware_et_run_datetime - timedelta(days=7)
        start_date = seven_days_ago.date().isoformat()
    if not end_date:
        end_date = aware_et_run_datetime.date().isoformat()

    logger.info(f"start_date={start_date}, end_date={end_date}")

    # Both values are guaranteed to be strings at this point; the casts only
    # narrow Optional[str] for the type checker.
    start_date = cast(str, start_date)
    end_date = cast(str, end_date)
    return date.fromisoformat(start_date), date.fromisoformat(end_date)
Zach Schumacher
07/29/2021, 2:31 PM
Zach Schumacher
07/29/2021, 2:31 PM
with flow:
    start_date = Parameter("start-date", default=None)
    end_date = Parameter("end-date", default=None)
    start_date, end_date = set_date_params(start_date, end_date)
Didier Marin
07/29/2021, 2:45 PM