Nicholas Torba
09/26/2023, 3:34 PM
from prefect import flow, task
import random
from time import sleep, time
import boto3

@task
def print_nums(nums):
    for n in nums:
        print(n)

@task
def square_num(num):
    ran_num = random.randint(1,10)
    if ran_num < 5:
        raise Exception("less than 5")
    print(f"sleeping for **{ran_num}** seconds")
    sleep(ran_num)
    return num**2

@flow
def map_flow(nums):
    print_nums(nums)
    start1 = time()
    squared_nums = square_num.map(nums)
    # squared_nums = [square_num.submit(i) for i in nums]
    start2 = time()
    print(start2 - start1, " seconds for map to start all tasks")
    print_nums(squared_nums)
    end = time()
    print(end - start2, " seconds for map to finish all tasks")

map_flow([1,2,3,5,8,13])
Without the exception, Prefect handles this flow great and does the async handling of each of those PrefectFuture results for me.
Is there any arg I could pass that would allow me to continue my flow with the results that are successfully received from the square_num task, while just logging a warning for the ones that failed?
I know I can custom-handle this situation by getting the results from each PrefectFuture manually, but this seems like a case you would have a built-in solution for.
Jake Kaplan
09/26/2023, 5:56 PM
task.map(..., return_state=True)
example:
from prefect import flow, task
from prefect.states import Completed, Failed

@task
def add_one(n: int):
    if n == 3:
        raise ValueError
    return n + 1

@flow
def my_flow():
    states = add_one.map([1, 2, 3], return_state=True)
    results = [state.result() for state in states if not state.is_failed()]
    if not results:
        return Failed(message="All of the tasks failed")
    for r in results:
        print(r)
    # manually return a state, otherwise flow fails
    # because at least 1 task failed
    return Completed(message="At least 1 task succeeded")

if __name__ == '__main__':
    my_flow()
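For the "just log a warning for the ones that failed" part of the question, the filtering step above could be pulled into a small helper, roughly like the sketch below. The names `collect_successes` and `FakeState` are hypothetical and not part of Prefect. `FakeState` only mimics the state members the helper touches (`is_completed()`, `result()`, `message`) so the snippet runs on its own. The helper checks `is_completed()` rather than `not is_failed()`, on the assumption that results should be skipped for any state that did not finish cleanly.

```python
import logging


def collect_successes(states, logger):
    """Return results of completed states; log a warning for every other state."""
    results = []
    for index, state in enumerate(states):
        if state.is_completed():
            results.append(state.result())
        else:
            # On a real Prefect state, `message` describes why the run did not complete.
            logger.warning("mapped task %d did not complete: %s", index, state.message)
    return results


class FakeState:
    """Hypothetical stand-in exposing only the state members used above."""

    def __init__(self, value=None, message=None):
        self._value = value
        self.message = message

    def is_completed(self):
        # In this stand-in, a state with no failure message counts as completed.
        return self.message is None

    def result(self):
        return self._value


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    demo = [FakeState(value=1), FakeState(message="less than 5"), FakeState(value=9)]
    print(collect_successes(demo, logging.getLogger("demo")))  # prints [1, 9]
```

Inside the original flow this would presumably be called as `collect_successes(square_num.map(nums, return_state=True), get_run_logger())`, with `get_run_logger` imported from `prefect` so the warnings show up in the flow run logs. The flow still needs to return a state explicitly, as in the example above, or it will be marked failed.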