Daniel Lomartra
06/16/2022, 4:47 PM@task(name="Skipped Task Branch")
def skipped_task_branch(skip_block_name):
logger = context.get("logger")
<http://logger.info|logger.info>(f"Sucessfully skipped {skip_block_name}")
raise signals.SUCCESS
Flow:
with Flow(
name="example flow"
) as flow:
myCondition = Parameter(name = "Run Tasks?", default = True)
with case(myCondition, True):
task1 = do_some_stuff()
task2 = do_some_other_stuff()
with case(myCondition, False):
skip = skipped_task_branch("task block name")
example_merge = merge(task2, skip)
Kevin Kho
06/16/2022, 5:04 PMDaniel Lomartra
06/16/2022, 5:54 PMKevin Kho
06/16/2022, 5:55 PMfrom prefect.triggers import always_run
@task(trigger=always_run)
def abc():
....
I think merge
just has this triggerDaniel Lomartra
06/16/2022, 6:07 PMwith Flow(
name="example flow"
) as flow:
myCondition1 = Parameter(name = "Run Task Group 1?", default = True)
with case(myCondition, True):
task1 = do_some_stuff()
task2 = do_some_other_stuff()
with case(myCondition, False):
skip = skipped_task_branch("Task Group 1")
example_merge = merge(task2, skip)
myCondition2 = Parameter(name = "Run Task Group 2?", default = True)
with case(myCondition2, True):
task3 = do_some_stuff()
task4 = do_some_other_stuff()
with case(myCondition, False):
skip = skipped_task_branch("Task Group 2")
example_merge2 = merge(task2, skip)
I have tried setting dependencies between these branches in a bunch of ways but they always seem to get executed in parallel. It seems like it has something to do with parameters always being evaluated first. The only way I can get this to work is by using dummy tasks at the top of each conditional that takes the value of the parameter as an argument and returns it and setting dependencies against those.Kevin Kho
06/16/2022, 6:19 PMmyCondition2 = Parameter(name = "Run Task Group 2?", default = True)
myCondition2(upstream_tasks=[example_merge])
with case(myCondition2, True):
...
Daniel Lomartra
06/16/2022, 6:39 PMTypeError: __call__() got an unexpected keyword argument 'upstream_tasks'
Kevin Kho
06/16/2022, 6:40 PMcase
statementDaniel Lomartra
06/16/2022, 6:43 PMKevin Kho
06/16/2022, 6:46 PMSkipped
is terminal anyway so it will move on to downstream but you are right to be concerned because SKIP propagates so it will skip that task alsoDaniel Lomartra
06/16/2022, 6:52 PMKevin Kho
06/16/2022, 6:54 PM@task(skip_on_upstream_skip=False)
?
But then the issue here is that the case
uses SKIP as the mechanism so it won’t be respectedDaniel Lomartra
06/16/2022, 6:57 PM@task(skip_on_upstream_skip=False,trigger=always_run)
Kevin Kho
06/16/2022, 7:01 PMalways_run
will never SKIP I think so you don’t need skip_on_upstream_skip
, but then your case
won’t be respected if it skipsDaniel Lomartra
06/16/2022, 7:39 PM@task(name="Skipped Task Branch")
def skipped_task_branch(skip_block_name):
logger = context.get("logger")
<http://logger.info|logger.info>(f"Sucessfully skipped {skip_block_name}")
last_task_from_true_branch.state == SUCCESS:
raise signals.SUCCESS
else:
raise singals.FAIL
Kevin Kho
06/16/2022, 7:44 PMDaniel Lomartra
06/16/2022, 7:50 PM@task(name="Start Task Branch")
def start_task_branch(parameter):
return parameter == True
@task(name="Skip Task Branch")
def skip_task_branch():
raise signals.SUCCESS
with Flow(
name="example flow"
) as flow:
param_branch_1 = Parameter(name = "Run Task Group 1?", default = True)
branch_1_start = start_task_branch(param_branch_1)
with case(branch_1_start, True):
task1 = do_some_stuff()
task2 = do_some_other_stuff()
with case(branch_1_start, False):
skip = skip_task_branch("Task Group 1")
branch_1_end = merge(task2, skip)
param_branch_2 = Parameter(name = "Run Task Group 2?", default = True)
branch_2_start = start_task_branch(param_branch_2)
with case(branch_2_start, True):
task3 = do_some_stuff()
task4 = do_some_other_stuff()
with case(branch_2_start, False):
skip = skip_task_branch("Task Group 2")
branch_2_end = merge(task4, skip)
branch_2_start.set_upstream(branch_1_end)
@task(name="End Task Branch",trigger= always_run)
def end_task_branch():
raise signals.SUCCESS
with Flow(
name="example flow"
) as flow:
param_branch_1 = Parameter(name = "Run Task Group 1?", default = True)
with case(param_branch_1, True):
task1 = do_some_stuff()
task2 = do_some_other_stuff()
branch_1_end = end_task_branch()
branch_1_end.set_upstream(task2)
param_branch_2 = Parameter(name = "Run Task Group 2?", default = True)
with case(param_branch_2, True):
task3 = do_some_stuff()
task4 = do_some_other_stuff()
branch_2_end = end_task_branch()
branch_2_end.set_upstream(task4)
task3.set_upstream(branch_1_end)
Kevin Kho
06/16/2022, 8:05 PMDaniel Lomartra
06/16/2022, 8:06 PMKevin Kho
06/16/2022, 8:07 PMtask2
should fail branch_1_end
and will fail task_3
Daniel Lomartra
06/16/2022, 8:07 PMKevin Kho
06/16/2022, 8:08 PMDaniel Lomartra
06/16/2022, 8:08 PMskip_on_upstream_skip=False
?Kevin Kho
06/16/2022, 8:11 PMfrom prefect import Flow, task
from prefect.triggers import always_run
import prefect
from prefect.engine.signals import FAIL
@task
def abc(x):
if x == 2:
raise ValueError()
return x
@task(trigger=always_run)
def bcd(x):
<http://prefect.context.logger.info|prefect.context.logger.info>(x)
if isinstance(x, ValueError):
raise FAIL
return x
with Flow("..") as flow:
a = abc(2)
bcd(a)
flow.run()
Daniel Lomartra
06/16/2022, 8:37 PM