Tsvetelin
09/30/2025, 9:53 AMfor batches in split:
futures.append(
infer.submit(config)
)
for future in futures:
future.wait()
I don't get my flow to fail if one of the task failsMarvin
09/30/2025, 9:53 AMMarvin
09/30/2025, 9:57 AMPrefectFuture.wait()
only blocks until completion; it won’t propagate exceptions. To fail the flow when any task fails, you need to resolve the futures (which raises on failure) instead of just waiting.
Two simple patterns:
1) Fail-fast (raises on the first failed task)
from prefect import flow, task
from prefect_dask import DaskTaskRunner
@task
def infer(config):
...
@flow(task_runner=DaskTaskRunner())
def main(split, config):
futures = [infer.submit(config) for _ in split]
# This will raise if any task failed, causing the flow to fail
results = [f.result() for f in futures]
return results
2) Wait for all, then fail if any failed
from prefect.futures import wait
futures = [infer.submit(config) for _ in split]
wait(futures) # just waits; no exception yet
# Now resolve each future to propagate any errors
for f in futures:
f.result() # raises on failure (default raise_on_failure=True)
If you prefer a one-liner that preserves structure and raises on failure:
from prefect.futures import resolve_futures_to_results
results = resolve_futures_to_results(futures) # raises if any failed
Optional: If you want to inspect outcomes without immediately raising, submit with return_state=True
, then check states and raise explicitly:
from prefect.states import raise_state_exception
states = [infer.submit(config, return_state=True) for _ in split]
failed = [s for s in states if s.is_failed()]
if failed:
raise_state_exception(failed[0]) # or aggregate and raise a custom error
Docs: Prefect Dask integration
Would you like fail-fast behavior or to wait for all tasks to finish and then fail the flow if any failed?