Marcos
10/16/2022, 12:51 AM
.map calls, if one task fails in the first map, then all dependent tasks stay in NotReady state. Example flow in the comments.
Marcos
10/16/2022, 12:51 AM
from prefect import task, flow, get_run_logger


@task
def get_list():
    return [1, 2, 3]


@task
def task1(i):
    # Fails for the first element to reproduce the problem
    if i == 1:
        raise ValueError
    return i


@task
def task2(i):
    return i


@task
def final_task(i_list):
    logger = get_run_logger()
    logger.info(i_list)


@flow
def failing_flow():
    result = get_list.submit()
    result = task1.map(result)
    # Everything mapped after the failing task1 run ends up NotReady
    result = task2.map(result)
    result = task1.map(result)
    final_task.submit(result)
Marcos
10/16/2022, 12:53 AM
Anna Geller
allow_failure annotation
Anna Geller
from prefect import task, flow, allow_failure, get_run_logger


@task
def get_list():
    return [1, 2, 3]


@task
def task1(i):
    if i == 1:
        raise ValueError
    return i


@task
def task2(i):
    return i


@task
def final_task(i_list):
    logger = get_run_logger()
    logger.info(i_list)


@flow
def failing_flow():
    numbers = get_list.submit()
    for i in numbers.result():
        flaky_result = task1.submit(i)
        # allow_failure lets the downstream task run even if the upstream one failed
        flaky_result = task2.submit(allow_failure(flaky_result))
        final_task.submit(allow_failure(flaky_result))


if __name__ == "__main__":
    failing_flow()
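To make the difference between the two flows concrete, here is a minimal plain-Python sketch of the behaviour described in this thread. It is a toy model, not Prefect's implementation: `Outcome`, `AllowFailure`, `submit` and `map_strict` are hypothetical names made up for illustration, and the rule "one failed upstream blocks every mapped child" simply mirrors what the question reports rather than anything verified against Prefect internals.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Union


@dataclass
class Outcome:
    """Hypothetical stand-in for a finished task run."""
    state: str          # "Completed", "Failed" or "NotReady"
    value: Any = None   # return value, or the exception when Failed


class AllowFailure:
    """Hypothetical marker: 'run downstream even if this upstream failed'."""
    def __init__(self, outcome: Outcome):
        self.outcome = outcome


def submit(fn: Callable[[Any], Any], upstream: Union[Outcome, AllowFailure]) -> Outcome:
    tolerate = isinstance(upstream, AllowFailure)
    outcome = upstream.outcome if tolerate else upstream
    # Without the marker, an upstream that did not complete blocks this run
    if outcome.state != "Completed" and not tolerate:
        return Outcome("NotReady")
    try:
        return Outcome("Completed", fn(outcome.value))
    except Exception as exc:
        return Outcome("Failed", exc)


def map_strict(fn: Callable[[Any], Any], upstreams: List[Outcome]) -> List[Outcome]:
    # Assumption taken from the question: one failed upstream blocks all children
    if any(u.state != "Completed" for u in upstreams):
        return [Outcome("NotReady") for _ in upstreams]
    return [submit(fn, u) for u in upstreams]


def task1(i):
    if i == 1:
        raise ValueError("boom")
    return i


def task2(i):
    return i


first = [submit(task1, Outcome("Completed", i)) for i in [1, 2, 3]]
strict = map_strict(task2, first)                            # chained .map style
tolerant = [submit(task2, AllowFailure(o)) for o in first]   # loop + marker style

print([o.state for o in first])
print([o.state for o in strict])
print([o.state for o in tolerant])
```

In this model the tolerant run for the failed element receives the exception object as its input, so downstream code has to be prepared to handle a non-integer value.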
Marcos
10/16/2022, 1:31 PM