
Daniel Lomartra

06/16/2022, 4:47 PM
Hi all, I am trying to create a flow that can skip a block of tasks when a parameter is set to False. This is similar to the merge example in the docs on branching (Using conditional logic in a Flow | Prefect Docs), except I have no need for the “case(False)” branch. Currently, I have a dummy task that just signals success so that I have something to merge. This feels pretty janky. Is there a cleaner approach?
Task:
from prefect import task, context
from prefect.engine import signals

@task(name="Skipped Task Branch")
def skipped_task_branch(skip_block_name):
    logger = context.get("logger")
    logger.info(f"Successfully skipped {skip_block_name}")
    raise signals.SUCCESS
Flow:
from prefect import Flow, Parameter, case
from prefect.tasks.control_flow import merge

with Flow(
    name="example flow"
) as flow:
    myCondition = Parameter(name = "Run Tasks?", default = True)
    with case(myCondition, True):
        task1 = do_some_stuff()
        task2 = do_some_other_stuff()
    with case(myCondition, False):
        skip = skipped_task_branch("task block name")
    example_merge = merge(task2, skip)

Kevin Kho

06/16/2022, 5:04 PM
I think you can just remove merge and the False branch if they do nothing. The one case alone should be fine. This merge is there to get the output of whichever task executed in the two branches.

Daniel Lomartra

06/16/2022, 5:54 PM
I think I need the merge because I have downstream tasks that I want to run after this conditional block regardless of whether or not it is skipped. Is there another way to do that?

Kevin Kho

06/16/2022, 5:55 PM
Yep
from prefect.triggers import always_run
@task(trigger=always_run)
def abc():
    ...
I think merge just has this trigger

Daniel Lomartra

06/16/2022, 6:07 PM
Awesome, thank you! As a follow-up question: I have three of these conditional blocks whose parent tasks are all parameters. What is the best way to set one upstream of another? As an example:
with Flow(
    name="example flow"
) as flow:
    myCondition1 = Parameter(name = "Run Task Group 1?", default = True)
    with case(myCondition1, True):
        task1 = do_some_stuff()
        task2 = do_some_other_stuff()
    with case(myCondition1, False):
        skip = skipped_task_branch("Task Group 1")
    example_merge = merge(task2, skip)
    myCondition2 = Parameter(name = "Run Task Group 2?", default = True)
    with case(myCondition2, True):
        task3 = do_some_stuff()
        task4 = do_some_other_stuff()
    with case(myCondition2, False):
        skip = skipped_task_branch("Task Group 2")
    example_merge2 = merge(task4, skip)
I have tried setting dependencies between these branches in a bunch of ways, but they always seem to get executed in parallel. It seems to have something to do with parameters always being evaluated first. The only way I can get this to work is by using a dummy task at the top of each conditional that takes the value of the parameter as an argument and returns it, and then setting dependencies against those dummy tasks.

Kevin Kho

06/16/2022, 6:19 PM
I think you can try:
myCondition2 = Parameter(name = "Run Task Group 2?", default = True)
myCondition2(upstream_tasks=[example_merge])
with case(myCondition2, True):
    ...

Daniel Lomartra

06/16/2022, 6:39 PM
That returned the following error when trying to register:
TypeError: __call__() got an unexpected keyword argument 'upstream_tasks'

Kevin Kho

06/16/2022, 6:40 PM
Ah I guess you can’t do that then cuz Parameters are not exactly the same as normal tasks. You can set an upstream on the task inside the case though to something in the previous case statement

Daniel Lomartra

06/16/2022, 6:43 PM
Am I correct in saying that the upstream needs to be set against the merge from the previous case, or else the downstream task will only run when the upstream conditional isn't skipped?

Kevin Kho

06/16/2022, 6:46 PM
Ah good question. Not exactly. So Skipped is terminal anyway so it will move on to downstream, but you are right to be concerned because SKIP propagates so it will skip that task also
I suppose your previous approach might be the clearest where you have a dummy task

Daniel Lomartra

06/16/2022, 6:52 PM
Is there no trigger that will always run on skips but still fail if upstream task fails?

Kevin Kho

06/16/2022, 6:54 PM
Maybe default trigger but use @task(skip_on_upstream_skip=False)? But then the issue here is that the case uses SKIP as the mechanism so it won’t be respected

Daniel Lomartra

06/16/2022, 6:57 PM
This doesn't work?
@task(skip_on_upstream_skip=False,trigger=always_run)

Kevin Kho

06/16/2022, 7:01 PM
always_run will never SKIP I think so you don’t need skip_on_upstream_skip, but then your case won’t be respected if it skips

Daniel Lomartra

06/16/2022, 7:39 PM
Got it. I think I have a partial solution then. I can remove the False branches, merge tasks, and starting dummy task from each case and instead just have a dummy task at the end of each case with always_run trigger and set the dummy task as an upstream of the first task in the next case's True branch. I confirmed that this works and it cleans up the UI quite a bit. However, I think this means that if the upstream case fails then the downstream case will still run. Is there a way to pass state between tasks so that my dummy task can do something like:
@task(name="Skipped Task Branch")
def skipped_task_branch(skip_block_name):
    logger = context.get("logger")
    logger.info(f"Successfully skipped {skip_block_name}")
    # Pseudocode: last_task_from_true_branch stands for the final task of the True branch
    if last_task_from_true_branch.state == SUCCESS:
        raise signals.SUCCESS
    else:
        raise signals.FAIL

Kevin Kho

06/16/2022, 7:44 PM
I am a bit lost haha. Could you show me the latest Flow code you have?

Daniel Lomartra

06/16/2022, 7:50 PM
haha sure. sorry, will rewrite using the example code.
Okay so here is a full version of what I was doing originally:
@task(name="Start Task Branch")
def start_task_branch(parameter):
    return parameter == True

@task(name="Skip Task Branch")
def skip_task_branch(skip_block_name):
    # The argument only labels the skipped block; the calls below pass a name
    raise signals.SUCCESS


with Flow(
    name="example flow"
) as flow:
    param_branch_1 = Parameter(name = "Run Task Group 1?", default = True)
    branch_1_start = start_task_branch(param_branch_1)
    with case(branch_1_start, True):
        task1 = do_some_stuff()
        task2 = do_some_other_stuff()
    with case(branch_1_start, False):
        skip = skip_task_branch("Task Group 1")
    branch_1_end = merge(task2, skip)

    param_branch_2 = Parameter(name = "Run Task Group 2?", default = True)
    branch_2_start = start_task_branch(param_branch_2)
    with case(branch_2_start, True):
        task3 = do_some_stuff()
        task4 = do_some_other_stuff()
    with case(branch_2_start, False):
        skip = skip_task_branch("Task Group 2")
    branch_2_end = merge(task4, skip)

    branch_2_start.set_upstream(branch_1_end)
My new partial solution looks like this:
@task(name="End Task Branch", trigger=always_run)
def end_task_branch():
    raise signals.SUCCESS

with Flow(
    name="example flow"
) as flow:
    param_branch_1 = Parameter(name = "Run Task Group 1?", default = True)
    with case(param_branch_1, True):
        task1 = do_some_stuff()
        task2 = do_some_other_stuff()
    branch_1_end = end_task_branch()

    branch_1_end.set_upstream(task2)

    param_branch_2 = Parameter(name = "Run Task Group 2?", default = True)
    with case(param_branch_2, True):
        task3 = do_some_stuff()
        task4 = do_some_other_stuff()
    branch_2_end = end_task_branch()

    branch_2_end.set_upstream(task4)

    task3.set_upstream(branch_1_end)

Kevin Kho

06/16/2022, 8:05 PM
And we want to guarantee the second case runs after the first, while keeping the case functionality right?

Daniel Lomartra

06/16/2022, 8:06 PM
Yes, and my solution does that successfully.
But I am worried that if task1 or task2 fails, the second case block will still run because the dummy task (branch_1_end) is set to always run. I would prefer it to fail.

Kevin Kho

06/16/2022, 8:07 PM
Yeah I think this will work right? Are you good now? A fail in task2 should fail branch_1_end and will fail task3
Ahh

Daniel Lomartra

06/16/2022, 8:07 PM
So I was searching for a way to read the state of task2 from the dummy task and return a FAIL signal if task2 fails.

Kevin Kho

06/16/2022, 8:08 PM
Ok I think I have a solution one sec

Daniel Lomartra

06/16/2022, 8:08 PM
oh instead of setting it to always run should I just set it to skip_on_upstream_skip=False?

Kevin Kho

06/16/2022, 8:11 PM
That might work yep! That sounds more elegant than my solution but will just leave my solution here:
from prefect import Flow, task 
from prefect.triggers import always_run
import prefect
from prefect.engine.signals import FAIL

@task
def abc(x):
    if x == 2:
        raise ValueError()
    return x 

@task(trigger=always_run)
def bcd(x):
    prefect.context.logger.info(x)
    if isinstance(x, ValueError):
        raise FAIL
    return x 

with Flow("..") as flow:
    a = abc(2)
    bcd(a)

flow.run()

Daniel Lomartra

06/16/2022, 8:37 PM
Nice, will try both. Thanks, Kevin. You're the best.
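
Editor's note: to make the trade-off discussed at the end of this thread easier to follow, here is a toy model in plain Python. It does not import or reproduce Prefect, and `State`, `resolve`, and `run_chain` are hypothetical names made up for this sketch. It assumes the order described in the Prefect 1.x `Task` documentation for `skip_on_upstream_skip`: an upstream skip is checked first, regardless of trigger, and when `skip_on_upstream_skip=False` the trigger is called as normal with skips considered successes. It also simplifies the flow so that task3 depends only on branch_1_end, as if the second group's parameter were True.

```python
from enum import Enum


class State(Enum):
    SUCCESS = "Success"
    SKIPPED = "Skipped"
    FAILED = "Failed"
    TRIGGER_FAILED = "TriggerFailed"


def counts_as_success(state):
    # Assumption from the Prefect 1.x docs: skips are considered successes.
    return state in (State.SUCCESS, State.SKIPPED)


def all_successful(upstream_states):
    # Toy stand-in for the default trigger.
    return all(counts_as_success(s) for s in upstream_states)


def always_run(upstream_states):
    # Toy stand-in for always_run: any finished upstream state is acceptable.
    return True


def resolve(upstream_states, trigger=all_successful, skip_on_upstream_skip=True):
    """Return the state a task ends in, assuming its body succeeds if it runs."""
    # 1. Skip propagation is evaluated first, regardless of trigger.
    if skip_on_upstream_skip and any(s is State.SKIPPED for s in upstream_states):
        return State.SKIPPED
    # 2. Otherwise the trigger decides whether the body may run.
    if not trigger(upstream_states):
        return State.TRIGGER_FAILED
    # 3. The body runs; this toy model assumes it succeeds.
    return State.SUCCESS


def run_chain(task2_state, **end_task_options):
    """Model task2 -> branch_1_end -> task3, with task3 using default settings."""
    branch_1_end = resolve([task2_state], **end_task_options)
    task3 = resolve([branch_1_end])
    return branch_1_end, task3


# Compare the two end-task configurations discussed in the thread.
configurations = {
    "always_run": dict(trigger=always_run, skip_on_upstream_skip=False),
    "default trigger": dict(skip_on_upstream_skip=False),
}
for label, options in configurations.items():
    for task2_state in (State.SUCCESS, State.SKIPPED, State.FAILED):
        end_state, task3_state = run_chain(task2_state, **options)
        print(f"{label:16} task2={task2_state.value:8} "
              f"branch_1_end={end_state.value:14} task3={task3_state.value}")
```

In this model, the always_run end task reports Success even when task2 failed, so task3 still runs, which is the concern Daniel raised. With the default trigger and skip_on_upstream_skip=False, a skipped task2 lets the chain continue, while a failed task2 leaves both branch_1_end and task3 in TriggerFailed. This is only a sketch of the documented rules, so the real behavior is worth confirming with a local flow.run().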