Kyle McChesney
09/06/2022, 3:11 PM
from prefect import Flow, case, task
from prefect.executors import LocalDaskExecutor

@task
def required():
    return [
        1,
        2,
        3,
    ]

@task
def optional_case():
    return True

@task
def optional(data):
    data.append(4)
    return data

@task
def report_case():
    return False

# skip_on_upstream_skip=False lets report run even when optional is skipped
@task(skip_on_upstream_skip=False)
def report():
    print('all done!')

with Flow(
    'test',
    executor=LocalDaskExecutor(),
) as flow:
    data = required()
    with case(optional_case, True):
        opt_data = optional(data)
    with case(report_case, True):
        # report takes no data input; it only has to wait for both tasks
        report(upstream_tasks=(data, opt_data,))
The idea is that there are 2 variables, summarized as:
• variable 1: should optional data processing be run
• variable 2: should reporting be run at the end (note that reporting does not take data as an explicit input)
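To make the intended behavior concrete, the two variables give four combinations. The sketch below only enumerates which tasks should run in each one; it does not use Prefect, and the function name `expected_runs` is a hypothetical helper introduced for illustration.

```python
from itertools import product
from typing import List


def expected_runs(run_optional: bool, run_report: bool) -> List[str]:
    """Tasks that should run, in order, for one setting of the two variables."""
    runs = ["required"]          # the required step always runs
    if run_optional:
        runs.append("optional")  # variable 1
    if run_report:
        runs.append("report")    # variable 2, always after the data tasks
    return runs


# Print the desired outcome for all four combinations.
for run_optional, run_report in product([False, True], repeat=2):
    print(run_optional, run_report, expected_runs(run_optional, run_report))
```

The case that matters here is `run_optional=False, run_report=True`: report should still run even though the optional step was skipped.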
I am trying to achieve this in Prefect 1 using case statements. The issue is that I need to set skip_on_upstream_skip=False on report so that it runs even if the optional data processing is not run. I just want to ensure that report runs if its case is True and does not run otherwise, but it must only run AFTER data and optional have run.
Kyle McChesney
09/06/2022, 3:12 PM
skip_on_upstream_skip seems to also apply to the case, which I suppose makes some sense, but I need a way around this.
Kyle McChesney
09/06/2022, 6:06 PM
trigger can be used to customize the behavior fully, but skip_on_upstream_skip=False must still be set in order for the trigger callable to be called.
Anna Geller
The merge task is what you're looking for. An example:
from random import random
from prefect import task, Flow, case
from prefect.tasks.control_flow import merge

...

with Flow("conditional-branches-with-merge") as flow:
    cond = check_condition()

    with case(cond, True):
        val1 = action_if_true()

    with case(cond, False):
        val2 = action_if_false()

    val = merge(val1, val2)
    another_action(val)
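As a rough illustration of why a merge step helps in the report scenario discussed earlier in this thread, here is a toy model of skip propagation in plain Python. It is not Prefect code and not how Prefect is implemented: `ToyTask`, `run_flow`, `original_flow`, and `merged_flow` are hypothetical names, the "guard" tasks stand in for the comparison a case block performs, and the merge is modeled only as a task that ignores upstream skips (its trigger and its return value are left out).

```python
from dataclasses import dataclass, field
from typing import Dict, List

SUCCESS = "Success"
SKIPPED = "Skipped"


@dataclass
class ToyTask:
    name: str
    upstream: List[str] = field(default_factory=list)
    skip_on_upstream_skip: bool = True
    raises_skip: bool = False  # models a case guard whose condition did not match


def run_flow(tasks: List[ToyTask]) -> Dict[str, str]:
    """Run tasks in the given (already topological) order and return their states."""
    states: Dict[str, str] = {}
    for t in tasks:
        upstream_skipped = any(states[u] == SKIPPED for u in t.upstream)
        if t.raises_skip or (t.skip_on_upstream_skip and upstream_skipped):
            states[t.name] = SKIPPED
        else:
            states[t.name] = SUCCESS
    return states


def branches(run_optional: bool, run_report: bool) -> List[ToyTask]:
    """The part both variants share: data, the optional branch, the report guard."""
    return [
        ToyTask("data"),
        ToyTask("optional_guard", raises_skip=not run_optional),
        ToyTask("opt_data", upstream=["data", "optional_guard"]),
        ToyTask("report_guard", raises_skip=not run_report),
    ]


def original_flow(run_optional: bool, run_report: bool) -> List[ToyTask]:
    # report ignores every upstream skip, including the one from its own guard
    return branches(run_optional, run_report) + [
        ToyTask("report", upstream=["data", "opt_data", "report_guard"],
                skip_on_upstream_skip=False),
    ]


def merged_flow(run_optional: bool, run_report: bool) -> List[ToyTask]:
    # the merge-like task absorbs the optional skip; report keeps default skipping
    return branches(run_optional, run_report) + [
        ToyTask("merged", upstream=["data", "opt_data"], skip_on_upstream_skip=False),
        ToyTask("report", upstream=["merged", "report_guard"]),
    ]


print(run_flow(original_flow(False, False))["report"])  # Success (unwanted)
print(run_flow(merged_flow(False, False))["report"])    # Skipped
print(run_flow(merged_flow(False, True))["report"])     # Success
```

In this model the skip from the optional branch stops at the merged task, so report only sees a skip when its own guard produces one. Whether the real flow behaves the same way should be checked against Prefect 1 itself.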
Only once you merge can you use triggers for downstream tasks and take action on that basis.
This is easier in 2.0 without the DAG overhead, so it's a good motivation to migrate this flow to 2.0 :)
Anna Geller