Quick clarifier before I get too clever: which Prefect version are you on? The .map(unmapped(...)) syntax exists in Prefect 1.x and also in 2.x/3.x, so the syntax alone doesn't tell me. I'll assume Prefect 3.x below, where fanning out with submit in a list comprehension is the more explicit style; if you're on 1.x, say so and I'll tailor it.
Prefect 3.x: fail the flow if any “mapped” task fails
Pattern: submit all the tasks, then either
- fail fast by calling .result() on each future (re-raises the task’s exception, which fails the flow), or
- wait for all to finish, then raise if any failed (collect each task's exception and raise one summary error).
Example (fail fast):
from prefect import flow, task


@task
def clone_drp_pilot_extract(conn, sql_path, table):
    # do work, raise if it fails
    ...


@flow
def run_all(sf_sysadmin_conn_params, sql_path, table_list):
    futures = [
        clone_drp_pilot_extract.submit(sf_sysadmin_conn_params, sql_path, t)
        for t in table_list
    ]
    # Re-raises the original exception for any failed task, which fails the flow
    for f in futures:
        f.result()
Example (wait for all, then raise with a summary):
from prefect import flow, task
from prefect.futures import wait


@task
def clone_drp_pilot_extract(conn, sql_path, table):
    ...


@flow
def run_all(sf_sysadmin_conn_params, sql_path, table_list):
    futures = [
        clone_drp_pilot_extract.submit(sf_sysadmin_conn_params, sql_path, t)
        for t in table_list
    ]
    # Block until every task run has reached a final state
    wait(futures)
    errors = []
    for t, f in zip(table_list, futures):
        try:
            # Re-raises the task's exception if the task failed
            f.result()
        except Exception as exc:
            errors.append((t, exc))
    if errors:
        raise RuntimeError(
            f"{len(errors)} tables failed: "
            + ", ".join(f"{t}: {e}" for t, e in errors)
        )
Notes
- In Prefect 3.x, a flow won’t automatically fail just because a child task failed unless you surface that failure (e.g., by calling result() or re-raising from the state). The snippets above do exactly that.
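- .map with unmapped(...) is not limited to Prefect 1.x: tasks in Prefect 2.x and 3.x also have .map, and unmapped is importable from prefect. In 2.x/3.x it returns a list of futures, so both patterns above apply to it unchanged; submit in a list comprehension is simply the more explicit way to fan out.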
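If it helps to see the "surface the failure" principle without a Prefect server, Python's standard-library concurrent.futures behaves analogously. This is an analogy only, not Prefect's API: an exception raised inside a submitted callable stays stored on the future until something asks for it. The clone_table function and the table names are hypothetical stand-ins for your real task.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def clone_table(table: str) -> str:
    # Hypothetical stand-in for the real task: fails for one table name.
    if table == "bad_table":
        raise ValueError(f"clone failed for {table}")
    return f"cloned {table}"


def run_without_surfacing(tables: list[str]) -> int:
    # Nothing calls .result(), so the stored exception is never re-raised
    # and the caller sees no failure at all.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(clone_table, t) for t in tables]
    return len(futures)


def run_fail_fast(tables: list[str]) -> list[str]:
    # Submit everything, then call .result() in submission order; the first
    # failed future re-raises its original exception here.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(clone_table, t) for t in tables]
        return [f.result() for f in futures]


def run_collect_all(tables: list[str]) -> list[str]:
    # Submit everything, wait for all to finish, then raise one summary error.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(clone_table, t) for t in tables]
        wait(futures)
    errors = [
        (t, f.exception()) for t, f in zip(tables, futures) if f.exception()
    ]
    if errors:
        raise RuntimeError(
            f"{len(errors)} tables failed: "
            + ", ".join(f"{t}: {e}" for t, e in errors)
        )
    return [f.result() for f in futures]
```

Note that "fail fast" here means "raise at the first failed future in submission order": a later future that fails sooner is only noticed once the loop reaches it.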
Docs for further reading
- Prefect docs home:
https://docs.prefect.io
- 3.x migration overview (if you’re coming from older mapping patterns):
https://docs-3.prefect.io/v3/how-to-guides/migrate/upgrade-to-prefect-3
If you confirm your Prefect version (and whether you want fail-fast vs “collect all errors then fail”), I can tighten this exactly to your setup.