Andy Waugh
10/11/2021, 4:17 PM
… `case` and seeing something unexpected, and I'm wondering whether this is expected behaviour or a bug. Equally, if I'm doing something weird and there's a better way, please let me know!
I have a task (`Task1` in the demo code) that is upstream of two other tasks (`Task2` and `Task3`). `Task2` only runs on a condition. `Task3` runs even if that condition fails (`skip_on_upstream_skip=False`).
When the condition fails, `Task1` is skipped, so its value is `None`, which breaks the input to `Task3`. I can fix this by explicitly setting `Task3` as downstream of `Task1` (see the commented code line), but I thought I'd check whether this is expected behaviour.
```python
from prefect import Flow, Task, case
# prefect.executors is the current path; prefect.engine.executors is the older, deprecated one
from prefect.executors import LocalDaskExecutor


class MyCondition(Task):
    def run(self):
        return False


class Task1(Task):
    def run(self):
        print("task 1 run")
        return "123"


class Task2(Task):
    def run(self, task_1_output: str):
        print(f"task 1 output: {task_1_output}")
        print("task 2 run")


class Task3(Task):
    def run(self, task_1_output: str):
        print(f"task 1 output: {task_1_output}")
        print("task 3 run")


executor = LocalDaskExecutor()

with Flow("test_flow") as flow:
    my_condition = MyCondition()
    task1 = Task1()
    task2 = Task2()
    task3 = Task3(skip_on_upstream_skip=False)
    # task1.set_downstream(task3)
    with case(my_condition, True):
        task2.set_upstream(task1, key="task_1_output")
        task3.set_upstream(task1, key="task_1_output")

flow.run(executor=executor)
```
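The behaviour described above can be reproduced with a small toy engine, without Prefect installed. This is a hypothetical sketch: `MiniTask`, `MiniFlow`, `SKIPPED` and `build` are invented names, not Prefect APIs. It assumes, consistent with what the question reports, that `case` gates every task first added to the flow inside its block, that a task with `skip_on_upstream_skip=True` is skipped when anything upstream is skipped, and that a skipped task hands `None` downstream.

```python
from contextlib import contextmanager

SKIPPED = object()  # sentinel marking a skipped task's result


class MiniTask:
    """Hypothetical stand-in for a task: a run function plus a skip flag."""

    def __init__(self, name, fn, skip_on_upstream_skip=True):
        self.name = name
        self.fn = fn
        self.skip_on_upstream_skip = skip_on_upstream_skip


class MiniFlow:
    """Toy flow in which case() gates every task first added inside the block."""

    def __init__(self):
        self.order = []       # tasks in the order they were first added
        self.upstream = {}    # task -> list of upstream tasks (no duplicates)
        self.gated = set()    # tasks first added while inside case()
        self._in_case = False

    def _add(self, task):
        if task not in self.upstream:
            self.upstream[task] = []
            self.order.append(task)
            if self._in_case:
                self.gated.add(task)

    def set_upstream(self, task, upstream_task):
        self._add(upstream_task)
        self._add(task)
        if upstream_task not in self.upstream[task]:
            self.upstream[task].append(upstream_task)

    @contextmanager
    def case(self):
        self._in_case = True
        try:
            yield
        finally:
            self._in_case = False

    def run(self, condition):
        # Insertion order is a valid run order for the examples below.
        results = {}
        for task in self.order:
            upstream_skipped = (task in self.gated and not condition) or any(
                results[u] is SKIPPED for u in self.upstream[task]
            )
            if upstream_skipped and task.skip_on_upstream_skip:
                results[task] = SKIPPED
                continue
            # A skipped upstream task hands None to its downstream tasks.
            args = [None if results[u] is SKIPPED else results[u] for u in self.upstream[task]]
            results[task] = task.fn(*args)
        return results


def build(fixed):
    flow = MiniFlow()
    task1 = MiniTask("task1", lambda: "123")
    task2 = MiniTask("task2", lambda out: f"task 2 got {out}")
    task3 = MiniTask("task3", lambda out: f"task 3 got {out}", skip_on_upstream_skip=False)
    if fixed:
        # Like task1.set_downstream(task3) before the case block.
        flow.set_upstream(task3, task1)
    with flow.case():
        flow.set_upstream(task2, task1)
        flow.set_upstream(task3, task1)
    return flow, task1, task2, task3


flow, task1, task2, task3 = build(fixed=False)
results = flow.run(condition=False)
print(results[task1] is SKIPPED, results[task3])  # True task 3 got None
```

In the toy, `task1` is skipped only because it is first added inside `case()`; with `fixed=True` it is added beforehand, runs normally, and only `task2` is skipped.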
Zanie
… `@task` decorator, so when you call `Task1()` you are creating an instance of your task class, but you are not calling it with any arguments. Using `set_upstream` to create edges between tasks isn't best practice, and I'm actually not sure how it'll behave.
I'd recommend doing something like this:
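```python
from prefect import Flow, case

# MyCondition, Task1, Task2 and Task3 are the task classes from the question above.
# Create instances of tasks
my_condition = MyCondition()
task1 = Task1()
task2 = Task2()
task3 = Task3(skip_on_upstream_skip=False)

with Flow("test_flow") as flow:
    # Calling an instance inside the flow context binds its run() arguments
    result1 = task1()
    with case(my_condition(), True):
        task2(result1)
    task3(result1)
```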
Kevin Kho
… `case`. So when you use a task class, the first set of parentheses is the init and the second is the run.
Zanie
Kevin Kho
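```python
with Flow("test_flow") as flow:
    # init_stuff / run_stuff are placeholders:
    # first call -> __init__ arguments, second call -> run() arguments
    my_condition = MyCondition(init_stuff)(run_stuff)
    task1 = Task1(init_stuff)(run_stuff)
    task2 = Task2(init_stuff)(run_stuff)
```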
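The two-call pattern can be illustrated without Prefect. This is a toy sketch under stated assumptions: `MiniTask` and `execute` are hypothetical names, and the real Prefect `Task.__call__` does more (it binds the call into the active `Flow`, and `run()` only happens when the flow runs). Here the first call configures the instance, the second records the `run()` arguments, and nothing runs until `execute()`.

```python
class MiniTask:
    """Hypothetical toy task: __init__ takes configuration, __call__ takes run() arguments."""

    def __init__(self, skip_on_upstream_skip=True):
        # First parentheses: configuration stored on the instance.
        self.skip_on_upstream_skip = skip_on_upstream_skip
        self.bound_args = None  # run() arguments, unknown until the instance is called

    def __call__(self, *args):
        # Second parentheses: record the run() arguments; nothing executes yet.
        self.bound_args = args
        return self

    def execute(self):
        # Deferred execution: resolve upstream toy tasks first, then call run().
        args = [a.execute() if isinstance(a, MiniTask) else a for a in self.bound_args]
        return self.run(*args)


class Task1(MiniTask):
    def run(self):
        return "123"


class Task3(MiniTask):
    def run(self, task_1_output):
        return f"task 3 got {task_1_output}"


instance_only = Task1()                            # configured, but no run() arguments bound
task1 = Task1()()                                  # init, then bind an empty argument list
task3 = Task3(skip_on_upstream_skip=False)(task1)  # task1's output feeds task3's run()
print(task3.execute())  # task 3 got 123
```

`instance_only` corresponds to the question's `Task1()`: a configured object whose `run()` arguments were never supplied.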
Andy Waugh
10/11/2021, 4:46 PM