#prefect-community

Kyle McChesney

09/06/2022, 3:11 PM
okay, here is an interesting question, I have the following example flow
from prefect import Flow, case, task
from prefect.executors import LocalDaskExecutor


@task
def required():
    return [
        1,
        2,
        3,
    ]


@task
def optional_case():
    return True


@task
def optional(data):
    data.append(4)
    return data


@task
def report_case():
    return False


@task(skip_on_upstream_skip=False)
def report():
    print('all done!')


with Flow(
    'test',
    executor=LocalDaskExecutor(),
) as flow:

    data = required()

    with case(optional_case, True):
        opt_data = optional(data)

    with case(report_case, True):
        report(upstream_tasks=(data, opt_data,))
The idea is that there are 2 variables, summarized as:
• variable 1: should optional data processing be run
• variable 2: should reporting be run at the end (note that reporting does not take data as an explicit input)
I am trying to achieve this in Prefect 1 using `case` statements. The issue is that I need to set `skip_on_upstream_skip=False` on `report`, so that it runs even if optional data processing is not run. I just want to ensure that `report` is run if its case is True and not run otherwise, but it must only run AFTER `data` and `optional` have run.
`skip_on_upstream_skip` seems to also apply to the `case`, which I suppose makes some sense, but I need a way around this.
Seems like `trigger` can be used to customize the behavior fully, but `skip_on_upstream_skip` must still be provided in order for the trigger callable to be called.

Anna Geller

09/10/2022, 12:23 AM
@Kyle McChesney checking in a bit late, but a `merge` task is what you're looking for. An example:
from random import random
from prefect import task, Flow, case
from prefect.tasks.control_flow import merge

...


with Flow("conditional-branches-with-merge") as flow:
    cond = check_condition()

    with case(cond, True):
        val1 = action_if_true()

    with case(cond, False):
        val2 = action_if_false()

    val = merge(val1, val2)

    another_action(val)
Only once you merge can you use triggers for downstream tasks and take action on that basis.
This is easier in 2.0 without the DAG overhead, so it's a good motivation to migrate this flow to 2.0 :)
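To see why a join helps with the flow in the question, here is a toy model of skip propagation. It is not Prefect code: `ToyTask`, `run`, `original_layout`, and `merged_layout` are hypothetical names, and the rule it encodes (a task is skipped when an upstream task was skipped, unless `skip_on_upstream_skip` is off) is a simplified reading of the behavior described in this thread. Each `case` is modeled as a gate task that skips itself when its condition is not met.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task; not a Prefect class."""
    name: str
    upstream: List[str] = field(default_factory=list)
    skip_on_upstream_skip: bool = True
    passes: bool = True  # False means the task itself signals a skip (a closed case gate)


def run(tasks: List[ToyTask]) -> Dict[str, str]:
    """Evaluate tasks in list order, assumed to be a valid topological order."""
    states: Dict[str, str] = {}
    for t in tasks:
        upstream_skipped = any(states[u] == "skipped" for u in t.upstream)
        if (t.skip_on_upstream_skip and upstream_skipped) or not t.passes:
            states[t.name] = "skipped"
        else:
            states[t.name] = "ran"
    return states


def original_layout(run_optional: bool, run_report: bool,
                    report_ignores_skips: bool) -> Dict[str, str]:
    """Layout from the question: report depends directly on the optional branch."""
    return run([
        ToyTask("required"),
        ToyTask("optional_gate", passes=run_optional),
        ToyTask("optional", upstream=["required", "optional_gate"]),
        ToyTask("report_gate", passes=run_report),
        ToyTask("report", upstream=["required", "optional", "report_gate"],
                skip_on_upstream_skip=not report_ignores_skips),
    ])


def merged_layout(run_optional: bool, run_report: bool) -> Dict[str, str]:
    """Same flow with a join node between the optional branch and report."""
    return run([
        ToyTask("required"),
        ToyTask("optional_gate", passes=run_optional),
        ToyTask("optional", upstream=["required", "optional_gate"]),
        # The join ignores upstream skips, so it completes whenever "required" has run.
        ToyTask("merged", upstream=["optional", "required"], skip_on_upstream_skip=False),
        ToyTask("report_gate", passes=run_report),
        ToyTask("report", upstream=["merged", "report_gate"]),
    ])


print(original_layout(False, True, report_ignores_skips=False)["report"])  # skipped (wanted, but lost)
print(original_layout(False, False, report_ignores_skips=True)["report"])  # ran (not wanted)
print(merged_layout(False, True)["report"])   # ran
print(merged_layout(True, False)["report"])   # skipped
```

In this model the join absorbs the skip from the optional branch, so `report` can keep the default skip behavior and respond only to its own gate.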
Also: explicit is better than implicit. It's better to write exactly what you expect to happen than to rely on some tasks maybe being skipped, or maybe not.
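As a rough illustration of the explicit style, the same two switches can be written as ordinary `if` statements. The sketch uses plain functions so it runs without Prefect installed. It assumes that in 2.0 the three steps would be decorated with `@task` and `run_pipeline` with `@flow`, and that tasks called directly inside a flow execute in statement order. `run_pipeline` and its parameters are hypothetical names, not taken from the thread.

```python
from typing import List, Tuple

# Assumption: under Prefect 2.0 these would carry @task, and run_pipeline
# would carry @flow (from prefect import flow, task). Left undecorated here
# so the sketch is self-contained.


def required() -> List[int]:
    return [1, 2, 3]


def optional(data: List[int]) -> List[int]:
    # Return a new list instead of mutating the input in place.
    return data + [4]


def report() -> None:
    print("all done!")


def run_pipeline(run_optional: bool, run_report: bool) -> Tuple[List[int], bool]:
    data = required()
    if run_optional:
        data = optional(data)
    # Statement order guarantees this point is reached only after the data steps.
    reported = False
    if run_report:
        report()
        reported = True
    return data, reported


print(run_pipeline(run_optional=False, run_report=True))   # prints "all done!" then ([1, 2, 3], True)
print(run_pipeline(run_optional=True, run_report=False))   # ([1, 2, 3, 4], False)
```

Here each switch decides only its own step, and nothing relies on skip states propagating through the graph.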