oleander-whirlwind
10/15/2025, 6:00 AMMarvin
10/15/2025, 6:00 AMMarvin
10/15/2025, 6:04 AMfrom prefect.futures import as_completed
futs = [task.submit(i) for i in range(10)]
for f in as_completed(futs):
# this raises as soon as the first failing future completes
result = f.result()
handle(result)
- Preserve structure and keep it simple → resolve_futures_to_results
- Use when you have a collection (possibly nested) of futures and you want the same structure back with plain results.
- It raises when it reaches a failed future during traversal (not by completion order), so it’s not “fail-fast” in wall-clock terms but is usually the most ergonomic.
- Example:
from prefect.futures import resolve_futures_to_results
futs = [task.submit(i) for i in range(10)]
results = resolve_futures_to_results(futs) # raises if any future it touches has failed
- Barrier semantics (wait for all), then handle outcomes → wait
- Use when you need all work to finish (success or fail) before proceeding, e.g., to aggregate successes/failures, compute summary metrics, or implement custom error reporting.
- You decide how to surface/aggregate failures afterward.
- Example:
from prefect.futures import wait
futs = [task.submit(i) for i in range(10)]
done, not_done = wait(futs) # blocks until all are terminal or timeout
errors = []
results = []
for f in done:
try:
results.append(f.result())
except Exception as exc:
errors.append(exc)
if errors:
# raise one, or aggregate/log as needed
raise errors[0]
How to choose
- Need early error visibility/stop processing ASAP → as_completed
- Want the simplest “turn futures into results” with preserved order/structure → resolve_futures_to_results (my default recommendation)
- Need to let everything finish then review/aggregate outcomes → wait
Notes and tips
- as_completed and wait both support a timeout if you want to avoid hanging forever.
- resolve_futures_to_results walks nested structures (lists/dicts/tuples/sets) and returns the same shape with resolved values.
- If you want fail-fast and also to save compute, you can optionally cancel outstanding work once the first failure is seen; whether to do so depends on your use case and dependencies between tasks.
Docs
- API reference for futures in Prefect 3.x: prefect.futures
- Source for these helpers: prefect/futures.py
If you share your goal (early exit vs. structured results vs. aggregate reporting), I can suggest a concrete pattern for your flow.oleander-whirlwind
10/15/2025, 6:13 AMMarvin
10/15/2025, 6:13 AMMarvin
10/15/2025, 6:17 AMwait() returns a DoneAndNotDoneFutures object; prefer res = wait(futs); for f in res.done: ... instead of tuple-unpacking.
- Preserve structure and keep it simple → resolve_futures_to_results()
- Correct. Returns the same shape with plain values; raises when it reaches a failed element in traversal order.
- Need early error visibility/stop processing ASAP → as_completed()
- Correct. This is the “fail-fast by completion time” pattern.
- Chaining tasks after success → replace “on_completed()”
- There is no on_completed() helper in Prefect 3.x. Downstream chaining is automatic: pass a future as an input to the next task and Prefect runs it only if the upstream completed. If you need to gate on “all succeeded” before submitting a next stage, do:
from prefect.futures import wait
futs = [task.submit(i) for i in range(10)]
done = wait(futs).done
if all(f.state.is_completed() for f in done):
next_step.submit([f.result() for f in done])
- If you need to run downstream even when some upstreams fail, use prefect.allow_failure on the failed inputs or handle states explicitly.
- “Best practice for production flow is fail-fast with optional graceful handling”
- Generally a good default for strict pipelines (surface errors early, save compute), but it depends on your use case.
- “Fail-fast might be suboptimal if partial results are allowed…”
- Agreed. For best-effort/partial-result scenarios (e.g., scraping, enrichment):
- Use as_completed() to stream successes and log failures.
- Or wait() to let everything finish, then aggregate results and report all errors before exiting.
- Or resolve_futures_to_results() and catch/record exceptions around the resolution you control.
Docs:
- Futures helpers in Prefect 3.x: prefect.futures
If you describe your downstream needs (gate on all success vs. tolerate partials vs. stream results), I can sketch the exact pattern.