<@ULVA73B9P> how to make the flow to fail if at le...
# ask-marvin
f
@Marvin how do I make the flow fail if at least one of its mapped tasks fails? clone_drp_pilot_extract.map(unmapped(sf_sysadmin_conn_params), unmapped(sql_path), table_list)
m
Quick clarifier before I get too clever: which Prefect version are you on? The .map(unmapped(...)) syntax exists in Prefect 1.x, but 2.x and 3.x support it as well, so the call alone doesn’t tell me. I’ll assume Prefect 3.x below and fan out with submit and a list comprehension; if you’re on 1.x, say so and I’ll tailor it.

Prefect 3.x: fail the flow if any “mapped” task fails

Pattern: submit all the tasks, then either
- fail fast by calling .result() on each future (this re-raises the task’s exception at the first failed future in list order, which fails the flow), or
- wait for all to finish, then raise if any failed (use the states utilities to re-raise).

Example (fail fast):

```python
from prefect import flow, task


@task
def clone_drp_pilot_extract(conn, sql_path, table):
    # do work, raise if it fails
    ...


@flow
def run_all(sf_sysadmin_conn_params, sql_path, table_list):
    futures = [
        clone_drp_pilot_extract.submit(sf_sysadmin_conn_params, sql_path, t)
        for t in table_list
    ]
    # Re-raises the original exception for any failed task, which fails the flow
    for f in futures:
        f.result()
```

Example (wait for all, then raise with a summary):

```python
from prefect import flow, task
from prefect.futures import resolve_futures_to_states
from prefect.states import get_state_result


@task
def clone_drp_pilot_extract(conn, sql_path, table):
    ...


@flow
def run_all(sf_sysadmin_conn_params, sql_path, table_list):
    futures = [
        clone_drp_pilot_extract.submit(sf_sysadmin_conn_params, sql_path, t)
        for t in table_list
    ]
    # Waits for every future and returns the final state of each task run
    states = resolve_futures_to_states(futures)
    errors = []
    for t, s in zip(table_list, states):
        try:
            # Will raise if the task failed
            get_state_result(s, raise_on_failure=True)
        except Exception as exc:
            errors.append((t, exc))
    if errors:
        raise RuntimeError(
            f"{len(errors)} tables failed: "
            + ", ".join(f"{t}: {e}" for t, e in errors)
        )
```

Notes
- In Prefect 3.x, a flow won’t automatically fail just because a child task failed unless you surface that failure (e.g., by calling result() or re-raising from the state). The snippets above do exactly that.
- .map/unmapped is not exclusive to Prefect 1.x: 2.x and 3.x support it too, and .map returns futures there, so you can keep the mapped call and apply the same .result() handling. The submit/list-comprehension style shown above is an equivalent alternative.

Docs for further reading
- Prefect docs home: https://docs.prefect.io
- 3.x migration overview (if you’re coming from older mapping patterns): https://docs-3.prefect.io/v3/how-to-guides/migrate/upgrade-to-prefect-3

If you confirm your Prefect version (and whether you want fail-fast vs “collect all errors then fail”), I can tighten this exactly to your setup.
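If you would rather not depend on the states utilities, the “collect all errors, then fail” pattern can be sketched using only `.result()`. This is a minimal sketch, not a Prefect API: `collect_failures`, `raise_if_any_failed`, and `fake_extract` are hypothetical names, and the demo uses `concurrent.futures` from the standard library so it runs without a Prefect installation. It assumes Prefect futures behave as described above, i.e. that `.result()` re-raises the task’s exception.

```python
from concurrent.futures import ThreadPoolExecutor


def collect_failures(labels, futures):
    """Call .result() on every future; return (label, exception) pairs for failures."""
    failures = []
    for label, future in zip(labels, futures):
        try:
            future.result()  # blocks until done; re-raises the task's exception
        except Exception as exc:
            failures.append((label, exc))
    return failures


def raise_if_any_failed(labels, futures):
    """Wait for all futures, then raise one RuntimeError summarising the failures."""
    failures = collect_failures(labels, futures)
    if failures:
        summary = ", ".join(f"{label}: {exc!r}" for label, exc in failures)
        raise RuntimeError(f"{len(failures)} of {len(futures)} failed: {summary}")


def fake_extract(table):
    # Hypothetical stand-in for clone_drp_pilot_extract: fails for one table.
    if table == "bad_table":
        raise ValueError("extract failed")
    return table


if __name__ == "__main__":
    tables = ["a", "bad_table", "c"]
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fake_extract, t) for t in tables]
    try:
        raise_if_any_failed(tables, futures)
    except RuntimeError as err:
        print(err)
```

Inside a flow, the equivalent call would be `raise_if_any_failed(table_list, futures)` with the futures from `.submit()` (or from `.map()`); the raised `RuntimeError` is what marks the flow run as failed.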