Marko Herkaliuk
10/18/2021, 8:43 AM
start_date = Parameter("start_date", default=pendulum.now("Europe/Kiev").add(days=-1).to_date_string())
start_date = Parameter("start_date", default=(date.today() - timedelta(1)).strftime("%Y-%m-%d"))
and usually everything worked as needed, i.e. the parameter value was calculated when the script was executed.
However, I noticed that on Saturday, Sunday and Monday (today) all such parameters were passed as 2021-10-14, i.e. as if the Flow had been launched on Friday. Last weekend this was not the case (nor at any time before).
Marko Herkaliuk
10/18/2021, 9:13 AM
Anna Geller
Marko Herkaliuk
10/18/2021, 9:14 AM
Marko Herkaliuk
10/18/2021, 9:15 AM
Anna Geller
from prefect import Flow, Parameter, task
import pendulum


@task(log_stdout=True)
def hello_world(user_input: str):
    if user_input is None:
        # No value supplied: compute the default inside the task, at run time
        default_val = pendulum.now("Europe/Kiev").add(days=-1).to_date_string()
        print(f"Hello {default_val}")
    else:
        print(f"hello {user_input}")


with Flow("test-flow") as flow:
    param = Parameter("user_input", default=None)
    hw = hello_world(param)

if __name__ == '__main__':
    flow.run()
    # flow.run(parameters=dict(user_input="2021-10-17"))
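To show why it matters where the date gets calculated, here is a minimal standard-library sketch with no Prefect involved. The names `yesterday`, `FROZEN_DEFAULT` and `resolve_start_date` are hypothetical, and the Friday date is hard-coded only to mirror the situation described above. It contrasts a default computed once, when the module is loaded, with one computed each time the function is called:

```python
from datetime import date, timedelta
from typing import Optional


def yesterday(today: date) -> str:
    """Format the day before `today` as YYYY-MM-DD."""
    return (today - timedelta(days=1)).strftime("%Y-%m-%d")


# Computed once, when this module is loaded. This is the analogue of passing
# an already evaluated date string as a Parameter default. Friday 2021-10-15
# is hard-coded here to imitate "the flow was built on Friday".
FROZEN_DEFAULT = yesterday(date(2021, 10, 15))


def resolve_start_date(user_input: Optional[str], today: date) -> str:
    """Return the explicit value if one was given, else compute the default now."""
    if user_input is not None:
        return user_input
    return yesterday(today)


if __name__ == "__main__":
    monday = date(2021, 10, 18)
    print(FROZEN_DEFAULT)                            # 2021-10-14
    print(resolve_start_date(None, monday))          # 2021-10-17
    print(resolve_start_date("2021-10-01", monday))  # 2021-10-01
```

The frozen value stays at 2021-10-14 no matter when it is read, while the function follows the date it is given. That is the same idea as using `default=None` and filling in the value inside the task.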
Marko Herkaliuk
10/18/2021, 9:22 AM
Marko Herkaliuk
10/18/2021, 9:24 AM
Anna Geller
Anna Geller
Anna Geller
Marko Herkaliuk
10/18/2021, 9:47 AM
prefect:0.15.6-python3.8
We create the schedule through the UI. When you do that you can set parameters, but we didn’t. That’s what I meant. As for a wrapper for the parameters, we will probably do that.
Kevin Kho
Marko Herkaliuk
10/18/2021, 2:26 PM
Kevin Kho
Kevin Kho
Marko Herkaliuk
10/18/2021, 3:10 PM
Marko Jamedzija
11/16/2021, 12:23 PM
0.15.5 to 0.15.9
Marko Jamedzija
11/16/2021, 12:25 PM
schedule object and were working OK until this. Now they have the values from registration. We use module storage
Anna Geller
Marko Jamedzija
11/16/2021, 12:49 PM
Marko Jamedzija
11/16/2021, 1:02 PM
schedule. So it was like this:
run_datetime_hour = (schedule.next(1)[0] - timedelta(hours=2)).strftime("%Y-%m-%dT%H:00:00")
source_start_datetime = Parameter(name="source_start_datetime", default=run_datetime_hour)
Now with this change it will be like this:
from datetime import timedelta

from prefect import Parameter, task
from prefect.schedules import Schedule


@task
def get_run_date_time_hour(param_input: str, schedule: Schedule) -> str:
    """
    Returns param_input if it is set; otherwise derives the default from the schedule's next run time.
    """
    if param_input:
        return param_input
    elif schedule:
        return (schedule.next(1)[0] - timedelta(hours=2)).strftime("%Y-%m-%dT%H:00:00")
    else:
        raise Exception("Schedule is needed to generate the default value!")


source_start_datetime_param = Parameter(name="source_start_datetime", default=None)
source_start_datetime = get_run_date_time_hour(param_input=source_start_datetime_param, schedule=schedule)
I tested it out locally and it works. Do you think it will work when deployed on Prefect Server as well, @Anna Geller? Thanks!
Anna Geller
schedule.next(1)[0], I think it may be better to retrieve this information from the context:
import prefect
prefect.context["scheduled_start_time"]
This way, you don’t rely on the schedule object, which is valid only at registration time, but instead use the runtime-specific value of scheduled_start_time.
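Applied to the hourly default discussed above, the idea might look like the following sketch. It is plain Python so it can be run without Prefect. The function name `run_datetime_hour` is hypothetical, and it assumes the value read from `prefect.context["scheduled_start_time"]` behaves like a regular `datetime`, so the caller inside a task would pass that value in as the second argument:

```python
from datetime import datetime, timedelta
from typing import Optional


def run_datetime_hour(param_input: Optional[str], scheduled_start_time: datetime) -> str:
    """Return the explicit parameter if set, else derive it from the run's scheduled start."""
    if param_input:
        return param_input
    # Same arithmetic as the schedule-based version: go back two hours,
    # then truncate to the start of that hour.
    return (scheduled_start_time - timedelta(hours=2)).strftime("%Y-%m-%dT%H:00:00")


if __name__ == "__main__":
    # Inside a Prefect task the second argument would come from the run context;
    # here a fixed datetime stands in for it.
    print(run_datetime_hour(None, datetime(2021, 11, 16, 13, 0)))   # 2021-11-16T11:00:00
    print(run_datetime_hour("2021-11-01T00:00:00", datetime(2021, 11, 16, 13, 0)))
```

Keeping the calculation in a plain function also makes it easy to unit test with fixed datetimes, independent of any schedule.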
Marko Jamedzija
11/16/2021, 1:44 PM
Marko Jamedzija
11/16/2021, 2:36 PM