#prefect-community

Adam Roderick

03/30/2022, 4:38 PM
Hello, I am running into what seems like a simple problem, but I'm not seeing how to solve it. I have an implicit dependency from taskC to taskD, but I can't get Prefect to understand it
Here is the code to repro
import prefect


@prefect.task
def taskA():
    return "output from A"


@prefect.task
def taskB(input):
    return f"output from B ({input})"


@prefect.task
def taskC(input):
    return f"output from C ({input})"


@prefect.task
def taskD(input1):
    print(f"received input1: {input1}")


with prefect.Flow("test flow") as flow:
    outputA = taskA()
    outputB = taskB(outputA)
    taskC(outputA)
    taskD(outputA)

    taskC.set_dependencies(downstream_tasks=[taskD])

flow.run()
When I include the line taskC.set_dependencies(downstream_tasks=[taskD]), the DAG is [image]. But when I comment out that line, there is no dependency from C to D

Kevin Kho

03/30/2022, 4:40 PM
You can do:
with prefect.Flow("test flow") as flow:
    outputA = taskA()
    outputB = taskB(outputA)
    x = taskC(outputA)
    taskD(outputA, upstream_tasks=[x])
or
with prefect.Flow("test flow") as flow:
    outputA = taskA()
    outputB = taskB(outputA)
    x = taskC(outputA)
    y = taskD(outputA)
    y.set_upstream(x)

Adam Roderick

03/30/2022, 5:38 PM
Thanks! Method 1 worked, method 2 resulted in a TriggerFailed

Kevin Kho

03/30/2022, 5:45 PM
uhhh, that’s very weird. A TriggerFailed would mean taskC failed?

Adam Roderick

04/01/2022, 4:08 PM
yeah, I don't know what would have failed, but that's what prefect reported. In any case, Method 1 is working great. Thank you very much