Kyle McChesney
09/06/2022, 3:11 PM@task
def required():
return [
1,
2,
3,
]
@task
def optional_case():
return True
@task
def optional(data):
data.append(4)
return data
@task
def report_case():
return False
@task(skip_on_upstream_skip=False)
def report():
print('all done!')
with Flow(
'test',
executor=LocalDaskExecutor(),
) as flow:
data = required()
with case(optional_case, True):
opt_data = optional(data)
with case(report_case, True):
report(upstream_tasks=(data, opt_data,))
The idea is there there are 2 variables, summarized as:
• variable 1: should optional data processing be run
• variable 2: should reporting be run at the end (note that reporting does not take data as an explicit input)
I am trying to achieve this in prefect one using case
statements. The issue is that I need to set skip_on_upstream_skip
on report, so that it runs even if optional data processing is not run. I just want to ensure that report is run if its case is True, otherwise it is not run, but it must only be run AFTER data
and optional
have runskip_on_upstream_skip
seems to also apply to the case
which I suppose makes some sense, but I need a way around this.trigger
can be used to customize the behavior fully, but skip_on_upstream_skip
must still be provided in order for the trigger callable to be calledAnna Geller
09/10/2022, 12:23 AMmerge
task is what you're looking for, an example:
from random import random
from prefect import task, Flow, case
from prefect.tasks.control_flow import merge
...
with Flow("conditional-branches-with-merge") as flow:
cond = check_condition()
with case(cond, True):
val1 = action_if_true()
with case(cond, False):
val2 = action_if_false()
val = merge(val1, val2)
another_action(val)
only once you merge, you can use triggers for downstream tasks and taking action on that basis
this is easier in 2.0 without the DAG overhead so a good motivation to migrate this flow to 2.0 :)