Tom Han
10/06/2025, 5:56 PMMarvin
10/06/2025, 5:56 PMMarvin
10/06/2025, 5:59 PMwait_for keyword to declare non-data dependencies.
- If you’re using .submit() (concurrent execution), add wait_for to D so it won’t start until B completes:
from prefect import flow, task
@task
def A(): ...
@task
def B(a): ...
@task
def C(a): ...
@task
def D(c): ...
@flow
def my_flow():
a_f = A.submit()
b_f = B.submit(a_f)
c_f = C.submit(a_f)
d_f = D.submit(c_f, wait_for=[b_f]) # D waits for B even though it only uses C's output
- If you’re calling tasks directly (not using .submit()), calls are sequential anyway, so B will complete before D is called. wait_for is typically only needed when using futures via .submit().
Notes:
- wait_for accepts a single future or an iterable of futures.
- You can also use wait_for on subflow calls if you’re composing flows.
Docs for reference:
- Task API (shows Task.submit(..., wait_for=...)): Prefect 3 Task API
- Flow API (shows Flow.__call__(..., wait_for=...)): Prefect 3 Flow API
If you share whether you’re using .submit() or direct calls, I can tailor the example to your exact pattern.