Carl
03/10/2021, 4:20 AMwith Flow('MyFlow') as flow:
do_thing = Parameter('do_think', default=False, required=False)
data = get_data()
with case(do_thing, True):
data = do_thing_task(data)
data = do_another_thing(data) # This gets skipped when do_thing = False
Chris White
data
so you are probably creating a dependency graph that you don’t intend to create; you should use new variable names with each task call:
with Flow('MyFlow') as flow:
do_thing = Parameter('do_think', default=False, required=False)
data = get_data()
with case(do_thing, True):
true_data = do_thing_task(data)
other_data = do_another_thing(data)
Carl
03/10/2021, 4:26 AMtrue_data
to do_another_thing()
when the condition is True?Chris White
other_data = do_another_thing(true_data)
would do the trick, but in that case do_another-thing
will skip if the condition is False
, which is expected but sounds like not what you want?Carl
03/10/2021, 4:30 AMwith case(do_thing, False):
true_data = data
but that feels pretty messyChris White
skip_on_upstream_skip=False
flag when you create the taskCarl
03/10/2021, 8:47 AMJim Crist-Harif
03/10/2021, 2:35 PMmerge
, which can be used to merge two branches of a flow back into one (the first executed task passed to merge is its result).
from prefect.tasks.control_flow import merge
with Flow('MyFlow') as flow:
do_thing = Parameter('do_think', default=False, required=False)
data = get_data()
with case(do_thing, True):
data2 = do_thing_task(data)
# `data2` if `data2` wasn't skipped, otherwise `data`
data = merge(data2, data)
other_data = do_another_thing(data)