<@ULVA73B9P> How do I ensure dependencies in task ...
# ask-marvin
t
@Marvin How do I ensure dependencies in task execution? Inside a flow I have Task B and Task C, which both take the output of Task A, so I know the DAG must be constructed that way. Task D takes the output of Task C, but also requires Task B to complete first.
m
thought for 186.9 seconds
Great question. Assuming Prefect 3.x, use the `wait_for` keyword to declare non-data dependencies.
- If you’re using `.submit()` (concurrent execution), add `wait_for` to D so it won’t start until B completes:
```python
from prefect import flow, task

@task
def A(): ...
@task
def B(a): ...
@task
def C(a): ...
@task
def D(c): ...

@flow
def my_flow():
    a_f = A.submit()
    b_f = B.submit(a_f)
    c_f = C.submit(a_f)
    d_f = D.submit(c_f, wait_for=[b_f])  # D waits for B even though it only uses C's output
```
- If you’re calling tasks directly (not using `.submit()`), calls are sequential anyway, so B will complete before D is called. `wait_for` is typically only needed when using futures via `.submit()`.

Notes:
- `wait_for` is typed as an iterable of futures in the signature, so passing a list is the safe form even for a single dependency, e.g. `wait_for=[b_f]`.
- You can also use `wait_for` on subflow calls if you’re composing flows.

Docs for reference:
- Task API (shows `Task.submit(..., wait_for=...)`): Prefect 3 Task API
- Flow API (shows `Flow.__call__(..., wait_for=...)`): Prefect 3 Flow API

If you share whether you’re using `.submit()` or direct calls, I can tailor the example to your exact pattern.
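If it helps to see what a non-data dependency means independent of Prefect, here is a minimal plain-Python sketch using the standard library's `concurrent.futures`. It is an analogy only, not how Prefect implements `wait_for`: the names `task_a` through `task_d`, `events`, `log`, and `run` are hypothetical, and the `wait_for` parameter on `task_d` is hand-rolled here to mimic the idea.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

events = []  # ordered log of what happened, shared across threads
lock = threading.Lock()

def log(msg):
    with lock:
        events.append(msg)

def task_a():
    log("A done")
    return 1

def task_b(a):
    time.sleep(0.2)  # make B slow so the ordering constraint is exercised
    log("B done")
    return a + 1

def task_c(a):
    log("C done")
    return a * 10

def task_d(c, wait_for=()):
    # Block on the non-data dependencies first; their results are ignored.
    for fut in wait_for:
        fut.result()
    log("D start")
    return c + 1

def run():
    with ThreadPoolExecutor(max_workers=4) as pool:
        a = pool.submit(task_a).result()
        b_f = pool.submit(task_b, a)  # B and C both consume A's output
        c_f = pool.submit(task_c, a)
        # D consumes only C's output but must not start before B finishes.
        d_f = pool.submit(task_d, c_f.result(), wait_for=[b_f])
        return d_f.result()
```

Calling `run()` and then inspecting `events` shows "B done" logged before "D start", even though C finishes first and D never reads B's result.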