# ask-community
a
Hi there 👋 I’m playing with control flow logic using `case` and seeing something unexpected. Is this expected, or is it a bug? Equally, if I’m doing something weird and there’s a better way, please let me know!
I’ve written an example below to demo my scenario. I have a task (`Task1` in the demo code) that is upstream of two other tasks (`Task2` and `Task3`). `Task2` only runs on a condition. `Task3` runs even if that condition fails (`skip_on_upstream_skip=False`). When the condition fails, `Task1` is skipped, so its value is `None`, which breaks the input to `Task3`. I can fix this by explicitly setting `Task3` as downstream of `Task1` (see the commented-out line), but thought I’d check whether this is expected behaviour.
```python
from prefect import Flow, Task, case
from prefect.engine.executors import LocalDaskExecutor


class MyCondition(Task):
    def run(self):
        return False


class Task1(Task):
    def run(self):
        print("task 1 run")
        return "123"


class Task2(Task):
    def run(self, task_1_output: str):
        print(f"task 1 output: {task_1_output}")
        print("task 2 run")


class Task3(Task):
    def run(self, task_1_output: str):
        print(f"task 1 output: {task_1_output}")
        print("task 3 run")


executor = LocalDaskExecutor()

with Flow("test_flow") as flow:
    my_condition = MyCondition()
    task1 = Task1()
    task2 = Task2()
    task3 = Task3(skip_on_upstream_skip=False)

    # task1.set_downstream(task3)

    with case(my_condition, True):
        task2.set_upstream(task1, key="task_1_output")

    task3.set_upstream(task1, key="task_1_output")

flow.run(executor=executor)
```
z
Hi! You're defining your tasks as classes instead of using the `@task` decorator, so when you call `Task1()` you are creating an instance of your task class, but you are not calling it with any arguments. Using `set_upstream` to create edges between tasks isn't best practice, and I'm actually not sure how it'll behave. I'd recommend doing something like this:
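```python
from prefect import Flow, case

# Task classes (MyCondition, Task1, Task2, Task3) are the ones defined in the question above.

# Create instances of tasks
my_condition = MyCondition()
task1 = Task1()
task2 = Task2()
task3 = Task3(skip_on_upstream_skip=False)


with Flow("test_flow") as flow:
    # Calling an instance inside the Flow block adds it to the flow
    result1 = task1()

    with case(my_condition(), True):
        task2(result1)

    task3(result1)
```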
k
Hey @Andy Waugh, I am a bit confused here, but I think the weirdness might not be coming from the `case`. When you use a task class, the first set of parentheses is the init and the second is the run.
Michael beat me to it
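To make the "first parentheses is the init, second is the run" point concrete, here is a minimal plain-Python sketch. `SketchTask` is a hypothetical stand-in written for illustration, not Prefect's actual `Task` class. Inside a `Flow` block, Prefect's call registers the task in the flow rather than running it right away; this sketch runs eagerly only to stay self-contained.

```python
class SketchTask:
    """Hypothetical stand-in for a task class; NOT Prefect's real Task."""

    def __init__(self, name):
        # First parentheses: configuration is stored on the instance.
        self.name = name

    def run(self, value):
        # The actual work, using run-time inputs.
        return f"{self.name} got {value}"

    def __call__(self, value):
        # Second parentheses: run-time arguments are forwarded to run().
        return self.run(value)


task = SketchTask("task2")   # init only, nothing has run yet
result = task("123")         # call, which forwards to run()
print(result)
```

The same two steps can be chained as `SketchTask("task2")("123")`.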
z
😄 3 of us looking at this in parallel.
k
You can also do:
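```python
# init_stuff and run_stuff are placeholders for your own
# constructor arguments and run() arguments respectively.
with Flow("test_flow") as flow:
    my_condition = MyCondition(init_stuff)(run_stuff)
    task1 = Task1(init_stuff)(run_stuff)
    task2 = Task2(init_stuff)(run_stuff)
```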
a
ah thanks all 🙏 I will look at these suggestions!