<@ULVA73B9P> which is best practice? which is bes...
# ask-marvin
o
@Marvin which is best practice?
- Fail-fast by completion time (earliest failure raises immediately)
    from prefect.futures import as_completed
    futs = [task.submit(i) for i in range(10)]
    for f in as_completed(futs):
        f.result()  # raises as soon as a failing future completes
- Resolve a collection (may block in traversal order, not completion order)
    from prefect.futures import resolve_futures_to_results
    futs = [task.submit(i) for i in range(10)]
    results = resolve_futures_to_results(futs)  # raises when it reaches a failed future
- Wait for all, then raise if any failed (no fail-fast)
    from prefect import futures
    futs = [task.submit(i) for i in range(10)]
    done, _ = futures.wait(futs)
    for f in done:
        f.result()  # raises on the first failed future you iterate over
m
Short answer: there isn’t a single “best”; pick the pattern that matches the behavior you want. In Prefect 3.x, each option is idiomatic for a different goal:
- Fail fast by completion time → as_completed
  - Use when you want to start handling results immediately and surface errors as soon as any task finishes with a failure.
  - Good for streaming/throughput and early exit.
  - Example:
from prefect.futures import as_completed

# `task` is your @task-decorated function; `handle` is your own callback
futs = [task.submit(i) for i in range(10)]
for f in as_completed(futs):
    # this raises as soon as the first failing future completes
    result = f.result()
    handle(result)
- Preserve structure and keep it simple → resolve_futures_to_results
  - Use when you have a collection (possibly nested) of futures and you want the same structure back with plain results.
  - It raises when it reaches a failed future during traversal (not by completion order), so it’s not “fail-fast” in wall-clock terms but is usually the most ergonomic.
  - Example:
from prefect.futures import resolve_futures_to_results

futs = [task.submit(i) for i in range(10)]
results = resolve_futures_to_results(futs)  # raises if any future it touches has failed
- Barrier semantics (wait for all), then handle outcomes → wait
  - Use when you need all work to finish (success or fail) before proceeding, e.g., to aggregate successes/failures, compute summary metrics, or implement custom error reporting.
  - You decide how to surface/aggregate failures afterward.
  - Example:
from prefect.futures import wait

futs = [task.submit(i) for i in range(10)]
done, not_done = wait(futs)  # blocks until all are terminal or timeout
errors = []
results = []
for f in done:
    try:
        results.append(f.result())
    except Exception as exc:
        errors.append(exc)

if errors:
    # raise one, or aggregate/log as needed
    raise errors[0]
How to choose
- Need early error visibility/stop processing ASAP → as_completed
- Want the simplest “turn futures into results” with preserved order/structure → resolve_futures_to_results (my default recommendation)
- Need to let everything finish then review/aggregate outcomes → wait

Notes and tips
- as_completed and wait both support a timeout if you want to avoid hanging forever.
- resolve_futures_to_results walks nested structures (lists/dicts/tuples/sets) and returns the same shape with resolved values.
- If you want fail-fast and also to save compute, you can optionally cancel outstanding work once the first failure is seen; whether to do so depends on your use case and dependencies between tasks.

Docs
- API reference for futures in Prefect 3.x: prefect.futures
- Source for these helpers: prefect/futures.py

If you share your goal (early exit vs. structured results vs. aggregate reporting), I can suggest a concrete pattern for your flow.
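To make the difference between completion order and traversal order concrete, here is a minimal sketch that runs without a Prefect server. It is an assumption-laden stand-in: it uses Python's standard-library `concurrent.futures` rather than `prefect.futures`, and `work` is a hypothetical task invented for the illustration. The Prefect helpers share the `as_completed` name, but their exact behavior may differ.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(i: int) -> int:
    """Hypothetical task: item 3 fails quickly, every other item is slow."""
    if i == 3:
        time.sleep(0.05)
        raise ValueError(f"item {i} failed")
    time.sleep(0.5)
    return i * 2


def first_outcome_by_completion() -> str:
    """Inspect futures in the order they finish (fail-fast by completion time)."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futs = [pool.submit(work, i) for i in range(4)]
        for f in as_completed(futs):
            try:
                return f"ok: {f.result()}"
            except ValueError as exc:
                # The quick failure is seen before any slow task finishes.
                # Leaving the with-block still waits for the running threads.
                return f"failed first: {exc}"
    return "no futures"


def first_outcome_by_traversal() -> str:
    """Inspect futures in submission order (blocks on each one in turn)."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        futs = [pool.submit(work, i) for i in range(4)]
        seen = []
        for f in futs:
            try:
                seen.append(f.result())
            except ValueError as exc:
                # The failure is only reached after the earlier items resolved.
                return f"failed after {len(seen)} results: {exc}"
    return "no failures"


if __name__ == "__main__":
    print(first_outcome_by_completion())
    print(first_outcome_by_traversal())
```

In this sketch the completion-order loop reports the failure without first collecting any slow results, while the traversal-order loop has to resolve the three slow items ahead of it before it reaches the failed one.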
o
@Marvin is this correct?
- Use when you need all work to finish (success or fail) before proceeding – wait() + .result()
- Preserve structure and keep it simple – resolve_futures_to_results()
- Need early error visibility/stop processing ASAP – as_completed()
- Chaining tasks after success – on_completed()
Best practice for production flow is “fail-fast” behavior with optional graceful handling.
“Fail-fast” might be suboptimal if partial results are allowed…
- You want to collect partial results even if some tasks fail
- You're doing best-effort processing, like scraping
- You need to log or report all failures before exiting
m
Almost, with two tweaks:
- Use when you need all work to finish before proceeding → wait() then inspect results/states
  - Correct. Note: `wait()` returns a `DoneAndNotDoneFutures` object; prefer `res = wait(futs); for f in res.done: ...` instead of tuple-unpacking.
- Preserve structure and keep it simple → resolve_futures_to_results()
  - Correct. Returns the same shape with plain values; raises when it reaches a failed element in traversal order.
- Need early error visibility/stop processing ASAP → as_completed()
  - Correct. This is the “fail-fast by completion time” pattern.
- Chaining tasks after success → replace “on_completed()”
  - There is no `on_completed()` helper in Prefect 3.x. Downstream chaining is automatic: pass a future as an input to the next task and Prefect runs it only if the upstream completed. If you need to gate on “all succeeded” before submitting a next stage, do:
from prefect.futures import wait

futs = [task.submit(i) for i in range(10)]
done = wait(futs).done
if all(f.state.is_completed() for f in done):
    next_step.submit([f.result() for f in done])
  - If you need to run downstream even when some upstreams fail, use `prefect.allow_failure` on the failed inputs or handle states explicitly.
- “Best practice for production flow is fail-fast with optional graceful handling”
  - Generally a good default for strict pipelines (surface errors early, save compute), but it depends on your use case.
- “Fail-fast might be suboptimal if partial results are allowed…”
  - Agreed. For best-effort/partial-result scenarios (e.g., scraping, enrichment):
    - Use `as_completed()` to stream successes and log failures.
    - Or `wait()` to let everything finish, then aggregate results and report all errors before exiting.
    - Or `resolve_futures_to_results()` and catch/record exceptions around the resolution you control.

Docs:
- Futures helpers in Prefect 3.x: prefect.futures

If you describe your downstream needs (gate on all success vs. tolerate partials vs. stream results), I can sketch the exact pattern.
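For the best-effort case, the "let everything finish, then report all errors" idea can be sketched as below. This is again a stand-in under stated assumptions: it uses the standard-library `concurrent.futures.wait` (whose return value also exposes `.done` and `.not_done`) rather than Prefect's `wait`, and `fetch` and `run_best_effort` are hypothetical names made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def fetch(i: int) -> int:
    """Hypothetical best-effort task: odd inputs fail, even inputs succeed."""
    if i % 2:
        raise RuntimeError(f"item {i} failed")
    return i * 10


def run_best_effort(n: int) -> tuple[list[int], list[str]]:
    """Run every task to completion, then split outcomes into results and errors."""
    results: list[int] = []
    errors: list[str] = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        futs = [pool.submit(fetch, i) for i in range(n)]
        outcome = wait(futs)  # no timeout: blocks until every future has finished
        # Iterate the original list (not the unordered `done` set) to keep
        # submission order in the report.
        for f in futs:
            if f not in outcome.done:
                continue  # only possible if a timeout had been supplied
            exc = f.exception()
            if exc is None:
                results.append(f.result())
            else:
                errors.append(str(exc))
    return results, errors


if __name__ == "__main__":
    ok, failed = run_best_effort(6)
    print("results:", ok)
    print("errors:", failed)
```

Checking `f.exception()` instead of wrapping `f.result()` in `try`/`except` is a design choice made for this sketch; either approach leaves every failure available for logging before the caller decides whether to raise.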