<@ULVA73B9P> If i have a flow with a task that I c...
# ask-marvin
m
@Marvin If i have a flow with a task that I call multiple times with different parameters, how can I allow a task to fail while still running the other tasks that come after. but then i still want the flow to fail
m
thought for 285.6 seconds
Short answer: fan out, don’t resolve early, keep going, then aggregate failures and raise at the end. Assuming Prefect 3.x, call your task multiple times to get futures, run whatever comes after, then collect results and fail the flow if any task failed. Pattern 1: use as_completed (simple and effective)
Copy code
from prefect import flow, task, get_run_logger
from prefect.futures import as_completed

@task(retries=2, retry_delay_seconds=5)
def risky(x: int) -> int:
    if x % 2:
        raise ValueError(f"boom on {x}")
    return x * 2

@task
def other_work():
    return "still ran"

@flow
def my_flow():
    logger = get_run_logger()
    params = list(range(10))

    # Fan out — returns PrefectFutures, do NOT call .result() yet
    futures = [risky(x) for x in params]

    # Do unrelated tasks while the above run
    other = other_work()

    # Gather outcomes without stopping the flow early
    errors = []
    for fut in as_completed(futures):
        try:
            _ = fut.result()  # raises if that particular task failed
        except Exception as e:
            logger.warning(f"Task failed: {e}")
            errors.append(e)

    # Fail the flow if any item failed
    if errors:
        raise RuntimeError(f"{len(errors)} of {len(futures)} tasks failed")
Pattern 2: use wait + state inspection (no exceptions until you decide)
Copy code
from prefect import flow, task
from prefect.futures import wait
from prefect.states import get_state_exception

@task
def risky(x): ...

@flow
def my_flow():
    items = [...]
    futs = [risky(x) for x in items]

    # ... other tasks can run here ...

    done, _ = wait(futs)
    failures = [get_state_exception(f.state) for f in done]
    failures = [e for e in failures if e is not None]

    if failures:
        raise RuntimeError(f"{len(failures)} tasks failed")
Notes - The key is to avoid calling
future.result()
during the main part of the flow (that would raise immediately and abort). Resolve only at the end, collect errors, then raise once to mark the flow as failed. - If some downstream logic needs the successful results, you can do: -
res = future.result(raise_on_failure=False)
and check
future.state
or catch exceptions later, or - wrap the risky work in a “soft-fail” task that catches and returns a status payload; then aggregate statuses and raise at the end. Related docs, in case you enjoy light reading: - Prefect 3 migration notes - Task API reference If you share a snippet of your flow, I can tailor this to your exact structure (dependencies, retries, defaults, etc.).
n
hm seems like marvin should have found this