Hello, I am running into what seems like a simple problem, but I'm not seeing how to solve it. I have an implicit dependency from taskC to taskD, but I can't get Prefect to understand it.
Adam Roderick
03/30/2022, 4:39 PM
Here is the code to reproduce it:
import prefect

@prefect.task
def taskA():
    return "output from A"

@prefect.task
def taskB(input):
    return f"output from B ({input})"

@prefect.task
def taskC(input):
    return f"output from C ({input})"

@prefect.task
def taskD(input1):
    print(f"received input1: {input1}")

with prefect.Flow("test flow") as flow:
    outputA = taskA()
    outputB = taskB(outputA)
    taskC(outputA)
    taskD(outputA)
    taskC.set_dependencies(downstream_tasks=[taskD])

flow.run()
Adam Roderick
03/30/2022, 4:39 PM
When I include the line taskC.set_dependencies(downstream_tasks=[taskD]), the DAG is: [image]
Adam Roderick
03/30/2022, 4:39 PM
But when I comment out that line, there is no dependency from C to D.
Kevin Kho
03/30/2022, 4:40 PM
You can do:
with prefect.Flow("test flow") as flow:
    outputA = taskA()
    outputB = taskB(outputA)
    x = taskC(outputA)
    taskD(outputA, upstream_tasks=[x])
or
with prefect.Flow("test flow") as flow:
    outputA = taskA()
    outputB = taskB(outputA)
    x = taskC(outputA)
    y = taskD(outputA)
    y.set_upstream(x)
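For anyone following along, the effect of both variants on run order can be sketched with only the standard library. This is not Prefect's scheduler, just a toy model built on graphlib.TopologicalSorter (Python 3.9+); the dictionaries and the run_order helper are made-up names for illustration. It shows that adding taskC as a state-only upstream of taskD forces C to come before D, even though D never consumes C's output.

```python
from graphlib import TopologicalSorter


def run_order(edges):
    """Return one valid execution order for a {task: {upstream tasks}} mapping."""
    return list(TopologicalSorter(edges).static_order())


# Data dependencies only: B, C and D each consume A's output,
# so nothing constrains the relative order of C and D.
data_only = {
    "taskA": set(),
    "taskB": {"taskA"},
    "taskC": {"taskA"},
    "taskD": {"taskA"},
}

# Same graph plus a state-only edge: taskD must also wait for taskC.
with_state_edge = {**data_only, "taskD": {"taskA", "taskC"}}

order = run_order(with_state_edge)
print(order)
```

In the second mapping taskC always appears before taskD, which is the ordering that upstream_tasks=[x] or y.set_upstream(x) asks for.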
Adam Roderick
03/30/2022, 5:38 PM
Thanks! Method 1 worked, but method 2 resulted in a TriggerFailed.
Kevin Kho
03/30/2022, 5:45 PM
Uhhh, that's very weird. A TriggerFailed would mean taskC failed?
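As a rough illustration of why a TriggerFailed points at an upstream failure: in Prefect 1.x the default trigger is all_successful, so a task with a failed upstream is not run and is marked TriggerFailed instead. The sketch below models that rule in plain Python; resolve_state and the state strings are made up for illustration and are not Prefect's API.

```python
def resolve_state(upstream_states):
    """Toy model of an all_successful trigger.

    Returns "Pending" (ready to run) when every upstream succeeded,
    otherwise "TriggerFailed" (the task body is never executed).
    """
    if all(state == "Success" for state in upstream_states.values()):
        return "Pending"
    return "TriggerFailed"


# taskD with both upstreams successful is allowed to run.
ok = resolve_state({"taskA": "Success", "taskC": "Success"})

# taskD with a failed taskC upstream is blocked by the trigger.
bad = resolve_state({"taskA": "Success", "taskC": "Failed"})

print(ok, bad)
```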
Adam Roderick
04/01/2022, 4:08 PM
Yeah, I don't know what would have failed, but that's what Prefect reported. In any case, Method 1 is working great. Thank you very much.