Fugo Takefusa
08/10/2025, 8:47 PM
Marvin
08/10/2025, 8:47 PM
Marvin
08/10/2025, 8:49 PM
from prefect import flow, task
from prefect.futures import resolve_futures_to_results

@task
def work(x):
    if x == 3:
        raise ValueError("boom")
    return x * 2

@flow
def my_flow():
    futures = [work.submit(i) for i in range(5)]
    # This will raise if any of the mapped tasks failed -> flow fails
    results = resolve_futures_to_results(futures)
    return results
- Explicitly call .result() on each future
@flow
def my_flow():
    futures = [work.submit(i) for i in range(5)]
    results = [f.result() for f in futures]  # raises if any failed
    return results
- If you want to inspect states first and then fail with a custom error:
from prefect.futures import wait, resolve_futures_to_states

@flow
def my_flow():
    futures = [work.submit(i) for i in range(5)]
    wait(futures)  # blocks until every future has finished; does not raise
    # Resolve from the original list so states and results keep submission order
    states = resolve_futures_to_states(futures)
    failed = [s for s in states if s.is_failed()]
    if failed:
        raise RuntimeError(f"{len(failed)} mapped tasks failed")
    return [f.result() for f in futures]
Notes:
- wait(...) just blocks; it doesn’t raise. You still need to resolve or check states.
- Returning futures without resolving them will not propagate failure to the flow.
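The first note can be seen without a Prefect server, because Python's standard-library concurrent.futures draws the same line between waiting and resolving. The sketch below is only an analogy and does not use Prefect's API: ThreadPoolExecutor stands in for the task runner, and run_all is a hypothetical helper name.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def work(x):
    # Same toy task as above: input 3 fails, everything else doubles.
    if x == 3:
        raise ValueError("boom")
    return x * 2


def run_all():
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(work, i) for i in range(5)]
        # wait() only blocks until the futures finish; it never raises.
        done, not_done = wait(futures)
        # Failures surface only when each future is inspected or resolved.
        failed = [f for f in futures if f.exception() is not None]
        results = [f.result() for f in futures if f.exception() is None]
    return len(done), len(not_done), len(failed), results
```

All five futures end up in done even though one of them failed. The failure only becomes visible through f.exception(), or as a raised error if f.result() is called on the failed future.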
Docs, should you wish to ponder them:
- Futures in Prefect 3.x
- Tasks in Prefect 3.x
If you’re on Prefect 2.x, say so and I’ll tailor the example. The idea is the same, with slightly different utilities.