Jacob Goldberg
08/16/2021, 10:16 PM
`start_date` is last Monday and the `end_date` is this Monday. However, I also want the ability to override the default date inputs (e.g. to backfill data). I thought the `DateTimeParameter` would be perfect for this, but I am having trouble getting it working. Here is a sample snippet:
import datetime as dt
from prefect import Flow
from prefect.core.parameter import DateTimeParameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

schedule = Schedule(clocks=[CronClock("0 9 * * 1")])

with Flow(name="test", schedule=schedule) as test_flow:
    start_date = DateTimeParameter("start_date", required=False) or dt.date.today() - dt.timedelta(days=7)
    end_date = DateTimeParameter("end_date", required=False) or dt.date.today()
    my_process(start_date, end_date)
The issue is that `my_process()` requires a datetime object but is receiving `NoneType`. It seems the `or` operator is not working as I would expect when defining `start_date` and `end_date`. Although a `DateTimeParameter` "evaluates" to `None` in Prefect, the `or` operator sees them as Prefect Parameters, so `start_date` and `end_date` both get defined as `None`. What is the proper way to structure the Flow so that it has default datetime objects that are run on a schedule, but I have the ability to override them via Prefect Cloud?
Kevin Kho
`a or b` will evaluate to `a` for any value of `a` that is truthy (not `None`, for example). The `DateTimeParameter` object itself is still something, so that expression will choose it.
A `Parameter` is just a special `Task` that takes a value at runtime, but all Tasks have deferred execution, meaning that they evaluate to something later. The content of the `Flow` block is evaluated at build time, so this expression will always return the `DateTimeParameter`, which then evaluates to `None` later on.
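To make the build-time versus runtime distinction concrete, here is a minimal plain-Python sketch. `FakeParameter` is a hypothetical stand-in for a deferred task, not a Prefect class; it only assumes that the task object is truthy while the value it later produces can be `None`.

```python
class FakeParameter:
    """Hypothetical stand-in for a deferred task (not part of Prefect)."""

    def __init__(self, name):
        self.name = name

    def run(self, supplied=None):
        # The actual value only exists once the task runs.
        return supplied


default = "fallback"
param = FakeParameter("start_date")

# Build time: the object itself is truthy, so `or` never reaches the default.
build_time_choice = param or default

# Runtime: the produced value can be None, so `or` falls through to the default.
run_time_choice = param.run() or default
```

At build time the expression keeps the parameter object; only when the `or` is applied to the produced value does the fallback take effect.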
That said, you need to defer the `or` as well, and the way to do that is by using a `Task`. For example:
from prefect import task
with Flow(name="test", schedule=schedule) as test_flow:
    start_date = DateTimeParameter("start_date", required=False)
    # Wrapping the lambda in a task defers the `or` until the flow runs
    start_date_filled = task(lambda x: x or dt.date.today() - dt.timedelta(days=7))(start_date)
The task here is just the same as defining a task with the decorator above a function. This will defer execution to runtime.
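As a rough sketch of the named-function form, the fallback logic could look like the following. The function name `fill_start_date` and its `today` argument are illustrative assumptions (the latter is only there to make the function easy to check); the point is that the default is computed inside the function body, so it is evaluated when the function is called rather than when the flow is built.

```python
import datetime as dt


def fill_start_date(start_date=None, today=None):
    """Return the supplied start date, or seven days before `today`."""
    if start_date is not None:
        return start_date
    # Computed at call time, not when the function is defined.
    if today is None:
        today = dt.date.today()
    return today - dt.timedelta(days=7)


# A supplied value is passed through; a missing one falls back to the default.
overridden = fill_start_date(dt.date(2021, 8, 1))
defaulted = fill_start_date(None, today=dt.date(2021, 8, 16))
```

In the flow above, a function like this would presumably be decorated with Prefect's `task` decorator and called as `fill_start_date(start_date)` inside the `Flow` block, in place of the lambda.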
Now there is also a problem when you call `dt.date.today()` in the Flow context, because it is evaluated at build time. It will take the value from Flow registration unless you use script-based storage (GitHub, S3, or local, but stored as a script). Again, deferring execution with a task will also help with that.
Jacob Goldberg
08/16/2021, 10:24 PM
`dt.date.today()` at build time, not runtime. I will give this a shot.