Jacob Goldberg

08/16/2021, 10:16 PM
Hello! I have a question about the `DateTimeParameter`. My goal is to have a flow run on a schedule with dynamic date inputs, e.g. run every Monday where `start_date` is last Monday and `end_date` is this Monday. However, I also want the ability to override the default date inputs (e.g. to backfill data). I thought the `DateTimeParameter` would be perfect for this, but I am having trouble getting it working. Here is a sample snippet:
```python
import datetime as dt
from prefect import Flow
from prefect.core.parameter import DateTimeParameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# Cron expression: every Monday at 09:00
schedule = Schedule(clocks=[CronClock("0 9 * * 1")])
with Flow(name="test", schedule=schedule) as test_flow:
    start_date = DateTimeParameter("start_date", required=False) or dt.date.today() - dt.timedelta(days=7)
    end_date = DateTimeParameter("end_date", required=False) or dt.date.today()

    my_process(start_date, end_date)
```
The issue is that `my_process()` requires a datetime object but is receiving `NoneType`. It seems the `or` operator is not working as I would expect when defining `start_date` and `end_date`. Although a `DateTimeParameter` "evaluates" to `None` in Prefect, the `or` operator sees them as Prefect Parameters, so `start_date` and `end_date` both end up as `None`. What is the proper way to structure the Flow so that it has default datetime objects when run on a schedule, but I can still override them via Prefect Cloud?

Kevin Kho

08/16/2021, 10:22 PM
Hey @Jacob Wilson, I think the issue here is that `a or b` evaluates to `a` for any truthy `a`, and a `DateTimeParameter` object is truthy even though its runtime value will be `None`, so the expression picks it. A `Parameter` is just a special `Task` that takes a value at runtime, and all Tasks have deferred execution, meaning they evaluate to something later. The contents of the `Flow` block are evaluated at build time, so this expression always returns the `DateTimeParameter`, which then evaluates to `None` later on. So you need to defer the `or` as well, and the way to do that is with a `Task`. For example:
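```python
from prefect import task  # in addition to the imports in the snippet above

with Flow(name="test", schedule=schedule) as test_flow:
    start_date = DateTimeParameter("start_date", required=False)
    # The lambda runs at flow run time, so `or` sees the parameter's actual value
    start_date_filled = task(lambda x: x or dt.date.today() - dt.timedelta(days=7))(start_date)
```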
The `task(...)` call here is the same as defining a function with the `@task` decorator above it, and it defers execution to runtime. There is also a problem with calling `dt.date.today()` directly in the Flow context: it is evaluated at build time, so it will take the value from when the Flow was registered unless you use script-based storage (GitHub, S3, or local, stored as a script). Deferring execution with a task helps with that as well.
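To make the build-time versus runtime difference concrete, here is a rough sketch in plain Python with no Prefect import. `FakeParameter` and `resolve_window` are hypothetical names made up for illustration: the class only mimics the idea of a placeholder whose value arrives later, and is not Prefect's actual `Parameter` implementation.

```python
import datetime as dt


class FakeParameter:
    """Hypothetical stand-in for a Parameter: the object exists at build time
    and only hands back its real value when run() is called later."""

    def __init__(self, runtime_value=None):
        self.runtime_value = runtime_value

    def run(self):
        return self.runtime_value


def resolve_window(start=None, end=None, today=None):
    """Fill in missing dates at call time.

    `today` is injectable so the example is reproducible; by default it is
    read when the function runs, not when the module is loaded.
    """
    today = today or dt.date.today()
    end = end or today
    start = start or today - dt.timedelta(days=7)
    return start, end


# Build time: an ordinary object is truthy, so `or` never reaches the default.
param = FakeParameter(runtime_value=None)
fallback = dt.date(2021, 8, 9)
build_time_choice = param or fallback  # this is `param`, not `fallback`

# Run time: apply `or` to the value the parameter produced, not to the placeholder.
start, end = resolve_window(param.run(), None, today=dt.date(2021, 8, 16))
```

In a real flow, a function like `resolve_window` would presumably be wrapped with Prefect's `task` decorator and called on the two parameters inside the `Flow` block, so both the `or` and the call to `dt.date.today()` happen at run time. Passing explicit dates skips the defaults, which covers the backfill case.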

Jacob Goldberg

08/16/2021, 10:24 PM
@Kevin Kho many thanks for the guidance, and for catching the issue with `dt.date.today()` being evaluated at build time rather than at runtime. I will give this a shot.
👍 1