Patrick Mitchell
07/07/2025, 5:58 PMBrendan Dalpe
07/07/2025, 6:31 PMNate
07/07/2025, 6:43 PM» ipython
#[1]
from prefect import flow, task
#[2]
@task
def hates_2(x):
    """Fail when ``x`` is 2; for any other value do nothing and return ``None``.

    Raises:
        RuntimeError: if ``x == 2``.
    """
    if x == 2:
        # A bare `raise` with no active exception only fails by accident
        # ("RuntimeError: No active exception to re-raise"). Raise the same
        # exception type explicitly so the failure carries a useful message.
        raise RuntimeError("hates_2 was given 2")
#[3]
@flow
def f():
    """Demonstrate two ways of handling a failing run among mapped ``hates_2`` calls."""
    # Option 1: resolve the mapped futures directly and catch whatever
    # exception surfaces from the failing run.
    try:
        mapped = hates_2.map(range(10))
        mapped.result()
    except Exception:
        print("calling .result() unwraps any futures' exceptions")

    # Option 2: request states instead, so each outcome can be inspected
    # without an exception being raised.
    outcomes = hates_2.map(range(10), return_state=True)
    for outcome in outcomes:
        # States also have a .result() method for unwrapping the value when
        # that is actually wanted.
        message = "dont raise" if outcome.is_failed() else "looking good!"
        print(message)
#[4]
f()
docsPatrick Mitchell
07/07/2025, 7:05 PMfutures: PrefectFutureList[MyResult] = myfunc.map(items)
# Block until the mapped futures have finished before inspecting them.
wait(futures)

# Collect the values of successful runs only; report everything else.
results: list[MyResult] = []
for future in futures:
    # A State exposes its kind as `.type` (a StateType); `.types` is not an
    # attribute and would raise AttributeError here.
    if future.state.type == StateType.COMPLETED:
        results.append(future.result())
    else:
        print("a message")
# ... proceed with rest
Following the example, when one distributed task crashes, seemingly nothing after `wait` occurs. Even with `log_prints=True`,
I never see any logs, and the rest of the processing of successful results never occurs.
Here's a zoomed-out example of the flow still running despite all tasks "completing" in either a failed or successful state.
My other issue with this example is that `future.state` is noted as deprecated, but it still seemingly appears in much of the docs and in Marvin's (typically incorrect) responses.
Patrick Mitchell
07/07/2025, 7:07 PMPatrick Mitchell
07/07/2025, 7:09 PMNate
07/07/2025, 7:17 PMPatrick Mitchell
07/07/2025, 8:08 PMPatrick Mitchell
07/07/2025, 8:17 PMBrendan Dalpe
07/07/2025, 8:22 PM`CRASHED`: a run in any state was interrupted by an OS signal such as a `KeyboardInterrupt` or `SIGTERM`
Patrick Mitchell
07/07/2025, 8:28 PMfrom __future__ import annotations
import signal
from prefect import flow, task
from prefect.states import State, StateType
from prefect_dask import DaskTaskRunner
from prefect.client.schemas.objects import TERMINAL_STATES
@flow(log_prints=True, task_runner=DaskTaskRunner())
def example():
    """An example flow that demonstrates parallel task execution with error handling.

    Maps ``distribute`` over a fixed list of integers, sorts the returned
    states into successful results and everything else, and prints each
    successful value.
    """
    task_data: list[int] = [1, 2, 3, 4, 5, 6, 7]

    # With return_state=True the mapped call hands back State objects rather
    # than futures. NOTE(review): these are presumably snapshots that do not
    # change after map() returns, so re-polling them in a loop cannot make
    # progress -- confirm against the Prefect version in use.
    states: list[State[int]] = distribute.map(task_data, return_state=True)

    results: list[int] = []
    failed: list[State[int]] = []

    # Single pass, no mutation of `states` while iterating it. Removing items
    # from a list during iteration skips elements, and looping until every
    # state is terminal never ends if even one state stays non-terminal
    # (e.g. a run left Pending after its worker died).
    for state in states:
        if state.type == StateType.COMPLETED:
            results.append(state.result())
            print(f"Added result to list, current results: {len(results)}")
        else:
            # Anything that did not complete (failed, crashed, or never left
            # a non-terminal state) is kept here for inspection.
            failed.append(state)
            print(f"Task failed or crashed, current failed: {len(failed)}")

    for int_result in results:
        print(f"Received integer: {int_result}")
@task(log_prints=True)
def distribute(int: int) -> int:
    """Echo the given integer back, except for two deliberately misbehaving inputs.

    ``5`` raises a ``ValueError``; ``6`` sends SIGINT to the current process
    in an attempt to simulate a crash. Every other value is returned as-is.
    """
    print(f"Distributing integer: {int}")

    if int == 5:
        # Ordinary failures seem to be handled well by the flow.
        raise ValueError("Intentional error for testing purposes")
    elif int == 6:
        # Attempt to provoke a CRASHED state. Reported outcome: no CRASHED
        # state occurs and the task stays Pending forever. The same was
        # observed with os.kill(os.getpid(), signal.SIGTERM), os._exit(1)
        # and os.abort() in place of the call below.
        print("Simulating crash for integer 6")
        signal.raise_signal(signal.SIGINT)

    return int