Tomas Borrella
06/20/2022, 12:29 PM
upstream_tasks) and at the end of all of them, I would like to execute a last flow only if it is a specific day of the week (for example, run the last flow only on Mondays). I have tried the following code, but the condition is always false. What is the right way to do this?
with Flow('prefect_parent', run_config=config.RUN_CONFIG) as prefect_p_flow:
    a = flow_a()
    b = flow_b(upstream_tasks=[a])
    c = flow_c(upstream_tasks=[b])
    if date.today().weekday() == 0:  # Monday
        d = flow_d(upstream_tasks=[c])
prefect_p_flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
NOTE: I have been looking at Schedules, but I don't want the task to run at a specific point in time (because the previous flows may take more or less time). I would like it to run at the end of the previous task, but only if the day condition is met.
Any suggestions are welcome.
Sylvain Hazard
06/20/2022, 12:34 PM
flow_d if registered on a Monday and not otherwise.
Off the top of my head, something like this would likely work:
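from datetime import date
from prefect import Flow, case, task

@task
def is_monday():
    # Runs at flow-run time, not at registration time
    return date.today().weekday() == 0

with Flow("prefect_parent") as flow:
    a = flow_a()
    ...
    is_it_monday = is_monday()
    with case(is_it_monday, True):
        d = flow_d(upstream_tasks=[c])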
I'm unsure the syntax is completely right but I believe this idea will work as you want it to.
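To make the difference concrete without involving Prefect at all, here is a minimal plain-Python sketch. The names build_flow_eager and build_flow_deferred are hypothetical, and the strings only stand in for the real flows. It assumes the flow is built on a Tuesday and then run on a Monday.

```python
from datetime import date


def is_monday(today: date) -> bool:
    """Return True when `today` falls on a Monday (weekday() == 0)."""
    return today.weekday() == 0


def build_flow_eager(build_day: date):
    """Mimic a bare `if` in the flow block: decided once, at build time."""
    steps = ["flow_a", "flow_b", "flow_c"]
    if is_monday(build_day):
        steps.append("flow_d")
    # The run day is ignored: the list of steps is already frozen.
    return lambda run_day: list(steps)


def build_flow_deferred():
    """Mimic a task-wrapped check: the condition is evaluated on every run."""
    def run(run_day: date):
        steps = ["flow_a", "flow_b", "flow_c"]
        if is_monday(run_day):
            steps.append("flow_d")
        return steps
    return run


tuesday = date(2022, 6, 21)  # day the flow is built
monday = date(2022, 6, 27)   # day the flow is run

eager = build_flow_eager(build_day=tuesday)
deferred = build_flow_deferred()

print(eager(monday))     # flow_d is missing even though the run is on a Monday
print(deferred(monday))  # flow_d is included
```

The eager version behaves like the original snippet: the weekday is checked once, when the flow is defined. The deferred version behaves like the task plus case idea: the check happens each time the flow runs.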
More info on conditional logic in Prefect here.
Sylvain Hazard
06/20/2022, 12:36 PM
with Flow() block and that isn't a Task itself will be evaluated only once and only at registration time. Maybe people with more experience than me will be able to give you exceptions though.
Anna Geller
Tomas Borrella
06/20/2022, 12:57 PM
@task decorator.