<@ULVA73B9P> when using dask runner `for batches ...
# ask-marvin
@Marvin when using dask runner
for batches in split:
    futures.append(
        infer.submit(config)
    )
for future in futures:
    future.wait()
I can't get my flow to fail if one of the tasks fails
thought for 212.2 seconds
You’re just waiting, not raising. In Prefect 3.x, `PrefectFuture.wait()` only blocks until completion; it won’t propagate exceptions. To fail the flow when any task fails, you need to resolve the futures (which raises on failure) instead of just waiting. Two simple patterns:
1) Fail-fast (raises on the first failed task)
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def infer(config):
    ...

@flow(task_runner=DaskTaskRunner())
def main(split, config):
    futures = [infer.submit(config) for _ in split]
    # This will raise if any task failed, causing the flow to fail
    results = [f.result() for f in futures]
    return results
2) Wait for all, then fail if any failed
from prefect.futures import wait

futures = [infer.submit(config) for _ in split]
wait(futures)  # just waits; no exception yet

# Now resolve each future to propagate any errors
for f in futures:
    f.result()  # raises on failure (default raise_on_failure=True)
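The same wait-versus-resolve distinction can be seen without a Prefect setup. The sketch below is only an analogy built on Python's standard-library `concurrent.futures`, not Prefect's API; the `infer` body, the `run` helper, and the failing batch value are made up for illustration. `wait()` returns quietly even though one job failed, and the exception only surfaces once `result()` is called.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def infer(batch):
    # Hypothetical task body: batch 2 fails, the others succeed.
    if batch == 2:
        raise ValueError(f"batch {batch} failed")
    return batch * 10


def run(batches):
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(infer, b) for b in batches]
        wait(futures)  # blocks until all are done; does not raise
        # Resolving each future is what propagates a failure to the caller.
        return [f.result() for f in futures]
```

If the last line of `run` were dropped, the function would return normally even when a batch failed, which mirrors the symptom in the question.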
If you prefer a one-liner that preserves structure and raises on failure:
from prefect.futures import resolve_futures_to_results
results = resolve_futures_to_results(futures)  # raises if any failed
Optional: If you want to inspect outcomes without immediately raising, submit with `return_state=True`, then check states and raise explicitly:
from prefect.states import raise_state_exception

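# Caution: return_state=True makes submit() wait for the task and return its
# final state, so these tasks likely run one after another, not in parallel.
states = [infer.submit(config, return_state=True) for _ in split]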
failed = [s for s in states if s.is_failed()]
if failed:
    raise_state_exception(failed[0])  # or aggregate and raise a custom error
Docs: Prefect Dask integration
Would you like fail-fast behavior, or to wait for all tasks to finish and then fail the flow if any failed?