Jacob Goldberg

    1 year ago
    Hello! I have a question about the DateTimeParameter. My goal is to have a flow run on a schedule with dynamic date inputs, e.g. run every Monday where start_date is last Monday and end_date is this Monday. However, I also want the ability to override the default date inputs (e.g. to backfill data). I thought the DateTimeParameter would be perfect for this, but I am having trouble getting it working. Here is a sample snippet:
    import datetime as dt
    from prefect import Flow
    from prefect.core.parameter import DateTimeParameter
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock
    
    # every Monday at 09:00
    schedule = Schedule(clocks=[CronClock("0 9 * * 1")])
    with Flow(name="test", schedule=schedule) as test_flow:
      start_date = DateTimeParameter("start_date", required=False) or dt.date.today() - dt.timedelta(days=7)
      end_date = DateTimeParameter("end_date", required=False) or dt.date.today()
    
      my_process(start_date, end_date)  # my_process is my own task (definition omitted)
    The issue is that my_process() requires a datetime object but is receiving None. It seems the or operator is not working as I would expect when defining start_date and end_date. Although DateTimeParameter "evaluates" to None in Prefect, the or operator sees them as Prefect Parameters, so start_date and end_date both get defined as None. What is the proper way to structure the Flow so that it has default datetime values when it runs on a schedule, but I still have the ability to override them via Prefect Cloud?
    Kevin Kho

    1 year ago
    Hey @Jacob Wilson, I think the issue here is that "a or b" evaluates to a whenever a is truthy; b is only used when a is falsy (None, 0, an empty string and so on). The DateTimeParameter object itself is truthy when the Flow is built, so the expression always picks it. A Parameter is just a special Task that takes a value at runtime, and all Tasks have deferred execution, meaning they only produce a value later, when the flow runs. The content of the Flow block is evaluated at build time, so this expression always returns the DateTimeParameter, which then evaluates to None at runtime. That said, you need to defer the "or" as well, and the way to do that is by using a Task. For example:
    from prefect import task
    
    with Flow(name="test", schedule=schedule) as test_flow:
      start_date = DateTimeParameter("start_date", required=False)
      # task() defers both the `or` and dt.date.today() to flow run time
      start_date_filled = task(lambda x: x or dt.date.today() - dt.timedelta(days=7))(start_date)
    Calling task() on the lambda here is the same as defining a function with the @task decorator above it: it defers execution to runtime. There is also a problem with calling dt.date.today() in the Flow context, because it is evaluated at build time. Its value will be frozen at the date the Flow was registered, unless you use script-based storage (GitHub, S3, or Local storage stored as a script). Again, deferring execution with a task also helps with that.
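To see the build-time versus run-time distinction without Prefect, here is a minimal pure-Python model. Placeholder, its run() method, default_start() and fill_default() are hypothetical names invented for illustration, and the fixed TODAY date is an assumption to keep the output reproducible. Placeholder only mimics the relevant behaviour of a Parameter (an ordinary, truthy object while the flow is built, which yields a value, possibly None, only when run); it is a sketch of the idea, not how Prefect implements Parameters.

```python
import datetime as dt

# Fixed "today" so the example is reproducible (hypothetical value).
TODAY = dt.date(2022, 1, 10)


class Placeholder:
    """Hypothetical stand-in for a Prefect Parameter.

    While the flow is being built it is just an object (and, like most
    Python objects, truthy). Only when the flow runs does it produce a
    value, which is None if nothing was supplied.
    """

    def __init__(self, name):
        self.name = name

    def run(self, supplied=None):
        return supplied


def default_start():
    return TODAY - dt.timedelta(days=7)


param = Placeholder("start_date")

# Eager: `or` runs now, sees a truthy object, and never uses the default.
eager = param or default_start()
print(eager is param)  # True
print(eager.run())     # None: the default was discarded at build time


# Deferred: the `or` runs only once the real value is known, which is
# the role the task(...) wrapper plays inside a Prefect flow.
def fill_default(value):
    return value or default_start()


print(fill_default(param.run()))                      # 2022-01-03
print(fill_default(param.run(dt.date(2021, 12, 1))))  # 2021-12-01
```

The eager version commits to the placeholder the moment the line executes; the deferred version re-evaluates the "or" once the real value exists.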
    Jacob Goldberg

    1 year ago
    @Kevin Kho many thanks for the guidance, and for catching the issue with dt.date.today() being evaluated at build time rather than runtime. I will give this a shot.
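For the schedule described in the question (start_date is last Monday, end_date is this Monday, both overridable for backfills), the date logic itself can live in a plain function that is only called at run time. The sketch below uses a hypothetical helper named weekly_window; in the flow you would wrap it with Prefect's task and pass it the two DateTimeParameter outputs, so that dt.date.today() is evaluated when the flow runs rather than when it is registered.

```python
import datetime as dt


def weekly_window(start=None, end=None, today=None):
    """Return (start, end) for a weekly Monday-to-Monday run.

    Hypothetical helper: any value passed explicitly (e.g. for a backfill)
    is kept; missing values are filled in when the function is called.
    """
    # Evaluated on every call, i.e. at flow run time, not at registration.
    today = today or dt.date.today()
    # date.weekday() is 0 for Monday, so this is the latest Monday <= today.
    this_monday = today - dt.timedelta(days=today.weekday())
    end = end or this_monday
    # Default start is one week before end.
    start = start or end - dt.timedelta(days=7)
    return start, end


# Scheduled run on Monday 2022-01-10 with no overrides.
print(weekly_window(today=dt.date(2022, 1, 10)))
# (datetime.date(2022, 1, 3), datetime.date(2022, 1, 10))

# Manual backfill: explicit values win.
print(weekly_window(dt.date(2021, 11, 1), dt.date(2021, 11, 8)))
# (datetime.date(2021, 11, 1), datetime.date(2021, 11, 8))
```

Deriving the default start from end (rather than from today) means that overriding only end_date still yields a one-week window; that is a choice made for this sketch, not something stated in the thread.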