Tom Han
09/04/2025, 5:21 PM
with tags('video-ingestion'):
    for vid in vids:
        future = ingest_one_video.submit(vid)
        processing.add(future)
        if len(processing) > concurrency_lim:
            for done in as_completed(list(processing)):
                processing.remove(done)
                result = done.result()  # Get the actual result
                if result is not None:  # Only add non-None results
                    results.append(result)
                completed += 1
                break
    for done in as_completed(list(processing)):
        processing.remove(done)
        result = done.result()  # Get the actual result
        if result is not None:  # Only add non-None results
            results.append(result)
        completed += 1
How do I make it so that when one of the tasks fails or crashes, we just skip it instead of crashing the entire flow?
Marvin
09/04/2025, 5:21 PM
Tom Han
09/04/2025, 5:22 PM
Nate
09/04/2025, 5:22 PM
Tom Han
09/04/2025, 5:23 PM
Marvin
09/04/2025, 5:25 PM
Calling future.result() on a failed/crashed task raises an exception, which will bubble up unless you catch it. To “skip” failed tasks and keep the flow running, either:
A) Catch exceptions around result(), or
B) Don’t raise in the first place and check the task’s state
Here are two concise patterns you can use.
1) Easiest: try/except around result()
from prefect import get_run_logger, tags
from prefect.futures import as_completed

logger = get_run_logger()  # call this inside the flow function

with tags('video-ingestion'):
    for vid in vids:
        future = ingest_one_video.submit(vid)
        processing.add(future)
        if len(processing) > concurrency_lim:
            # Over the limit: wait for one task to finish before submitting more
            for done in as_completed(list(processing)):
                processing.remove(done)
                try:
                    result = done.result()  # may raise if the task failed
                except Exception as exc:
                    logger.warning(f"Skipping failed/crashed task: {exc}")
                else:
                    if result is not None:
                        results.append(result)
                completed += 1
                break
    # Drain whatever is still in flight
    for done in as_completed(list(processing)):
        processing.remove(done)
        try:
            result = done.result()
        except Exception as exc:
            logger.warning(f"Skipping failed/crashed task: {exc}")
        else:
            if result is not None:
                results.append(result)
        completed += 1
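To try the try/except pattern without a Prefect deployment, here is a minimal sketch that uses the standard library's concurrent.futures as a stand-in for Prefect futures. It is not Prefect code: ingest_one_video, run_all, and collect_one are hypothetical names, and the task is assumed to fail for any id containing "bad". It uses >= rather than > so that at most concurrency_lim futures are in flight.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def ingest_one_video(vid):
    """Hypothetical stand-in task: fails for ids containing 'bad'."""
    if "bad" in vid:
        raise RuntimeError(f"cannot ingest {vid}")
    return f"ingested {vid}"


def run_all(vids, concurrency_lim=2):
    """Submit every video, keep a bounded window in flight, skip failures."""
    results, skipped = [], []
    processing = {}  # future -> video id

    def collect_one():
        # Take the first future that finishes and record its outcome
        done = next(as_completed(list(processing)))
        vid = processing.pop(done)
        try:
            result = done.result()  # re-raises the task's exception, if any
        except Exception:
            skipped.append(vid)  # skip the failure instead of propagating it
        else:
            if result is not None:
                results.append(result)

    with ThreadPoolExecutor(max_workers=concurrency_lim) as pool:
        for vid in vids:
            processing[pool.submit(ingest_one_video, vid)] = vid
            if len(processing) >= concurrency_lim:
                collect_one()
        while processing:  # drain whatever is still in flight
            collect_one()
    return results, skipped


results, skipped = run_all(["a.mp4", "bad.mp4", "c.mp4"])
```

The failed item ends up in skipped and the other two in results; the order of results depends on which future finishes first.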
2) Don’t raise; check state first
from prefect import get_run_logger, tags
from prefect.client.schemas import StateType  # also importable from prefect.states
from prefect.futures import as_completed

logger = get_run_logger()  # call this inside the flow function

# States that mean the task produced no usable result
SKIP_STATES = {StateType.FAILED, StateType.CRASHED, StateType.CANCELLED}

with tags('video-ingestion'):
    for vid in vids:
        future = ingest_one_video.submit(vid)
        processing.add(future)
        if len(processing) > concurrency_lim:
            # Over the limit: wait for one task to finish before submitting more
            for done in as_completed(list(processing)):
                processing.remove(done)
                if done.state.type in SKIP_STATES:
                    logger.warning(f"Skipping task due to state={done.state.type}: {done.state.message}")
                else:
                    result = done.result()  # task did not fail, crash, or get cancelled
                    if result is not None:
                        results.append(result)
                completed += 1
                break
    # Drain whatever is still in flight
    for done in as_completed(list(processing)):
        processing.remove(done)
        if done.state.type in SKIP_STATES:
            logger.warning(f"Skipping task due to state={done.state.type}: {done.state.message}")
        else:
            result = done.result()
            if result is not None:
                results.append(result)
        completed += 1
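The "inspect first, then read the result" idea can also be sketched with the standard library alone. This is only an analogy, not Prefect's API: concurrent.futures has no state object, so the sketch assumes Future.exception() plays the role of the state check. ingest_one_video and collect are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def ingest_one_video(vid):
    """Hypothetical stand-in task: fails for ids containing 'bad'."""
    if "bad" in vid:
        raise RuntimeError(f"cannot ingest {vid}")
    return f"ingested {vid}"


def collect(futures):
    """Split futures into results and errors without letting a task exception propagate."""
    results, errors = [], []
    for fut in futures:
        exc = fut.exception()  # waits for completion; None means success
        if exc is not None:
            errors.append(exc)  # skip the failed task
            continue
        value = fut.result()  # safe: the future finished without an exception
        if value is not None:
            results.append(value)
    return results, errors


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(ingest_one_video, v) for v in ["a.mp4", "bad.mp4"]]

results, errors = collect(futures)
```

Here the caller decides what to do with errors (log them, count them, retry later) instead of relying on exception handling around result().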
Notes:
- If you prefer not to raise at all, you can also call done.result(raise_on_failure=False) and then branch on done.state as shown above. With raise_on_failure=False, a failed task's result() returns the exception object instead of raising it, so check the state before appending the value.
- Your flow will stay “Completed” as long as you don’t let exceptions escape the flow function; failed tasks will still be recorded as failed task runs.
- Consider adding retries to the task to reduce transient failures, e.g. @task(retries=3, retry_delay_seconds=10).
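To make the retry note concrete, here is a rough plain-Python sketch of what "retries=3" means: one initial attempt plus up to three more. It is an illustration only and not how Prefect implements retries; with_retries, flaky_ingest, and the calls counter are hypothetical names.

```python
import time
from functools import wraps


def with_retries(retries=3, retry_delay_seconds=0.0):
    """Hypothetical stand-in for a task decorator's retry behaviour."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):  # first try plus `retries` retries
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: the caller can still skip it
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator


calls = {"n": 0}  # counts attempts so the transient failure can clear


@with_retries(retries=3, retry_delay_seconds=0)
def flaky_ingest(vid):
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return f"ingested {vid}"


outcome = flaky_ingest("a.mp4")
```

Retries and skipping complement each other: retries absorb transient errors, and the try/except or state check handles tasks that still fail after the last attempt.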
Reference: Prefect's future and state behavior, including result semantics and state handling, is implemented in the Prefect source.
Tom Han
09/04/2025, 5:45 PM
from prefect.states import StateType
or
from prefect.client.schemas import StateType
Maybe some additional prompt engineering would help?
Nate
09/04/2025, 5:46 PM