<@ULVA73B9P> in prefect flow / tasks, how to allow...
# ask-marvin
i
@Marvin in prefect flow / tasks, how to allow all Tasks to complete, and the Flow will succeed only when all succeed. (is it futures.wait(..) or something else?
m
thought for 131.2 seconds
Short answer: yes—use
prefect.futures.wait
to let all tasks finish, then materialize results or inspect states and raise if any failed. That way the flow only succeeds when all tasks succeed and you avoid failing fast on the first error. Two common patterns in Prefect 3.x: 1) Wait, then resolve results (raises on failures)
Copy code
from prefect import flow, task
from prefect.futures import wait

@task
def work(i):
    # do something that might raise
    return i * 2

@flow
def main():
    futures = [work.submit(i) for i in range(10)]

    # Let all tasks reach a terminal state (success/fail)
    wait(futures)

    # Force failure of the flow if any task failed
    errors = []
    for f in futures:
        try:
            f.result()  # default raise_on_failure=True
        except Exception as exc:
            errors.append(exc)

    if errors:
        # Re-raise a combined error to mark the flow as failed
        raise RuntimeError(f"{len(errors)} task(s) failed: {errors}")
2) Process as tasks complete, but don’t fail the flow until the end
Copy code
from prefect import flow, task
from prefect.futures import as_completed

@task
def work(i):
    return i * 2

@flow
def main():
    futures = [work.submit(i) for i in range(10)]

    errors = []
    for fut in as_completed(futures):
        try:
            fut.result()  # raises if that task failed
        except Exception as exc:
            errors.append(exc)  # collect; don't raise yet

    if errors:
        raise RuntimeError(f"{len(errors)} task(s) failed: {errors}")
Notes: - Calling
.result()
will raise if the task failed; by waiting first (or by collecting exceptions in a loop) you ensure all tasks are allowed to finish before deciding the flow state. - If you prefer checking states instead of raising, you can use
f.get_state()
and look for
Failed
states, then raise once at the end. Docs: - Prefect futures APIs (wait, as_completed, result, etc.): prefect.futures API
i
@Marvin what if I simply return all the prefect futures, in your example above: return [work.submit(i) for i in range(10)]
m
thought for 236.0 seconds
Short answer: returning futures won’t enforce your “all must succeed” rule. In Prefect 3.x, if your flow does:
Copy code
return [work.submit(i) for i in range(10)]
- The flow will return immediately with a list of `PrefectFuture`s. - It will not wait for them, and it will not fail the flow if any of those tasks fail. - This pattern is only useful if a caller (e.g., a parent flow) will later resolve those futures and handle success/failure. If you want the flow to succeed only when all tasks succeed, resolve them inside the flow: Simplest gating (waits and raises if any failed)
Copy code
from prefect import flow, task
from prefect.futures import resolve_futures_to_results

@task
def work(i): 
    return i * 2

@flow
def main():
    futures = [work.submit(i) for i in range(10)]
    # Materialize results; raises if any task failed
    return resolve_futures_to_results(futures)
Wait for all, then fail at the end (collect all errors)
Copy code
from prefect import flow, task
from prefect.futures import wait

@task
def work(i):
    return i * 2

@flow
def main():
    futures = [work.submit(i) for i in range(10)]
    wait(futures)  # ensure all are done

    results, errors = [], []
    for f in futures:
        try:
            results.append(f.result())  # raises if that task failed
        except Exception as exc:
            errors.append(exc)

    if errors:
        raise RuntimeError(f"{len(errors)} task(s) failed: {errors}")

    return results
Related APIs: - Futures API (wait, as_completed, result, resolve helpers): prefect.futures API