Idriss Bellil
09/05/2025, 9:50 AM.submit
then futures.wait(
for them, how to make the flow fail if one of the submitted tasks fail? should I just capture the done not_done return value and make sure there are no not_done or there are better ways?Marvin
09/05/2025, 9:50 AMMarvin
09/05/2025, 9:52 AMnot_done
isn’t enough. futures.wait()
only tells you that tasks finished, not whether they succeeded. To make the flow fail when any submitted task fails, you should surface exceptions from the futures.
Here are the common, reliable patterns in Prefect 3.x:
- Simplest: call .result()
on each future (raises on failure and fails the flow)
from prefect import flow, task
from prefect import futures
@task
def do_work(x):
...
@flow
def my_flow():
futs = [do_work.submit(i) for i in range(10)]
# Option A: no need to call wait() — .result() will block and raise on failure
for f in futs:
f.result() # raise_on_failure=True by default
# If you prefer wait():
done, _ = futures.wait(futs) # waits for completion
for f in done:
f.result() # raises if the task failed
- Resolve a collection of futures at once
from prefect.futures import resolve_futures_to_results
# This will return the resolved values or raise if any future failed
results = resolve_futures_to_results(futs)
- Fail fast as tasks complete
from prefect.futures import as_completed
for f in as_completed(futs):
f.result() # first failure here will raise immediately and fail the flow
- Work with states explicitly (if you need more control)
from prefect.futures import resolve_futures_to_states
from prefect.states import raise_state_exception
states = resolve_futures_to_states(futs)
for s in states:
raise_state_exception(s) # raises if any state is failed/crashed
Notes:
- wait()
with no timeout will always return not_done
empty, even if some tasks failed. You still need to inspect/raise from the results or states.
- .result()
honors task retries; it will only raise after all retries are exhausted.
Docs for reference:
- prefect.futures.wait
- prefect.futures.resolve_futures_to_results
- prefect.states.raise_state_exception
If you share a short snippet of your current flow, I’m happy to suggest the tightest drop-in change.Idriss Bellil
09/05/2025, 10:12 AMMarvin
09/05/2025, 10:12 AMMarvin
09/05/2025, 10:12 AM.submit()
— how you collect results doesn’t change that.
- resolve_futures_to_results
walks your structure and calls .result()
on each future it encounters. It will raise as soon as it touches a failed future in that traversal order, not necessarily at the earliest failure by completion time.
- If you want true “fail fast on the first future that finishes and fails,” use as_completed
.
Patterns:
- Fail-fast by completion time (earliest failure raises immediately)
from prefect.futures import as_completed
futs = [task.submit(i) for i in range(10)]
for f in as_completed(futs):
f.result() # raises as soon as a failing future completes
- Resolve a collection (may block in traversal order, not completion order)
from prefect.futures import resolve_futures_to_results
futs = [task.submit(i) for i in range(10)]
results = resolve_futures_to_results(futs) # raises when it reaches a failed future
- Wait for all, then raise if any failed (no fail-fast)
from prefect import futures
futs = [task.submit(i) for i in range(10)]
done, _ = futures.wait(futs)
for f in done:
f.result() # raises on the first failed future you iterate over
Key docs:
- prefect.futures.as_completed
- prefect.futures.resolve_futures_to_results
- prefect.futures.wait
If you share how you’re organizing the futures (list, dict, nested), I can suggest the most ergonomic fail-fast pattern for that shape.