# ask-community
Patrick Mitchell
Hi folks, I might just be looking in all the wrong places, but I can't find any clear documentation about handling failures of Prefect futures. When I build a `PrefectFutureList` (or a plain list of Prefect futures) and wait to handle their results, if any future fails, the parent flow hangs forever and never terminates; it seems to treat failed or crashed tasks as incomplete. Is there any documentation or example showing how to correctly handle a failed/crashed task so that the flow can terminate (or continue while handling the failed tasks)? EDIT: not FAILED, only CRASHED.
Nate
hi @Patrick Mitchell
• do you have a self-contained example that's giving unexpected behavior?
• what version are you using?
> if any future fails the parent flow will hang forever and never terminate
in general this should not be true
» ipython

#[1]
from prefect import flow, task

#[2]
@task
def hates_2(x):
    if x == 2: raise ValueError("x was 2")  # a bare `raise` outside an except block only fails via RuntimeError

#[3]
@flow
def f():
    try:
        hates_2.map(range(10)).result()
    except Exception:
        print("calling .result() unwraps any futures' exceptions")

    states = hates_2.map(range(10), return_state=True) # return states you can inspect
    for state in states:
        if state.is_failed():
            print("dont raise")
        else:
            print("looking good!") # states have a .result() method you can use to unwrap it when desired
#[4]
f()
docs
Patrick Mitchell
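from prefect.client.schemas.objects import StateType
from prefect.futures import PrefectFutureList, wait

# myfunc, MyResult and items are defined elsewhere in the flow
futures: PrefectFutureList[MyResult] = myfunc.map(items)
wait(futures)

results: list[MyResult] = []
for future in futures:
    if future.state.type == StateType.COMPLETED:
        results.append(future.result())
    else:
        print("a message")

# ... proceed with rest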
Following the example, when one distributed task crashes, seemingly nothing after `wait` runs: with `log_prints=True` I never see any logs, and the processing of the successful results never happens. Here's a zoomed-out example of the flow still running even though all tasks have "completed" in either a failed or successful state. My other issue with this example is that `future.state` is noted as deprecated, yet it still appears in much of the docs and in Marvin's (typically incorrect) responses.
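One way to keep a single run that never reports back from blocking the whole flow is to bound the wait and treat anything still unfinished as its own bucket, next to completed and failed. In Prefect 3, `prefect.futures.wait` accepts a `timeout` and returns `done` and `not_done` sets (worth confirming in the API reference for 3.4.7). The sketch below shows the same pattern with the standard library's `concurrent.futures` so it runs without Prefect; the futures are built by hand, with the one for `6` deliberately never resolved to stand in for a crashed task run. The helper names and the 0.1 s timeout are illustrative assumptions, not Prefect APIs.

```python
from concurrent.futures import Future, wait

# Stdlib-only sketch. Assumption: prefect.futures.wait(futures, timeout=...) splits
# Prefect futures into done / not_done in the same way; check your Prefect version.


def make_futures() -> list[Future]:
    """Build futures by hand: 5 fails, 6 never resolves, the rest succeed."""
    futures: list[Future] = []
    for value in range(1, 8):
        fut: Future = Future()
        if value == 5:
            fut.set_exception(ValueError("Intentional error for testing purposes"))
        elif value != 6:
            fut.set_result(value)
        # value == 6 is left unresolved, standing in for a crashed run that never reports back
        futures.append(fut)
    return futures


def partition(futures: list[Future], timeout: float) -> tuple[list[int], list[BaseException], int]:
    """Wait at most `timeout` seconds, then split into results, errors and a stuck count."""
    done, not_done = wait(futures, timeout=timeout)
    results: list[int] = []
    errors: list[BaseException] = []
    for fut in futures:  # walk the original list to keep input order
        if fut not in done:
            continue  # still unfinished after the timeout; counted in not_done
        exc = fut.exception()  # does not block: fut is already done
        if exc is None:
            results.append(fut.result())
        else:
            errors.append(exc)
    return results, errors, len(not_done)


if __name__ == "__main__":
    results, errors, stuck = partition(make_futures(), timeout=0.1)
    print(results, [str(e) for e in errors], stuck)
    # [1, 2, 3, 4, 7] ['Intentional error for testing purposes'] 1
```

Anything counted as stuck can then be logged or retried while the flow carries on with the successful results, instead of the flow blocking on it indefinitely.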
Thanks for the response @Nate. I think I've tried this pattern previously, but I'll give it another test and report back 🙏
Using 3.4.7
Nate
yea if you have a self-contained example I'd be happy to take a look
Patrick Mitchell
Struggling a little to produce a novel example that forces a crash Prefect interprets as a CRASHED state (not a FAILED one) 😓
I'd like to correct my original post: this seems to work properly when reporting FAILED states, but not CRASHED ones.
b
https://docs.prefect.io/v3/concepts/states
`CRASHED`: a run in any `CRASHED` state was interrupted by an OS signal such as a `KeyboardInterrupt` or `SIGTERM`
Patrick Mitchell
This is what I'm working with right now, but my attempts to create a crashed state just result in a task that stays Pending forever:
from __future__ import annotations

import os  # only needed by the commented-out crash attempts below
import signal

from prefect import flow, task
from prefect.states import State, StateType
from prefect_dask import DaskTaskRunner
from prefect.client.schemas.objects import TERMINAL_STATES


@flow(log_prints=True, task_runner=DaskTaskRunner())
def example():
    """An example flow that demonstrates parallel task execution with error handling."""
    task_data: list[int] = [1, 2, 3, 4, 5, 6, 7]
    wait_for: int = len(task_data)

    states: list[State[int]] = distribute.map(task_data, return_state=True)
    results: list[int] = []
    failed: list[State[int]] = []

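    # The states returned by map(..., return_state=True) are snapshots: if any of them
    # is not final here, it never becomes final and this while loop spins forever.
    while (len(results) + len(failed)) < wait_for:
        for state in list(states):  # iterate over a copy so states.remove() doesn't skip items
            if state.type in TERMINAL_STATES:
                if state.type == StateType.COMPLETED:
                    results.append(state.result())
                    print(f"Added result to list, current results: {len(results)}")
                else:
                    failed.append(state)
                    print(f"Task failed or crashed, current failed: {len(failed)}")

                states.remove(state)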

    for int_result in results:
        print(f"Received integer: {int_result}")

@task(log_prints=True)
def distribute(value: int) -> int:
    """A simple task that returns the input integer."""

    print(f"Distributing integer: {value}")
    if value == 5:
        # Failures seem to be handled well
        raise ValueError("Intentional error for testing purposes")

    if value == 6:
        # Trying to simulate a crash for integer 6, but can't get a CRASHED state to occur
        print("Simulating crash for integer 6")
        signal.raise_signal(signal.SIGINT)  # - results in a perpetually Pending task
        # os.kill(os.getpid(), signal.SIGTERM)  # - results in a perpetually Pending task
        # os._exit(1)  # - also a perpetually Pending task
        # os.abort()  # - also a perpetually Pending task

    return value


if __name__ == "__main__":
    example()
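Separately from the crash question, the `states.remove(state)` call inside `for state in states:` is worth a second look: removing items from a list while a `for` loop walks it shifts the remaining items left, so the loop skips every other one. In the flow above the outer `while` rescans, so skipped states are only picked up on a later pass. A small stdlib-only sketch of the effect, with plain integers standing in for states; the function names are illustrative, not part of Prefect:

```python
def remove_while_iterating(items: list[int]) -> list[int]:
    """Visit each item and remove it from the same list the loop is walking."""
    visited: list[int] = []
    for item in items:
        visited.append(item)
        items.remove(item)  # shifts later items left, so the next one is skipped
    return visited


def remove_over_copy(items: list[int]) -> list[int]:
    """Same loop, but iterate over a copy so every item is visited once."""
    visited: list[int] = []
    for item in list(items):
        visited.append(item)
        items.remove(item)
    return visited


if __name__ == "__main__":
    data = [1, 2, 3, 4, 5, 6, 7]
    print(remove_while_iterating(data), data)  # [1, 3, 5, 7] [2, 4, 6]
    data = [1, 2, 3, 4, 5, 6, 7]
    print(remove_over_copy(data), data)  # [1, 2, 3, 4, 5, 6, 7] []
```

Iterating over `list(states)` (a shallow copy) is the usual fix; building a new list of still-pending states on each pass works just as well.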