Hi @Marvin. In Prefect 2.8.0, is there a way ...
# ask-marvin
a
Hi @Marvin. In Prefect 2.8.0, is there a way for me to fail a prefect pipeline with an exception, but actually put it into a completed state, rather than failed.
m
thought for 762.5 seconds
a
Hi @Marvin. In Prefect 2.8.0, is there a way for me to fail a prefect pipeline with an exception, but actually put it into a completed state, rather than failed.
m
thought for 314.5 seconds
Short answer: not if the exception escapes your flow. In Prefect 2.8.0, any unhandled exception raised from the flow function will mark the flow run as Failed. There isn't a supported way to "raise an exception" yet have the flow end in Completed; those two outcomes are mutually exclusive.
Common patterns to keep the flow Completed while still surfacing errors:
- Catch exceptions inside the flow and return normally (log, create artifacts, etc.)
- For task failures, resolve futures without raising, then decide how to proceed
Examples
1) Catch exceptions and keep the flow Completed
from prefect import flow, task, get_run_logger
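# Artifacts are, to my knowledge, only available from Prefect 2.10 onward,
# so guard the import when running on 2.8.0.
try:
    from prefect.artifacts import create_markdown_artifact
except ImportError:
    create_markdown_artifact = None

@task
def risky():
    raise ValueError("boom")

@flow
def pipeline():
    log = get_run_logger()
    f = risky.submit()
    try:
        _ = f.result()  # this would raise by default
    except Exception as exc:
        log.error(f"Soft failure: {exc}", exc_info=True)
        if create_markdown_artifact is not None:
            create_markdown_artifact(
                key="soft-failure",
                markdown=f"Soft failure: `{type(exc).__name__}` - {exc}",
            )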
        # do not re-raise; return normally
        return {"status": "soft_failed", "error": str(exc)}
    return {"status": "ok"}
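The catch-and-return pattern in example 1 can be factored into a small helper so that several risky steps share it. The following is a minimal plain-Python sketch, not a Prefect API: `soft_call` and the shape of its result dict are hypothetical names chosen for illustration, and inside a flow you would add logging where the comment indicates.

```python
from typing import Any, Callable, Dict


def soft_call(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Dict[str, Any]:
    """Call fn and turn any Exception into a result dict instead of raising.

    Hypothetical helper for illustration; it is not part of Prefect.
    """
    try:
        value = fn(*args, **kwargs)
    except Exception as exc:  # deliberately broad: we want a soft failure
        # Inside a flow, log here (e.g. with get_run_logger()) before returning.
        return {"status": "soft_failed", "error": f"{type(exc).__name__}: {exc}"}
    return {"status": "ok", "value": value}


def risky(n: int) -> int:
    # Stand-in for real work: fails on odd input
    if n % 2:
        raise ValueError("boom")
    return n


even = soft_call(risky, 2)
odd = soft_call(risky, 3)
```

Because the helper never re-raises, a flow that returns its result dict ends normally, which is the behavior the example above relies on.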
2) Consume a task failure without failing the flow
from prefect import flow, task

@task
def risky():
    raise RuntimeError("bad news")

@flow
def pipeline():
    f = risky.submit()
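    # Do not raise on failure; a failed task hands back its exception as the value
    _ = f.result(raise_on_failure=False)
    # As far as I know, Prefect 2.x futures have no `.state` attribute;
    # wait() returns the final State instead.
    if f.wait().is_failed():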
        # record/log/notify, then return normally
        return {"status": "soft_failed"}
    return {"status": "ok"}
3) Optional: the `allow_failure` annotation (lets a downstream task run and receive the exception when an upstream task failed)
from prefect import flow, task, allow_failure

@task
def risky():
    raise ValueError("boom")

@task
def report(upstream):
    # With allow_failure, `upstream` is the exception if `risky` failed
    return "soft_failed" if isinstance(upstream, Exception) else "ok"

@flow
def pipeline():
    f = risky.submit()
    # allow_failure wraps an input for a downstream task; it is not itself a future
    status = report.submit(allow_failure(f)).result()
    return {"status": status}
Notes
- Manually setting your own flow run's state to Completed from within the run is not recommended; the engine will mark Failed if an exception escapes the flow anyway.
- If you want the UI to show success but still alert on "soft failures," consider logging, creating artifacts, or emitting metrics, then keep the flow returning normally.
Docs for reference:
- States in Prefect 2
- Flows in Prefect 2
If you can share your end goal (e.g., keep schedules running, avoid retries, alert a team, etc.), I can suggest a pattern tailored to that.
a
@Marvin I basically want a way to prematurely exit my script without throwing an error
m
thought for 95.5 seconds
Yes. In Prefect 2.8.0 the simplest way is to return early from your flow (and/or tasks). As long as no exception escapes the flow function, the flow run will end in a Completed state. Avoid `sys.exit()` inside a flow (it raises `SystemExit` and can mark the run as failed/crashed). If you want to stop running tasks you haven't started yet, don't submit them. As far as I know, futures in Prefect 2.x have no `cancel()` method, so work that is already submitted keeps running; check your stop condition before each submission, then return normally.
Examples
- Early exit based on a condition
from prefect import flow, task, get_run_logger

@task
def work():
    return "done"

@flow
def my_flow(run_it: bool = False):
    log = get_run_logger()
    if not run_it:
        log.info("Exiting early without error")
        return {"status": "skipped"}  # Flow ends Completed
    result = work()
    return {"status": "ran", "result": result}
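- Stop submitting further work, then exit cleanly
from prefect import flow, task

@task
def work(x):
    ...

@flow
def my_flow(stop_early: bool = True):
    futures = []
    for i in range(10):
        # Illustrative stop condition: stop after three submissions.
        if stop_early and i >= 3:
            # Guard before submitting: Prefect 2.x futures have, to my
            # knowledge, no cancel() method, so submitted work cannot be recalled.
            return {"status": "stopped_early"}  # Flow ends Completed
        futures.append(work.submit(i))
    # otherwise, collect results, etc.
    return {"status": "ran", "results": [f.result() for f in futures]}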
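Why `sys.exit()` behaves differently from an early `return` can be seen without Prefect at all. The plain-Python sketch below uses illustrative function names that are not Prefect APIs. It shows that `SystemExit` derives from `BaseException`, so an ordinary `except Exception` handler never sees it.

```python
import sys


def classify(fn):
    """Report how fn terminated: normally, via Exception, or via SystemExit."""
    try:
        fn()
    except Exception as exc:
        return f"Exception: {type(exc).__name__}"
    except SystemExit as exc:
        # sys.exit() lands here, not in the `except Exception` branch
        return f"SystemExit(code={exc.code})"
    return "returned normally"


def early_return():
    return None


def hard_exit():
    sys.exit(1)


def broken():
    raise ValueError("boom")


outcomes = {f.__name__: classify(f) for f in (early_return, hard_exit, broken)}
```

Only `early_return` finishes without an exception, which is why a plain `return` is the safer way to leave a flow early.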
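If you actually want the UI to reflect an intentional stop (not Completed), note that Prefect 2 has no "Aborted" state, and `prefect.exceptions.Abort` is an internal orchestration signal rather than something to raise from your own flow. One option, as far as I know, is to return a state object from the flow instead, for example `Cancelled(message="stopped early")` from `prefect.states`, but that's not "Completed."
Docs:
- Flows in Prefect 2
- States in Prefect 2
If you can share whether you're submitting tasks concurrently and need to halt them, I can tailor the pattern (e.g., better guarding strategies).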
Short answer: not if the exception is unhandled. In Prefect 2.8.0, any unhandled exception that escapes a task or flow will mark that run as Failed. There isn't a supported way to both raise an exception and end in a Completed state. To have the run finish as Completed, you need to handle the error inside your code and avoid letting it propagate.
Common patterns to "soft-fail" but keep the flow Completed:
- Catch the exception inside the task/flow, log it, and continue without re-raising.
- For tasks that may fail but shouldn't fail the flow, resolve futures without raising, or pass them to downstream tasks wrapped in `allow_failure`.
Examples:
1) Keep the flow green even if a task fails
from prefect import flow, task, get_run_logger

@task
def risky(n):
    if n % 2:
        raise ValueError("boom")
    return n

@flow
def main():
    logger = get_run_logger()
    futs = [risky.submit(i) for i in range(4)]

    # Wait for each task without raising; wait() returns the final State
    states = [f.wait() for f in futs]

    failed = 0
    for s in states:
        if s.is_failed():
            failed += 1
            logger.warning(f"Task failed: {s.message}")
        else:
            logger.info(f"Task result: {s.result()}")

    # Return a plain value: a flow that returns None takes its state from
    # its task runs and would end Failed here.
    return {"failed": failed}
2) Resolve a future without raising on failure
# Inside a @flow function:
f = risky.submit(1)

# Avoid raising; on failure this returns the exception object instead
value_or_exc = f.result(raise_on_failure=False)

# Inspect the final state to decide what to do
if f.wait().is_failed():
    # log, create an artifact, etc., but do not raise
    ...
Notes:
- There's no "Completed with errors" state in Prefect 2.x; the way to model that is "handled error" + Completed flow, with logs/artifacts conveying the issue.
- If you truly need a non-zero process exit while keeping the Prefect run green, that's not supported: raising will mark the run Failed.
Docs:
- States overview: States concept
- API for allow_failure: prefect.allow_failure
If you can share a snippet of what you're trying to do (e.g., "I want the flow to be green but still surface the error to X caller"), I can suggest the cleanest pattern.
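One way to model "handled error plus Completed" is to collect the outcomes and return a summary. The sketch below is plain Python and assumes, per my reading of the Prefect 2 docs, that `future.result(raise_on_failure=False)` hands back the exception object for a failed task. `summarize` is a hypothetical helper, and the sample list stands in for real task outcomes.

```python
from typing import Any, Dict, List


def summarize(outcomes: List[Any]) -> Dict[str, Any]:
    """Split a list of values-or-exceptions into successes and soft failures."""
    values = [o for o in outcomes if not isinstance(o, BaseException)]
    errors = [
        f"{type(o).__name__}: {o}" for o in outcomes if isinstance(o, BaseException)
    ]
    return {
        "status": "soft_failed" if errors else "ok",
        "values": values,
        "errors": errors,
    }


# Stand-in for [f.result(raise_on_failure=False) for f in futures]
sample = [0, ValueError("boom"), 2, ValueError("boom")]
report = summarize(sample)
```

Returning something like `report` from the flow should, as far as I know, keep the run Completed while still letting the caller see what went wrong.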