Hi Prefect community, I have a question about hand...
# ask-community
n
Hi Prefect community, I have a question about handling failed tasks in a flow. Here is an example:
Copy code
from prefect import flow, task

import random
from time import sleep, time

import boto3

@task
def print_nums(nums):
    for n in nums:
        print(n)

@task
def square_num(num):
    ran_num = random.randint(1,10)
    if ran_num < 5: 
        raise Exception("less than 5")
    print(f"sleeping for **{ran_num}** seconds")
    sleep(ran_num)
    return num**2

@flow
def map_flow(nums):
    print_nums(nums)
    start1 = time()
    squared_nums = square_num.map(nums) 
    # squared_nums = [square_num.submit(i) for i in nums]
    start2 = time()
    print(start2 - start1, " seconds for map to start all tasks")
    print_nums(squared_nums)
    end = time()
    print(end - start2, " seconds for map to finish all tasks")

map_flow([1,2,3,5,8,13])
Without the exception, prefect handling this flow great and does the async handling of each of those PrefectFuture results for me. Is there any arg I could pass that would allow me to continue my flow with the results that are successfully received from the
square_num
task, while just logging a warning of the ones that failed? I know I can custom handle this situation by gettin the results from each PrefectFuture manually, but this seems like a case you would have a built in solution for
j
hey, I believe you're looking for
task.map(..., return_state=True)
example:
Copy code
from prefect import flow, task
from prefect.states import Completed, Failed

@task
def add_one(n: int):
    if n == 3:
        raise ValueError
    return n + 1

@flow
def my_flow():
    states = add_one.map([1, 2, 3], return_state=True)
    results = [state.result() for state in states if not state.is_failed()]

    if not results:
        return Failed(message="All of the tasks failed")

    for r in results:
        print(r)

    # manually return a state, otherwise flow fails
    # because at least 1 task failed
    return Completed(message="At least 1 task succeeded")


if __name__ == '__main__':
    my_flow()