Ben Sack
07/22/2021, 5:06 PMwith *Flow*("dummy_flow") as flow:
process_date = *Parameter*(name="process_date", default=*date_minus_seven*())
The issue with this is the process_date parameter is set at flow registration, but isn’t updated each time the flow runs. For example, if I registered the flow today, the process_date parameter would be set as 2021-07-15, but when the flow runs tomorrow, the process_date would still be set to 2021-07-15.Kevin Kho
default=datetime.datetime.now()
The recommended approach is to have a Parameter with a value like “today”, and then calculate that in a task. This defers the execution during run time.Ben Sack
07/22/2021, 5:37 PMKevin Kho
Sam Cook
07/22/2021, 6:07 PMBen Sack
07/22/2021, 6:38 PMSam Cook
07/22/2021, 7:52 PMimport functools
import inspect
from prefect import task, Flow
def argspec_factory(wrapped):
sig = inspect.signature(wrapped)
new_params = []
for name, parameter in sig.parameters.items():
# Perform any filter, updates, etc. as needed
# to function parameters here. If you use annotation
# they need to be converted to strings here.
...
new_params.append(parameter)
sig = sig.replace(parameters=new_params)
def wrap(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
kwargs['mytime'] = None if kwargs.get('mytime', None) is None else resolve_time(mytime)
return func(*args, **kwargs)
wrapper.__signature__ = argspec_factory(func)
return wrapper
@task
@wrap
def task_func(mytime=None):
# do work
...
with Flow("name"):
# Create flow as normal
Kevin Kho
@wrap
instead of @wrapper
around the function?Ben Sack
07/22/2021, 8:10 PMSam Cook
07/22/2021, 8:41 PM