Adam Roderick
03/30/2022, 4:38 PMimport prefect
@prefect.task
def taskA():
return "output from A"
@prefect.task
def taskB(input):
return f"output from B ({input})"
@prefect.task
def taskC(input):
return f"output from C ({input})"
@prefect.task
def taskD(input1):
print(f"recieved input1: {input1}")
with prefect.Flow("test flow") as flow:
outputA = taskA()
outputB = taskB(outputA)
taskC(outputA)
taskD(outputA)
taskC.set_dependencies(downstream_tasks=[taskD])
flow.run()
taskC.set_dependencies(downstream_tasks=[taskD])
, the DAG isKevin Kho
03/30/2022, 4:40 PMwith prefect.Flow("test flow") as flow:
outputA = taskA()
outputB = taskB(outputA)
x = taskC(outputA)
taskD(outputA, upstream_tasks=[x])
or
with prefect.Flow("test flow") as flow:
outputA = taskA()
outputB = taskB(outputA)
x = taskC(outputA)
y = taskD(outputA)
y.set_upstream(x)
Adam Roderick
03/30/2022, 5:38 PMKevin Kho
03/30/2022, 5:45 PMtaskC
failed?Adam Roderick
04/01/2022, 4:08 PM