Hi there. I want to have a task that processes ta...
# ask-community
j
Hi there. I want to have a task that processes task states. Not sure why I'm getting this error:
Copy code
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
I did not expect this, given that the task should be waiting for the other tasks to finish. Full source code:
Copy code
from typing import List

from prefect import task, flow, allow_failure
from prefect.task_runners import ConcurrentTaskRunner
from prefect.client.schemas import State


@task
def divide_by_zero(input: int):
    """Divide ``input`` by zero, which raises ``ZeroDivisionError``.

    The flow in this snippet submits this task as its failing run.
    """
    return input / 0


@task
def return_input(input: int):
    """Return ``input`` unchanged.

    The flow in this snippet submits this task as its succeeding run.
    """
    return input


@task
def process_futures(futures_list: List[State]):
    """Print the failed flag of every state, and the result of each non-failed one.

    Args:
        futures_list: Objects exposing ``is_failed()`` and ``result()``;
            annotated as a list of ``State`` despite the parameter name.
    """
    for state in futures_list:
        # Query the flag once and reuse it for both the print and the branch.
        failed = state.is_failed()
        print(failed)

        # Only non-failed states have their result printed.
        if not failed:
            print(state.result())


@flow(task_runner=ConcurrentTaskRunner())
def test_wait_for():
    """Submit one failing and one succeeding task, then pass both to ``process_futures``.

    Prints the ``is_failed()`` flag of each submitted run before calling
    ``process_futures`` with the two objects as a list.
    """

    # Both tasks are submitted with return_state=True. The variables are named
    # *_future, but is_failed() is called on them directly below, i.e. they are
    # used as states.
    failed_future = divide_by_zero.submit(1, return_state=True)
    success_future = return_input.submit(1, return_state=True)

    futures_list=[failed_future, success_future]

    print(failed_future.is_failed())
    print(success_future.is_failed())

    # process_futures is called directly (not via .submit). Only the wait_for
    # entries are wrapped in allow_failure; the list argument itself is not.
    # NOTE(review): allow_failure presumably marks an upstream that is allowed
    # to have failed -- confirm against the Prefect documentation.
    process_futures(futures_list, wait_for=[allow_failure(failed_future), allow_failure(success_future)])

# Run the flow only when this file is executed as a script.
if __name__ == '__main__':
    test_wait_for()
So using `allow_failure(...)` on the input itself helped me at least get further. However, the `List[State]` passed to the last task is no longer a list of states but a list of the results:
Copy code
from typing import List

from prefect import task, flow, allow_failure
from prefect.task_runners import ConcurrentTaskRunner
from prefect.client.schemas import State


@task
def divide_by_zero(input: int):
    """Divide ``input`` by zero, which raises ``ZeroDivisionError``.

    The flow in this snippet submits this task as its failing run.
    """
    return input / 0


@task
def return_input(input: int):
    """Return ``input`` unchanged.

    The flow in this snippet submits this task as its succeeding run.
    """
    return input


@task
def process_futures(futures_list: List[State]):
    """Print each element of ``futures_list`` on its own line, in order."""
    for entry in futures_list:
        print(entry)

@flow(task_runner=ConcurrentTaskRunner())
def test_wait_for():
    """Submit one failing and one succeeding task, then pass both to ``process_futures``.

    Here the list argument itself is wrapped in ``allow_failure``, in addition
    to the individual ``wait_for`` entries.
    """

    # Both tasks are submitted with return_state=True.
    failed_future = divide_by_zero.submit(1, return_state=True)
    success_future = return_input.submit(1, return_state=True)

    futures_list=[failed_future, success_future]

    # print(failed_future.is_failed())
    # print(success_future.is_failed())

    # Earlier attempt, kept for comparison: the list was passed unwrapped.
    # process_futures(futures_list, wait_for=[allow_failure(failed_future), allow_failure(success_future)])
    # Current attempt: the whole list is wrapped in allow_failure as well.
    # NOTE(review): the surrounding thread reports that process_futures then
    # receives results rather than State objects -- confirm against the Prefect
    # documentation for allow_failure.
    process_futures(allow_failure(futures_list), wait_for=[allow_failure(failed_future), allow_failure(success_future)])

# Run the flow only when this file is executed as a script.
if __name__ == '__main__':
    test_wait_for()