https://prefect.io logo
Title
e

Emma Rizzi

03/14/2022, 9:54 AM
Hello! I have a flow that uses DateTimeParameters, I would like to schedule it with one date being default to the day of the schedule, or first day of the month for example, is there a way to do it through schedules or should I handle this in my code ?
s

Stéphan Taljaard

03/14/2022, 10:13 AM
Hi Emma I also have ETL flows, where I need to give a start and end date. These are the tasks I wrote to generate the dates from parameters when the task is run:
from datetime import datetime
from typing import Tuple

import pendulum
import prefect
from prefect import task

TIMEZONE = pendulum.timezone("Africa/Johannesburg")


@task
def generate_date(date_parameter, hours_delta=None) -> datetime:  # pendulum.datetime.DateTime
    """Generate a date value from parameter inputs"""
    if not hours_delta:
        minutes_delta_log = ""
        if date_parameter == "scheduled_start_time":
            date_ = prefect.context.get("scheduled_start_time")
        elif date_parameter == "yesterday":
            date_ = pendulum.today(TIMEZONE).subtract(days=1)
        elif date_parameter == "today":
            date_ = pendulum.today(TIMEZONE)
        elif date_parameter == "now":
            date_ = pendulum.now(TIMEZONE)
        else:
            date_ = pendulum.parse(date_parameter, tz=TIMEZONE)

    else:
        if not isinstance(date_parameter, datetime):
            raise TypeError("date_parameter should be a DateTime object")
        minutes_delta = round(abs(float(hours_delta)) * 60)
        minutes_delta_log = f" - {minutes_delta} minutes"
        date_ = pendulum.instance(date_parameter).subtract(minutes=minutes_delta)

    date_ = date_.in_timezone(TIMEZONE)
    prefect.context.get("logger").info(f"Generated date for `{date_parameter}`{minutes_delta_log}: {date_}")
    return date_


@task
def check_if_start_hours_delta_given(start_hours_delta_parameter):
    return bool(start_hours_delta_parameter)


@task
def generate_start_and_end_dates(
    start_date, start_date_hours_delta, end_date
) -> Tuple[pendulum.DateTime, pendulum.DateTime]:
    """
    Generates a start and end date that can be used in queries.
    The start date can be a fixed value, or amount of hours to offset the end date with.
    If `hours` is given, it will be used instead of a fixed start date.
    """
    date_end = generate_date.run(end_date)
    use_start_hours_delta = check_if_start_hours_delta_given.run(start_date_hours_delta)

    if use_start_hours_delta:
        prefect.context.get("logger").info("Using hours delta")
        date_start = generate_date.run(date_end, start_date_hours_delta)
    else:
        date_start = generate_date.run(start_date)

    return date_start, date_end
I then use
generate_start_and_end_dates
in all my flows
:upvote: 1
:party-parrot: 1
a

Anna Geller

03/14/2022, 10:13 AM
While it's possible to retrieve the date of your schedule using the context:
prefect.context.get("scheduled_start_time")
You need to be careful to call it only within a task since the context information is populated at runtime, while the Flow gets built at registration time. That's why passing it as the default parameter value may not work as you intend it and you may need to pass it as data dependency as here:
import prefect
from prefect import Flow, task
from prefect.core.parameter import DateTimeParameter


@task
def get_scheduled_start_time():
    return prefect.context.get("scheduled_start_time")


@task(log_stdout=True)
def print_val(x):
    print(x)


@task(log_stdout=True)
def determine_parameter_to_use(sched_start_time, dt_param_val):
    if dt_param_val:
        print(dt_param_value)
    else:
        print(sched_start_time)


with Flow("parametrized_dt_flow") as flow:
    sched_start_time = get_scheduled_start_time()
    print_val(sched_start_time)
    dt_param_value = DateTimeParameter("some_dt", required=False)
    determine_parameter_to_use(sched_start_time, dt_param_value)

if __name__ == "__main__":
    flow.run()
:party-parrot: 1
❤️ 1
e

Emma Rizzi

03/14/2022, 10:45 AM
@Stéphan Taljaard @Anna Geller thank you both for your inputs ! looks like exactly what I'm trying to do I didnt know about the pendulum library also, it's very interesting thanks for sharing!
👍 2