Jacob Goldberg
08/16/2021, 10:16 PM
start_date is last Monday and the end_date is this Monday. However, I also want the ability to override the default date inputs (e.g. to backfill data). I thought the DateTimeParameter would be perfect for this, but I am having trouble getting it working. Here is a sample snippet:
import datetime as dt
from prefect import Flow
from prefect.core.parameter import DateTimeParameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
schedule = Schedule(clocks=[CronClock("0 9 * * 1")])
with Flow(name="test", schedule=schedule) as test_flow:
    start_date = DateTimeParameter("start_date", required=False) or dt.date.today() - dt.timedelta(days=7)
    end_date = DateTimeParameter("end_date", required=False) or dt.date.today()
    my_process(start_date, end_date)
The issue is that my_process() requires a datetime object but is receiving NoneType. It seems the or operator is not working as I would expect when defining start_date and end_date. Although DateTimeParameter "evaluates" to None in Prefect, the or operator sees them as Prefect Parameters, so start_date and end_date both get defined as None. What is the proper way to structure the Flow so that it has default datetime objects that are run on a schedule, but I have the ability to override them via Prefect Cloud?
Kevin Kho
a or b evaluates to a whenever a is truthy, and only falls through to b when a is falsy (None, for example). The DateTimeParameter object is still something, so that expression will choose it.
A Parameter is just a special Task that takes a value at runtime, but all Tasks have deferred execution, meaning that they evaluate to something later. The content of the Flow block is evaluated at build time, so this expression will always return the DateTimeParameter, which then evaluates to None later on.
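To make the build-time behavior concrete, here is a minimal sketch in plain Python. FakeParameter is a hypothetical stand-in written for this illustration, not Prefect's class; it only mimics the fact that a parameter is a placeholder object whose real value arrives later.

```python
import datetime as dt


class FakeParameter:
    """Hypothetical stand-in for a Parameter: a placeholder object whose
    real value is only supplied when the flow actually runs."""

    def __init__(self, name):
        self.name = name


# "Build time": the placeholder exists, but its runtime value does not yet.
param = FakeParameter("start_date")
fallback = dt.date.today() - dt.timedelta(days=7)

# An ordinary object that defines neither __bool__ nor __len__ is truthy,
# so `or` short-circuits and returns the placeholder; the fallback is dropped.
chosen = param or fallback
```

Here chosen is the placeholder itself, so the fallback date never reaches the downstream task, regardless of what value the placeholder produces at runtime.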
That said, you need to defer the or as well, and the way to do that is by using a Task. For example:
from prefect import task

with Flow(name="test", schedule=schedule) as test_flow:
    start_date = DateTimeParameter("start_date", required=False)
    # The lambda runs at flow-run time, when start_date has its real value
    start_date_filled = task(lambda x: x or dt.date.today() - dt.timedelta(days=7))(start_date)
Calling task(...) here is just the same as defining a task with the @task decorator above a function. Either way, execution is deferred to runtime.
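For readability, the lambda could also be written as a named function. The sketch below is one possible version; fill_start_date and its days_back argument are hypothetical names chosen for illustration. The function is left as plain Python so it can be run on its own, and the comment at the end shows how it might be wrapped, mirroring the task(...)(start_date) call above.

```python
import datetime as dt


def fill_start_date(value, days_back=7):
    """Return the supplied value, or a default computed when this is called.

    `value` is whatever the parameter resolved to at runtime (None if it
    was not provided). `days_back` is a hypothetical knob for the default.
    """
    if value is not None:
        return value
    # today() is read inside the function body, so it reflects the moment
    # the function runs rather than the moment the flow was defined.
    return dt.date.today() - dt.timedelta(days=days_back)


# Inside the Flow block this would be wrapped the same way as the lambda:
#   start_date_filled = task(fill_start_date)(start_date)
```

Called directly, fill_start_date(None) returns the date seven days before today, and fill_start_date(dt.date(2021, 8, 9)) returns the supplied date unchanged.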
Now there is also a problem when you call dt.date.today() in the Flow context, because it is evaluated at build time. It will take the value from the time of Flow registration unless you use script-based storage (GitHub, S3, or local, but stored as a script). Again, deferring execution with a task will also help with that.
Jacob Goldberg
08/16/2021, 10:24 PM
dt.date.today() at buildtime not runtime. I will give this a shot