Tomas Borrella
06/20/2022, 12:29 PM
[...] upstream_tasks) and at the end of all of them, I would like to execute a last flow only if it is a specific day of the week (for example, execute the last of the flows only on Mondays). I have tried the following code, but the condition is always false. What is the right way to do this?
with Flow('prefect_parent', run_config=config.RUN_CONFIG) as prefect_p_flow:
    a = flow_a()
    b = flow_b(upstream_tasks=[a])
    c = flow_c(upstream_tasks=[b])
    if date.today().weekday() == 0:  # Monday
        d = flow_d(upstream_tasks=[c])
prefect_p_flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)
NOTE: I have been looking at Schedules, but I don't want the task to run at a specific point in time (because the previous flows may take more or less time). I would like it to run at the end of the previous task, but only if the day condition is met.
Any suggestions are welcome.
Sylvain Hazard
06/20/2022, 12:34 PM
[...] flow_d if registered on a Monday and not otherwise.
Off the top of my head, something like this would likely work:
@task
def is_monday():
    return date.today().weekday() == 0

with Flow() as flow:
    a = flow_a()
    ...
    is_it_monday = is_monday()
    with case(is_it_monday, True):
        d = flow_d(upstream_tasks=[c])
I'm unsure the syntax is completely right but I believe this idea will work as you want it to.
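As a rough sketch of the idea above (not taken from the thread), the weekday check can live in a plain function that accepts the date as an optional argument, which makes it easy to verify outside of a flow. The `today` parameter is an assumption added purely for illustration; inside a Prefect flow the function would still need the @task decorator so that it is evaluated when the flow runs.

```python
from datetime import date
from typing import Optional


def is_monday(today: Optional[date] = None) -> bool:
    """Return True when `today` (default: the current date) is a Monday."""
    # `today` is a hypothetical parameter, added so the check can be tested
    # with a fixed date instead of depending on the real calendar.
    current = today if today is not None else date.today()
    # date.weekday() numbers Monday as 0 and Sunday as 6.
    return current.weekday() == 0


if __name__ == "__main__":
    # 2022-06-20, the date of this thread, fell on a Monday.
    print(is_monday(date(2022, 6, 20)))
```

Calling `is_monday()` with no argument falls back to the current date, which matches the behavior of the task suggested above.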
More info on conditional logic in Prefect here.
[...] with Flow() block and that isn't a Task itself will be evaluated only once and only at registration time. Maybe people with more experience than me will be able to give you exceptions though.
Anna Geller
06/20/2022, 12:37 PM
Tomas Borrella
06/20/2022, 12:57 PM
[...] @task decorator.
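To illustrate why the original `if` was evaluated only once, here is a toy model in plain Python. It does not use Prefect and is not how Prefect is implemented; `make_eager_plan` and `make_deferred_plan` are hypothetical names. The first mimics a condition checked while the flow is being built, the second mimics a condition wrapped in a task and checked on every run.

```python
from datetime import date
from typing import Callable, List

BASE_STEPS = ["flow_a", "flow_b", "flow_c"]


def make_eager_plan(build_day: date) -> Callable[[date], List[str]]:
    """The weekday check runs once, while the plan is built."""
    include_d = build_day.weekday() == 0  # frozen at build time

    def run(run_day: date) -> List[str]:
        # run_day is ignored: the decision was already made at build time.
        steps = list(BASE_STEPS)
        if include_d:
            steps.append("flow_d")
        return steps

    return run


def make_deferred_plan() -> Callable[[date], List[str]]:
    """The weekday check runs again on every execution of the plan."""

    def run(run_day: date) -> List[str]:
        steps = list(BASE_STEPS)
        if run_day.weekday() == 0:  # evaluated at run time
            steps.append("flow_d")
        return steps

    return run


if __name__ == "__main__":
    tuesday, monday = date(2022, 6, 21), date(2022, 6, 27)
    eager = make_eager_plan(build_day=tuesday)  # "registered" on a Tuesday
    deferred = make_deferred_plan()
    print(eager(monday))     # flow_d is skipped even though it runs on a Monday
    print(deferred(monday))  # flow_d is included
```

The eager plan corresponds to the `if` written directly inside the `with Flow(...)` block, and the deferred plan corresponds to moving the check into a task combined with `case`.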