David Salgado
04/12/2023, 6:51 AM
cleanup task must happen after all the my_task tasks have completed. To make that happen, I'm passing the results of the mapped my_task tasks, even though cleanup doesn't do anything with them.
@flow
def my_flow():
    results = my_task.map([1, 2, 3])
    cleanup(results)

@task
def my_task(x):
    # ...do whatever
    return value

@task
def cleanup(dummy):
    # ...do something essential (without using `dummy`)
    pass
Sometimes, one of the my_task tasks fails, and the cleanup task stays stuck in NotReady state, according to the Prefect Cloud web UI.
Is this expected behaviour? If so, what's the recommended way to ensure that cleanup always runs after all the my_task tasks have finished, whether they succeeded or not?
Louis Vines
04/12/2023, 8:48 AM
Ryan Peden
04/14/2023, 11:59 PM
allow_failure is for. In your example code, it would look like:
from prefect import allow_failure, flow, task

@flow
def my_flow():
    results = my_task.map([1, 2, 3])
    cleanup([allow_failure(r) for r in results])

# ...the rest of your code
If the cleanup task just needs to wait for the task runs to finish but doesn't actually use their results, you could instead do:
cleanup(wait_for=[allow_failure(r) for r in results])
David Salgado
04/17/2023, 3:14 AM