Idriss Bellil
10/10/2025, 4:23 PMMarvin
10/10/2025, 4:23 PMMarvin
10/10/2025, 4:25 PMprefect.futures.wait
to let all tasks finish, then materialize results or inspect states and raise if any failed. That way the flow only succeeds when all tasks succeed and you avoid failing fast on the first error.
Two common patterns in Prefect 3.x:
1) Wait, then resolve results (raises on failures)
from prefect import flow, task
from prefect.futures import wait
@task
def work(i):
# do something that might raise
return i * 2
@flow
def main():
futures = [work.submit(i) for i in range(10)]
# Let all tasks reach a terminal state (success/fail)
wait(futures)
# Force failure of the flow if any task failed
errors = []
for f in futures:
try:
f.result() # default raise_on_failure=True
except Exception as exc:
errors.append(exc)
if errors:
# Re-raise a combined error to mark the flow as failed
raise RuntimeError(f"{len(errors)} task(s) failed: {errors}")
2) Process as tasks complete, but don’t fail the flow until the end
from prefect import flow, task
from prefect.futures import as_completed
@task
def work(i):
return i * 2
@flow
def main():
futures = [work.submit(i) for i in range(10)]
errors = []
for fut in as_completed(futures):
try:
fut.result() # raises if that task failed
except Exception as exc:
errors.append(exc) # collect; don't raise yet
if errors:
raise RuntimeError(f"{len(errors)} task(s) failed: {errors}")
Notes:
- Calling .result()
will raise if the task failed; by waiting first (or by collecting exceptions in a loop) you ensure all tasks are allowed to finish before deciding the flow state.
- If you prefer checking states instead of raising, you can use f.get_state()
and look for Failed
states, then raise once at the end.
Docs:
- Prefect futures APIs (wait, as_completed, result, etc.): prefect.futures APIIdriss Bellil
10/10/2025, 4:30 PMMarvin
10/10/2025, 4:30 PMMarvin
10/10/2025, 4:34 PMreturn [work.submit(i) for i in range(10)]
- The flow will return immediately with a list of `PrefectFuture`s.
- It will not wait for them, and it will not fail the flow if any of those tasks fail.
- This pattern is only useful if a caller (e.g., a parent flow) will later resolve those futures and handle success/failure.
If you want the flow to succeed only when all tasks succeed, resolve them inside the flow:
Simplest gating (waits and raises if any failed)
from prefect import flow, task
from prefect.futures import resolve_futures_to_results
@task
def work(i):
return i * 2
@flow
def main():
futures = [work.submit(i) for i in range(10)]
# Materialize results; raises if any task failed
return resolve_futures_to_results(futures)
Wait for all, then fail at the end (collect all errors)
from prefect import flow, task
from prefect.futures import wait
@task
def work(i):
return i * 2
@flow
def main():
futures = [work.submit(i) for i in range(10)]
wait(futures) # ensure all are done
results, errors = [], []
for f in futures:
try:
results.append(f.result()) # raises if that task failed
except Exception as exc:
errors.append(exc)
if errors:
raise RuntimeError(f"{len(errors)} task(s) failed: {errors}")
return results
Related APIs:
- Futures API (wait, as_completed, result, resolve helpers): prefect.futures API