Zach Schumacher
07/13/2021, 2:33 PM
@task(name="set-date-params", nout=2)
def set_date_params(
    start_date: typing.Optional[str],
    end_date: typing.Optional[str],
) -> typing.Tuple[datetime.date, datetime.date]:
    """Resolve the flow's date window into concrete ``datetime.date`` objects.

    Values that are explicitly set are parsed as ISO-format dates and
    returned. If ``start_date`` is not set, it defaults to 7 days before the
    flow's context date. If ``end_date`` is not set, it defaults to the
    flow's context date. Both defaults are computed after converting the
    context date into the ``et`` timezone.

    Args:
        start_date: ISO-format date string (``YYYY-MM-DD``), or a falsy value
            (``None`` / empty string) to use the default.
        end_date: ISO-format date string (``YYYY-MM-DD``), or a falsy value
            to use the default.

    Returns:
        A ``(start_date, end_date)`` tuple of ``datetime.date`` objects.

    Raises:
        ValueError: If a supplied value is not a valid ISO-format date
            (raised by ``datetime.date.fromisoformat``).
    """
    # NOTE(review): `context`, `utc` and `et` come from outside this function;
    # presumably the flow-run context and tzinfo objects for UTC and US
    # Eastern time -- confirm against the module's imports.
    context_run_datetime = context.get("date")
    # replace() stamps the timezone without shifting the clock time, i.e. the
    # context date is interpreted as UTC before being converted to `et`.
    aware_utc_run_datetime = context_run_datetime.replace(tzinfo=utc)
    aware_et_run_datetime = aware_utc_run_datetime.astimezone(et)

    if not start_date:
        seven_days_ago = aware_et_run_datetime - datetime.timedelta(days=7)
        start_date = seven_days_ago.date().isoformat()
    if not end_date:
        end_date = aware_et_run_datetime.date().isoformat()

    # Log before parsing so the effective values are recorded even when a
    # user-supplied string turns out to be malformed.
    logger.info(f"start_date={start_date}, end_date={end_date}")
    return (
        datetime.date.fromisoformat(start_date),
        datetime.date.fromisoformat(end_date),
    )
with flow:
    # Optional flow parameters; both default to None so that the
    # set_date_params task can fill in its own defaults.
    start_date_param = Parameter("start-date", default=None)
    end_date_param = Parameter("end-date", default=None)
    # The task yields two outputs (it is declared with nout=2), which are
    # unpacked into the names used for the resolved start and end dates.
    start_date_, end_date_ = set_date_params(start_date_param, end_date_param)
Kevin Kho
Zach Schumacher
07/13/2021, 2:36 PM