    Daniel Lomartra

    3 months ago
    Hi all, I am trying to create a flow that can skip a block of tasks when a parameter is set to False. This is similar to the merge example in the docs on branching (Using conditional logic in a Flow), except that I have no need for the “case(False)” branch. Currently, I have a dummy task that just signals success so that I have something to merge. This feels pretty janky. Is there a cleaner approach? Task:
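    from prefect import task, context
    from prefect.engine import signals

    @task(name="Skipped Task Branch")
    def skipped_task_branch(skip_block_name):
        logger = context.get("logger")
        logger.info(f"Successfully skipped {skip_block_name}")
        # Signal success so the merge has something to pick up
        raise signals.SUCCESS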
    Flow:
    with Flow(
        name="example flow"
    ) as flow:
        myCondition = Parameter(name = "Run Tasks?", default = True)
        with case(myCondition, True):
            task1 = do_some_stuff()
            task2 = do_some_other_stuff()
        with case(myCondition, False):
            skip = skipped_task_branch("task block name")
        example_merge = merge(task2, skip)
    Kevin Kho

    3 months ago
    I think you can just remove merge and the False branch if they do nothing. The one case alone should be fine. This merge is there to get the output of whichever task executed in the two branches.
    Daniel Lomartra

    3 months ago
    I think I need the merge because I have downstream tasks that I want to run after this conditional block regardless of whether or not it is skipped. Is there another way to do that?
    Kevin Kho

    3 months ago
    Yep
    from prefect import task
    from prefect.triggers import always_run

    @task(trigger=always_run)
    def abc():
        ...
    I think merge just has this trigger
    Daniel Lomartra

    3 months ago
    Awesome, thank you! As a follow-up question: I have three of these conditional blocks whose parent tasks are all parameters. What is the best way to set one upstream of another? As an example:
    with Flow(
        name="example flow"
    ) as flow:
        myCondition1 = Parameter(name = "Run Task Group 1?", default = True)
        with case(myCondition1, True):
            task1 = do_some_stuff()
            task2 = do_some_other_stuff()
        with case(myCondition1, False):
            skip = skipped_task_branch("Task Group 1")
        example_merge = merge(task2, skip)
        myCondition2 = Parameter(name = "Run Task Group 2?", default = True)
        with case(myCondition2, True):
            task3 = do_some_stuff()
            task4 = do_some_other_stuff()
        with case(myCondition2, False):
            skip = skipped_task_branch("Task Group 2")
        example_merge2 = merge(task4, skip)
    I have tried setting dependencies between these branches in a bunch of ways, but they always seem to get executed in parallel. It seems to have something to do with parameters always being evaluated first. The only way I can get this to work is by putting a dummy task at the top of each conditional that takes the parameter's value as an argument and returns it, and then setting dependencies against those dummy tasks.
    Kevin Kho

    3 months ago
    I think you can try:
    myCondition2 = Parameter(name = "Run Task Group 2?", default = True)
    myCondition2(upstream_tasks=[example_merge])
    with case(myCondition2, True):
        ...
    Daniel Lomartra

    3 months ago
    That returned the following error when trying to register:
    TypeError: __call__() got an unexpected keyword argument 'upstream_tasks'
    Kevin Kho

    3 months ago
    Ah, I guess you can’t do that then, cuz Parameters are not exactly the same as normal tasks. You can set an upstream on the task inside the case, though, to something in the previous case statement
    Daniel Lomartra

    3 months ago
    Am I correct in saying that the upstream needs to be set against the merge from the previous case, or else the downstream task won't run whenever the upstream conditional is skipped?
    Kevin Kho

    3 months ago
    Ah, good question. Not exactly. Skipped is terminal anyway, so the run will move on to downstream tasks, but you are right to be concerned: SKIP propagates, so it will skip that task as well
    I suppose your previous approach might be the clearest where you have a dummy task
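The propagation described above can be pictured with a toy model in plain Python. This is not Prefect code: the `propagate` helper and the state strings are hypothetical names for illustration, and the sketch assumes the Prefect 1.x behaviour that a task with the default `skip_on_upstream_skip=True` is skipped when its immediate upstream task was skipped.

```python
# Toy model only: not Prefect code. States are plain strings.
def propagate(first_state, skip_flags):
    """Walk a linear chain of tasks and return the state of each one.

    first_state: state of the task at the head of the chain.
    skip_flags:  one skip_on_upstream_skip value per downstream task.
    """
    states = [first_state]
    for skip_on_upstream_skip in skip_flags:
        upstream = states[-1]
        if upstream == "Skipped" and skip_on_upstream_skip:
            states.append("Skipped")        # the skip is passed along
        elif upstream in ("Success", "Skipped"):
            states.append("Success")        # assume the task body itself succeeds
        else:
            states.append("TriggerFailed")  # default trigger rejects a failure
    return states


# A skipped case branch followed by two default tasks: everything skips.
all_default = propagate("Skipped", [True, True])
# Opting the first downstream task out stops the propagation there.
opt_out = propagate("Skipped", [False, True])
print(all_default)
print(opt_out)
```

In this model, only the task directly downstream of the skipped branch needs to opt out; tasks after it see a normal success.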
    Daniel Lomartra

    3 months ago
    Is there no trigger that will always run on skips but still fail if an upstream task fails?
    Kevin Kho

    3 months ago
    Maybe the default trigger, but with @task(skip_on_upstream_skip=False)? But then the issue here is that the case uses SKIP as its mechanism, so it won’t be respected
    Daniel Lomartra

    3 months ago
    This doesn't work?
    @task(skip_on_upstream_skip=False,trigger=always_run)
    Kevin Kho

    3 months ago
    always_run will never SKIP, I think, so you don’t need skip_on_upstream_skip, but then your case won’t be respected if it skips
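How the trigger and `skip_on_upstream_skip` interact can be laid out as a small decision function. This is a toy model in plain Python, not Prefect code, and `decide` is a hypothetical helper. It assumes the rule given in the Prefect 1.x `Task` documentation: with `skip_on_upstream_skip=True` a task is skipped when any immediate upstream task was skipped, regardless of trigger, and with `False` the trigger is evaluated with skips counted as successes. If that reading is right, `always_run` alone may still skip, so it is worth checking on your own version.

```python
# Toy model only: not Prefect code. It encodes the documented Prefect 1.x rule
# that skip_on_upstream_skip=True skips a task regardless of its trigger.
def decide(upstream_states, trigger="all_successful", skip_on_upstream_skip=True):
    """Return what happens to a task given its immediate upstream states."""
    if skip_on_upstream_skip and "Skipped" in upstream_states:
        return "Skipped"
    if trigger == "always_run":
        return "Run"
    # all_successful: a Skipped upstream counts as a success here
    if all(s in ("Success", "Skipped") for s in upstream_states):
        return "Run"
    return "TriggerFailed"


# Print every combination of upstream state, trigger and flag.
for upstream in ("Success", "Skipped", "Failed"):
    for trigger in ("all_successful", "always_run"):
        for flag in (True, False):
            outcome = decide([upstream], trigger, flag)
            print(f"{upstream:8} {trigger:15} skip_on_upstream_skip={flag!s:5} -> {outcome}")
```

Under this model, the combination asked about earlier (run after a skip, but fail after a failure) is the default trigger with `skip_on_upstream_skip=False`.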
    Daniel Lomartra

    3 months ago
    Got it. I think I have a partial solution then. I can remove the False branches, the merge tasks, and the starting dummy task from each case, and instead have a single dummy task at the end of each case with the always_run trigger, set as an upstream of the first task in the next case's True branch. I confirmed that this works, and it cleans up the UI quite a bit. However, I think this means that if the upstream case fails, the downstream case will still run. Is there a way to pass state between tasks so that my dummy task can do something like:
    @task(name="Skipped Task Branch")
    def skipped_task_branch(skip_block_name):
        logger = context.get("logger")
        logger.info(f"Successfully skipped {skip_block_name}")
        # Pseudocode: last_task_from_true_branch.state is the wished-for lookup, not a real API
        if last_task_from_true_branch.state == SUCCESS:
            raise signals.SUCCESS
        else:
            raise signals.FAIL
    Kevin Kho

    3 months ago
    I am a bit lost haha. Could you show me the latest Flow code you have?
    Daniel Lomartra

    3 months ago
    haha sure. sorry, will rewrite using the example code.
    Okay so here is a full version of what I was doing originally:
    @task(name="Start Task Branch")
    def start_task_branch(parameter):
        return parameter == True
    
    @task(name="Skip Task Branch")
    def skip_task_branch(skip_block_name):
        raise signals.SUCCESS
    
    
    with Flow(
        name="example flow"
    ) as flow:
        param_branch_1 = Parameter(name = "Run Task Group 1?", default = True)
        branch_1_start = start_task_branch(param_branch_1)
        with case(branch_1_start, True):
            task1 = do_some_stuff()
            task2 = do_some_other_stuff()
        with case(branch_1_start, False):
            skip = skip_task_branch("Task Group 1")
        branch_1_end = merge(task2, skip)
    
        param_branch_2 = Parameter(name = "Run Task Group 2?", default = True)
        branch_2_start = start_task_branch(param_branch_2)
        with case(branch_2_start, True):
            task3 = do_some_stuff()
            task4 = do_some_other_stuff()
        with case(branch_2_start, False):
            skip = skip_task_branch("Task Group 2")
        branch_2_end = merge(task4, skip)
    
        branch_2_start.set_upstream(branch_1_end)
    My new partial solution looks like this:
    @task(name="End Task Branch",trigger= always_run)
    def end_task_branch():
        raise signals.SUCCESS
    
    with Flow(
        name="example flow"
    ) as flow:
        param_branch_1 = Parameter(name = "Run Task Group 1?", default = True)
        with case(param_branch_1, True):
            task1 = do_some_stuff()
            task2 = do_some_other_stuff()
        branch_1_end = end_task_branch()
    
        branch_1_end.set_upstream(task2)
    
        param_branch_2 = Parameter(name = "Run Task Group 2?", default = True)
        with case(param_branch_2, True):
            task3 = do_some_stuff()
            task4 = do_some_other_stuff()
        branch_2_end = end_task_branch()
    
        branch_2_end.set_upstream(task4)
    
        task3.set_upstream(branch_1_end)
    Kevin Kho

    3 months ago
    And we want to guarantee the second case runs after the first, while keeping the case functionality right?
    Daniel Lomartra

    3 months ago
    Yes, and my solution does that successfully.
    But I am worried that if task1 or task2 fails, the second case block will still run because the dummy task (branch_1_end) is set to always run. I would prefer it to fail.
    Kevin Kho

    3 months ago
    Yeah, I think this will work, right? Are you good now? A fail in task2 should fail branch_1_end and will fail task3
    Ahh
    Daniel Lomartra

    3 months ago
    So I was searching for a way to read the state of task2 from the dummy task and return a FAIL signal if task2 fails.
    Kevin Kho

    3 months ago
    Ok I think I have a solution one sec
    Daniel Lomartra

    3 months ago
    Oh, instead of setting it to always run, should I just set it to skip_on_upstream_skip=False?
    Kevin Kho

    3 months ago
    That might work, yep! That sounds more elegant than my solution, but I will just leave my solution here:
    from prefect import Flow, task 
    from prefect.triggers import always_run
    import prefect
    from prefect.engine.signals import FAIL
    
    @task
    def abc(x):
        if x == 2:
            raise ValueError()
        return x 
    
    @task(trigger=always_run)
    def bcd(x):
        prefect.context.logger.info(x)
        if isinstance(x, ValueError):
            raise FAIL
        return x 
    
    with Flow("..") as flow:
        a = abc(2)
        bcd(a)
    
    flow.run()
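The snippet above appears to rely on a failed upstream task handing its exception to a downstream task whose trigger lets it run anyway. A plain-Python imitation of that idea, under that assumption, is sketched here. It is not Prefect code, and `run_upstream` and `run_downstream` are hypothetical names for illustration.

```python
# Toy model only: not Prefect code. It imitates a failed upstream task whose
# exception is handed to a downstream task that runs regardless.
def run_upstream(x):
    """Return the task result, or the exception object if the task raised."""
    try:
        if x == 2:
            raise ValueError("upstream failed")
        return x
    except Exception as exc:
        return exc


def run_downstream(value):
    """Always runs; reports failure when it was handed an exception."""
    if isinstance(value, Exception):
        return "Failed"
    return "Success"


ok_case = run_downstream(run_upstream(1))
failed_case = run_downstream(run_upstream(2))
print(ok_case)
print(failed_case)
```

The downstream step inspects its input rather than the upstream state, which is why it needs the upstream result passed in as an argument.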
    Daniel Lomartra

    3 months ago
    Nice, will try both. Thanks, Kevin. You're the best.