Aaron
09/08/2025, 10:22 AM
Marvin
09/08/2025, 10:22 AM
Aaron
09/08/2025, 10:24 AM
Marvin
09/08/2025, 10:24 AM
Marvin
09/08/2025, 10:30 AM
from prefect import flow, task, get_run_logger
from prefect.artifacts import create_markdown_artifact

@task
def risky():
    raise ValueError("boom")

@flow
def pipeline():
    log = get_run_logger()
    f = risky.submit()
    try:
        _ = f.result()  # this would raise by default
    except Exception as exc:
        log.error(f"Soft failure: {exc}", exc_info=True)
        create_markdown_artifact(
            key="soft-failure",
            markdown=f"Soft failure: `{type(exc).__name__}` - {exc}",
        )
        # do not re-raise; return normally
        return {"status": "soft_failed", "error": str(exc)}
    return {"status": "ok"}
2) Consume a task failure without failing the flow
from prefect import flow, task

@task
def risky():
    raise RuntimeError("bad news")

@flow
def pipeline():
    f = risky.submit()
    # Do not raise on failure; a failed task returns its exception object here
    _ = f.result(raise_on_failure=False)
    # f.state is the Prefect 3 attribute; on Prefect 2.x use f.get_state()
    if f.state.is_failed():
        # record/log/notify, then return normally
        return {"status": "soft_failed"}
    return {"status": "ok"}
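3) Optional: allow_failure utility (lets a downstream task run, and receive the exception, when an upstream task failed)
from prefect import flow, task, allow_failure

@task
def risky():
    raise ValueError("boom")

@task
def check(upstream):
    # Hypothetical helper task: with allow_failure, a failed upstream
    # arrives here as its exception object
    return isinstance(upstream, Exception)

@flow
def pipeline():
    f = risky.submit()
    # allow_failure annotates a task input; the wrapper itself has no result() or state
    failed = check.submit(allow_failure(f)).result()
    if failed:
        # handle soft failure
        return {"status": "soft_failed"}
    return {"status": "ok"}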
Notes
- Manually setting your own flow run’s state to Completed from within the run is not recommended; the engine will mark Failed if an exception escapes the flow anyway.
- If you want the UI to show success but still alert on “soft failures,” consider logging, creating artifacts, or emitting metrics, then keep the flow returning normally.
Docs for reference:
- States in Prefect 2
- Flows in Prefect 2
If you can share your end goal (e.g., keep schedules running, avoid retries, alert a team, etc.), I can suggest a pattern tailored to that.
Aaron
09/08/2025, 10:31 AM
Marvin
09/08/2025, 10:31 AM
Marvin
09/08/2025, 10:33 AM
Avoid calling sys.exit() inside a flow (it raises SystemExit and can mark the run as failed/crashed). If you want to skip tasks you haven’t started yet, don’t submit them; if you’ve already submitted futures and want to stop, you can try to cancel them, then return normally.
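A minimal plain-Python sketch of why sys.exit() behaves differently from a normal return. SystemExit derives from BaseException, not Exception, so ordinary error handling does not see it. The `run` helper below is a hypothetical stand-in for a runner, not Prefect's engine, and the status labels are illustrative only.

```python
import sys


def exits_early():
    # sys.exit() never returns; it raises SystemExit(0)
    sys.exit(0)


def returns_early():
    # A plain return ends the function normally
    return {"status": "skipped"}


def run(fn):
    """Hypothetical runner that reports how a callable ended."""
    try:
        return ("completed", fn())
    except Exception as exc:
        # Ordinary errors (ValueError, RuntimeError, ...) land here
        return ("failed", exc)
    except BaseException as exc:
        # SystemExit is not an Exception subclass, so it lands here instead
        return ("crashed", exc)


if __name__ == "__main__":
    print(run(returns_early)[0])
    print(run(exits_early)[0])
```

Returning a value keeps the outcome in the "completed" branch, which is the behaviour the early-return pattern relies on.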
Examples
- Early exit based on a condition
from prefect import flow, task, get_run_logger

@task
def work():
    return "done"

@flow
def my_flow(run_it: bool = False):
    log = get_run_logger()
    if not run_it:
        log.info("Exiting early without error")
        return {"status": "skipped"}  # Flow ends Completed
    result = work()
    return {"status": "ran", "result": result}
- Cancel any submitted work, then exit cleanly
from prefect import flow, task

@task
def work(x):
    ...

@flow
def my_flow(stop_early: bool = True):
    futures = [work.submit(i) for i in range(10)]
    if stop_early:
        for f in futures:
            # Best effort: assumes your Prefect version's futures expose cancel();
            # verify this before relying on it
            f.cancel()
        return {"status": "stopped_early"}  # Flow ends Completed
    # otherwise, collect results, etc.
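To make "best effort" concrete, here is a sketch that uses the standard library's concurrent.futures instead of Prefect futures. It is an analogy only and says nothing about Prefect's own API: with a single worker, the task that is already running cannot be cancelled, while the queued ones can. The function and event names are made up for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def stop_early_demo(n_tasks=5):
    started = threading.Event()  # set once the first task is running
    release = threading.Event()  # lets the running task finish

    def work(i):
        started.set()
        release.wait(timeout=5)
        return i

    # One worker: task 0 runs, the remaining tasks wait in the queue
    with ThreadPoolExecutor(max_workers=1) as pool:
        futures = [pool.submit(work, i) for i in range(n_tasks)]
        started.wait(timeout=5)  # make sure task 0 is actually running
        # cancel() returns False for a running task, True for a pending one
        cancelled = [f.cancel() for f in futures]
        release.set()  # allow the running task to complete
    return cancelled, futures[0].result()
```

The same caveat applies to any cancellation scheme: work that has already started usually has to finish or be stopped cooperatively, so guarding before submission is the more predictable option.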
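If you actually want the UI to reflect an intentional stop (not Completed), return an explicit terminal state from the flow, for example Cancelled(message="stopped early") from prefect.states. Raising prefect.exceptions.Abort() is not a dependable way to do this: it is an engine signal, and Prefect defines no Aborted state type.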
Docs:
- Flows in Prefect 2
- States in Prefect 2
If you can share whether you’re submitting tasks concurrently and need to halt them, I can tailor the pattern (e.g., better cancellation/guarding strategies).
Marvin
09/08/2025, 10:35 AM
allow_failure or resolve futures without raising.
Examples:
1) Keep the flow green even if a task fails
from prefect import flow, task, get_run_logger

@task
def risky(n):
    if n % 2:
        raise ValueError("boom")
    return n

@flow
def main():
    logger = get_run_logger()
    futs = [risky.submit(i) for i in range(4)]
    for f in futs:
        # Do not raise if a task fails; a failed task yields its exception instead
        outcome = f.result(raise_on_failure=False)
        if isinstance(outcome, Exception):
            logger.warning(f"Task failed: {outcome}")
        else:
            logger.info(f"Task result: {outcome}")
    # Return a value explicitly: in Prefect 2.x a flow that returns None takes
    # its final state from its task runs, so it would end Failed here
    return {"status": "ok_with_warnings"}
2) Resolve a future without raising on failure
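f = risky.submit(1)  # call this from inside a @flow
# Avoid raising; for a failed task this returns the exception object, not None
value_or_exc = f.result(raise_on_failure=False)
# Inspect the state to decide what to do
# (f.state is the Prefect 3 attribute; on Prefect 2.x use f.get_state())
if f.state.is_failed():
    # log, create an artifact, etc., but do not raise
    ...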
Notes:
- There’s no “Completed with errors” state in Prefect 2.x; the way to model that is “handled error” + Completed flow, with logs/artifacts conveying the issue.
- If you truly need a non-zero process exit while keeping the Prefect run green, that’s not supported: raising will mark the run Failed.
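As a rough illustration of the "handled error + Completed" idea in the notes above, here is a plain-Python sketch with the Prefect decorators left out. The function name and the "completed_with_errors" label are hypothetical; the label is only a value in the returned summary, not a Prefect state.

```python
def risky(n):
    if n % 2:
        raise ValueError(f"boom on {n}")
    return n


def run_with_soft_failures(inputs):
    """Run every input, record failures, and always return normally."""
    results, errors = [], []
    for n in inputs:
        try:
            results.append(risky(n))
        except Exception as exc:
            # Handle the error here instead of letting it escape
            errors.append(f"{type(exc).__name__}: {exc}")
    status = "completed_with_errors" if errors else "ok"
    return {"status": status, "results": results, "errors": errors}


if __name__ == "__main__":
    print(run_with_soft_failures(range(4)))
```

Because nothing is re-raised, the caller decides what to do with the `errors` list, for example logging it or writing it to an artifact.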
Docs:
- States overview: States concept
- API for allow_failure: prefect.allow_failure
If you can share a snippet of what you’re trying to do (e.g., “I want the flow to be green but still surface the error to X caller”), I can suggest the cleanest pattern.