Emma Rizzi
03/14/2022, 9:54 AM
Stéphan Taljaard
03/14/2022, 10:13 AM
from datetime import datetime
from typing import Tuple
import pendulum
import prefect
from prefect import task
# Time zone used for every date this module generates or parses.
TIMEZONE = pendulum.timezone("Africa/Johannesburg")
@task
def generate_date(date_parameter, hours_delta=None) -> datetime:  # pendulum.datetime.DateTime
    """Generate a date value from parameter inputs.

    Without ``hours_delta``, ``date_parameter`` is resolved as follows:

    * a ``datetime`` instance is used as-is;
    * ``"scheduled_start_time"`` reads the value from the Prefect context;
    * ``"yesterday"``, ``"today"`` and ``"now"`` are computed in ``TIMEZONE``;
    * any other value is parsed with ``pendulum.parse`` in ``TIMEZONE``.

    With a truthy ``hours_delta``, ``date_parameter`` must be a ``datetime``;
    the absolute value of the delta (in hours, rounded to whole minutes) is
    subtracted from it.

    The result is converted to ``TIMEZONE`` and logged before being returned.

    Raises:
        TypeError: if ``hours_delta`` is given and ``date_parameter`` is not a
            ``datetime`` instance.
    """
    if not hours_delta:
        minutes_delta_log = ""
        if isinstance(date_parameter, datetime):
            # Accept ready-made datetimes too (e.g. a DateTimeParameter value)
            # instead of handing them to pendulum.parse, which expects text.
            date_ = pendulum.instance(date_parameter)
        elif date_parameter == "scheduled_start_time":
            # Only populated at run time, so it must be read inside the task.
            date_ = prefect.context.get("scheduled_start_time")
        elif date_parameter == "yesterday":
            date_ = pendulum.today(TIMEZONE).subtract(days=1)
        elif date_parameter == "today":
            date_ = pendulum.today(TIMEZONE)
        elif date_parameter == "now":
            date_ = pendulum.now(TIMEZONE)
        else:
            date_ = pendulum.parse(date_parameter, tz=TIMEZONE)
    else:
        if not isinstance(date_parameter, datetime):
            raise TypeError("date_parameter should be a DateTime object")
        # The sign of the delta is ignored: the offset always goes backwards.
        minutes_delta = round(abs(float(hours_delta)) * 60)
        minutes_delta_log = f" - {minutes_delta} minutes"
        date_ = pendulum.instance(date_parameter).subtract(minutes=minutes_delta)

    # Normalise every result to the module time zone.
    date_ = date_.in_timezone(TIMEZONE)
    prefect.context.get("logger").info(f"Generated date for `{date_parameter}`{minutes_delta_log}: {date_}")
    return date_
@task
def check_if_start_hours_delta_given(start_hours_delta_parameter):
    """Return True when the hours-delta parameter holds a truthy value."""
    return True if start_hours_delta_parameter else False
@task
def generate_start_and_end_dates(
    start_date, start_date_hours_delta, end_date
) -> Tuple[pendulum.DateTime, pendulum.DateTime]:
    """
    Build the (start, end) date pair that can be used in queries.

    The end date is always resolved from `end_date`. The start date is either
    resolved from the fixed `start_date` value or, when
    `start_date_hours_delta` is given, derived by offsetting the end date by
    that number of hours. The delta takes precedence over `start_date`.
    """
    end = generate_date.run(end_date)

    # No delta supplied: fall back to the fixed start date.
    if not check_if_start_hours_delta_given.run(start_date_hours_delta):
        return generate_date.run(start_date), end

    prefect.context.get("logger").info("Using hours delta")
    start = generate_date.run(end, start_date_hours_delta)
    return start, end
I then use generate_start_and_end_dates in all my flows.
Anna Geller
03/14/2022, 10:13 AM
prefect.context.get("scheduled_start_time")
You need to be careful to call it only within a task, since the context information is populated at runtime, while the Flow gets built at registration time. That's why passing it as the default parameter value may not work as you intend, and you may need to pass it as a data dependency, as here:
import prefect
from prefect import Flow, task
from prefect.core.parameter import DateTimeParameter
@task
def get_scheduled_start_time():
    """Read the `scheduled_start_time` entry from the Prefect context."""
    scheduled = prefect.context.get("scheduled_start_time")
    return scheduled
@task(log_stdout=True)
def print_val(x):
    """Print the given value; the task is declared with `log_stdout=True`."""
    print(x)
@task(log_stdout=True)
def determine_parameter_to_use(sched_start_time, dt_param_val):
    """Print the datetime parameter value if given, else the scheduled start time.

    Args:
        sched_start_time: value to fall back on when no parameter was supplied.
        dt_param_val: the optional datetime parameter value; used when truthy.
    """
    if dt_param_val:
        # Print the argument itself. The previous code referenced
        # `dt_param_value`, which is not a name defined in this function.
        print(dt_param_val)
    else:
        print(sched_start_time)
# The scheduled start time is produced by a task and passed on as a data
# dependency rather than being read from the context while the flow is built.
with Flow("parametrized_dt_flow") as flow:
    sched_start_time = get_scheduled_start_time()
    print_val(x=sched_start_time)

    # Optional parameter: the flow can run without a value for it.
    dt_param_value = DateTimeParameter(name="some_dt", required=False)
    determine_parameter_to_use(
        sched_start_time=sched_start_time,
        dt_param_val=dt_param_value,
    )

if __name__ == "__main__":
    flow.run()
Emma Rizzi
03/14/2022, 10:45 AM