# ask-community
c
Hi Community! Has anyone tried to handle an exception from a Future inside another task? I am trying to design a MapReduce-style workflow in which a mapper fans out tasks and a reducer collects the results, skipping any mappers that failed, but this is the error I get:
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
For anyone interested, this is the code I am testing:
from random import randint

from prefect import task, flow

class DummyError(Exception):
    pass

@task
def say_hello():
    print("Hello, world!")


@task
def t1():
    print("t1")


@task
def t2():
    print("t2")


@task
def t3():
    raise DummyError("t3")


@task
def mapper():
    task_id = randint(1, 2)
    task_instance = globals()[f"t{task_id}"]  # dynamically select a task
    return [
        task_instance.submit(),
        t3.submit()
    ]


@task
def reducer(futures):
    for future in futures:
        try:
            future.result()
        except Exception as e:
            print(f"Error: {e}")


@flow
def myflow():
    say_hello()
    futures = mapper()
    reducer(futures)
    say_hello()
    return "OK"


if __name__ == "__main__":
    result = myflow()
n
hey @Christian Sicari - have you tried using the native .map method on tasks? where did it come up short for you here?
c
@Nate thanks for asking. .map doesn't fit my use case. I simplified it in the code above, but the idea is that I trigger different kinds of tasks in parallel with the same input.
n
how about this?
from random import randint

from prefect import Task, flow, task
from prefect.futures import resolve_futures_to_states
from prefect.states import State


class DummyError(Exception):
    pass


@task
def say_hello():
    print("Hello, world!")


@task
def t1(params: dict | None = None):
    print("t1")
    return "Result from t1"


@task
def t2(params: dict | None = None):
    print("t2")
    return "Result from t2"


@task
def t3(params: dict | None = None):
    raise DummyError("t3")


@task
def mapper(tasks: list[Task], params: dict | None = None):
    selected_task: Task = tasks[randint(0, len(tasks) - 1)]
    return [selected_task.submit(params), t3.submit(params)]


@flow
def myflow():
    say_hello()
    futures = mapper([t1, t2, t3], params=dict(foo="bar"))
    states: list[State] = resolve_futures_to_states(futures)
    print(f"Resulting states: {states}")
    results = [state.result(raise_on_failure=False) for state in states]
    print(f"Results: {results}")

    say_hello()
    return "OK"


if __name__ == "__main__":
    result = myflow()
c
This is a nice workaround, but it is effectively the same as using a plain function as the reducer instead of a task. I don't understand whether futures simply can't be handled inside a task, or whether this is a bug.
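For reference, the pattern discussed in this thread (fan out different kinds of tasks with the same input, then collect results in a plain-function reducer that skips failures) can be sketched with the standard library alone. This is only an illustration, not Prefect code: it assumes `concurrent.futures.ThreadPoolExecutor` as a stand-in for a Prefect task runner, and the `mapper`, `reducer`, and `run` helpers are hypothetical names that mirror the ones used above.

```python
from concurrent.futures import ThreadPoolExecutor


class DummyError(Exception):
    pass


def t1(params: dict) -> str:
    return f"t1 got {params['foo']}"


def t3(params: dict) -> str:
    # Always fails, like t3 in the thread.
    raise DummyError("t3")


def mapper(pool, tasks, params):
    # Fan out: submit each kind of task with the same input.
    return [pool.submit(fn, params) for fn in tasks]


def reducer(futures):
    # Fan in: collect results and set aside any future that raised.
    results, errors = [], []
    for future in futures:
        try:
            results.append(future.result())
        except Exception as exc:
            errors.append(exc)
    return results, errors


def run(params: dict):
    # The reducer runs in the caller, not as a submitted task,
    # which corresponds to the "function reducer" described above.
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = mapper(pool, [t1, t3], params)
        return reducer(futures)


if __name__ == "__main__":
    results, errors = run({"foo": "bar"})
    print(f"Results: {results}")
    print(f"Skipped errors: {errors}")
```

Here the reducer resolves the futures in the same scope that owns the executor, so every future it sees has either finished or failed by the time `result()` returns. Whether the same holds for Prefect futures passed into another task is exactly the open question in this thread.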