John Horn
06/07/2023, 5:18 PM
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
This happens even though I should be waiting for the other tasks to finish.
Full source code:
from typing import List
from prefect import task, flow, allow_failure
from prefect.task_runners import ConcurrentTaskRunner
from prefect.client.schemas import State
@task
def divide_by_zero(input: int):
    """Deliberately fail by dividing ``input`` by zero (raises ZeroDivisionError)."""
    zero = 0
    return input / zero
@task
def return_input(input: int):
    """Return ``input`` unchanged; serves as the task that succeeds."""
    value = input
    return value
@task
def process_futures(futures_list: List[State]):
    """Print the failure flag of each upstream state, and the result of non-failed ones.

    Args:
        futures_list: Objects to inspect; each element must provide
            ``is_failed()`` and ``result()``.
    """
    for state in futures_list:
        # Query the failure flag once and reuse it for both the print and the test.
        failed = state.is_failed()
        print(failed)
        # NOTE(review): "not failed" also lets through states that have not
        # finished yet; calling result() on such a state is presumably what
        # raises the UnfinishedRun (PENDING) error reported in this thread --
        # confirm, and consider checking for a completed state instead.
        if not failed:
            print(state.result())
@flow(task_runner=ConcurrentTaskRunner())
def test_wait_for():
    """Submit one failing and one succeeding task, then pass both to ``process_futures``."""
    # return_state=True presumably makes submit() hand back a state rather
    # than a future -- is_failed() is called on the values right below.
    failed_state = divide_by_zero.submit(1, return_state=True)
    success_state = return_input.submit(1, return_state=True)

    print(failed_state.is_failed())
    print(success_state.is_failed())

    # Each upstream is wrapped in allow_failure so the downstream task is
    # still attempted when one of them failed.
    upstream = [allow_failure(failed_state), allow_failure(success_state)]
    process_futures([failed_state, success_state], wait_for=upstream)
# Run the flow only when this file is executed directly as a script.
if __name__ == "__main__":
    test_wait_for()
John Horn
06/07/2023, 5:48 PM
from typing import List
from prefect import task, flow, allow_failure
from prefect.task_runners import ConcurrentTaskRunner
from prefect.client.schemas import State
@task
def divide_by_zero(input: int):
    """Deliberately fail by dividing ``input`` by zero (raises ZeroDivisionError)."""
    zero = 0
    return input / zero
@task
def return_input(input: int):
    """Return ``input`` unchanged; serves as the task that succeeds."""
    value = input
    return value
@task
def process_futures(futures_list: List[State]):
    """Print every element of ``futures_list``, one per line.

    Args:
        futures_list: Iterable of upstream values to display. What each
            element is at run time depends on how the caller wrapped the
            argument -- TODO confirm.
    """
    for item in futures_list:
        print(item)
@flow(task_runner=ConcurrentTaskRunner())
def test_wait_for():
    """Submit one failing and one succeeding task, then print both via ``process_futures``."""
    failed_state = divide_by_zero.submit(1, return_state=True)
    success_state = return_input.submit(1, return_state=True)
    states = [failed_state, success_state]

    # The whole list is wrapped in allow_failure when passed as the argument,
    # in addition to wrapping each wait_for entry individually. The
    # is_failed() debug prints are not performed in this variant.
    process_futures(
        allow_failure(states),
        wait_for=[allow_failure(state) for state in states],
    )
# Run the flow only when this file is executed directly as a script.
if __name__ == "__main__":
    test_wait_for()