Hi! I think I found a bug (it has been happening since version 2.6.1): when a flow has consecutive .map calls and one task fails in the first map, all dependent tasks stay in the NotReady state. Example flow in the comments.
Marcos
10/16/2022, 12:51 AM
from prefect import flow, task, get_run_logger

@task
def get_list():
    return [1, 2, 3]

@task
def task1(i):
    # Fails for the first element only
    if i == 1:
        raise ValueError
    return i

@task
def task2(i):
    return i

@task
def final_task(i_list):
    logger = get_run_logger()
    logger.info(i_list)

@flow
def failing_flow():
    result = get_list.submit()
    result = task1.map(result)
    result = task2.map(result)
    result = task1.map(result)
    final_task.submit(result)
Marcos
10/16/2022, 12:53 AM
And this is the radar view and the task states
Anna Geller
10/16/2022, 1:04 PM
Afaik, this is intentional, to ensure that we don't proceed with the next tasks if the first one fails. Starting from 2.6 you can modify that behavior by wrapping the future in the allow_failure annotation.
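To make the idea concrete, here is a minimal pure-Python toy model of that behavior. It is not Prefect code and does not claim to mirror Prefect internals: Outcome, AllowFailure, run_task and run_chain are hypothetical names made up for this sketch. It only shows how a failed upstream can leave the downstream task in NotReady unless the input is explicitly marked as allowed to fail.

```python
from dataclasses import dataclass


@dataclass
class Outcome:
    """Toy stand-in for a task run's final state (hypothetical, not a Prefect class)."""
    state: str  # "Completed", "Failed", or "NotReady"
    value: object = None


@dataclass
class AllowFailure:
    """Toy marker loosely modelled on the idea of allow_failure (hypothetical)."""
    upstream: Outcome


def run_task(fn, arg):
    """Run fn on arg, refusing to start if an unwrapped upstream did not complete."""
    tolerate = isinstance(arg, AllowFailure)
    upstream = arg.upstream if tolerate else arg
    if isinstance(upstream, Outcome):
        if upstream.state != "Completed" and not tolerate:
            # Upstream failed and the input was not wrapped: never start.
            return Outcome("NotReady")
        # Assumption of this toy model: a tolerated failure passes the exception along.
        arg = upstream.value
    try:
        return Outcome("Completed", fn(arg))
    except Exception as exc:
        return Outcome("Failed", exc)


def task1(i):
    if i == 1:
        raise ValueError("boom")
    return i


def task2(i):
    return i


def run_chain(items, tolerate):
    """Run task1 then task2 per item; return the pair of states for each item."""
    states = []
    for i in items:
        first = run_task(task1, i)
        second = run_task(task2, AllowFailure(first) if tolerate else first)
        states.append((first.state, second.state))
    return states


if __name__ == "__main__":
    print(run_chain([1, 2, 3], tolerate=False))
    print(run_chain([1, 2, 3], tolerate=True))
```

In this sketch, only the item whose task1 call raised is affected: without the wrapper its task2 never starts, with the wrapper it runs and receives the exception as its input.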
Anna Geller
10/16/2022, 1:17 PM
Great MRE btw! You might be right: this works with a for loop, but I'm not sure whether it works with mapping:
from prefect import task, flow, allow_failure, get_run_logger

@task
def get_list():
    return [1, 2, 3]

@task
def task1(i):
    # Fails for the first element only
    if i == 1:
        raise ValueError
    return i

@task
def task2(i):
    return i

@task
def final_task(i_list):
    logger = get_run_logger()
    logger.info(i_list)

@flow
def failing_flow():
    numbers = get_list.submit()
    for i in numbers.result():
        flaky_result = task1.submit(i)
        # Wrap each future in allow_failure before passing it downstream
        flaky_result = task2.submit(allow_failure(flaky_result))
        final_task.submit(allow_failure(flaky_result))

if __name__ == "__main__":
    failing_flow()