David Salgado
04/12/2023, 6:51 AM
The `cleanup` task must happen after all the `my_task` tasks have completed. To make that happen, I'm passing the results of the mapped `my_task` tasks, even though `cleanup` doesn't do anything with them.
from prefect import flow, task

@flow
def my_flow():
    results = my_task.map([1,2,3])
    # Pass the mapped results only to force cleanup to run afterwards
    cleanup(results)

@task
def my_task(x):
    # ...do whatever
    return value

@task
def cleanup(dummy):
    # ...do something essential (without using `dummy`)
Sometimes, one of the `my_task` tasks fails, and the `cleanup` task stays stuck in NotReady state, according to the Prefect Cloud web UI.
Is this expected behaviour? If so, what's the recommended way to ensure that `cleanup` always runs after all the `my_task` tasks have finished, whether they succeeded or not?
Louis Vines
04/12/2023, 8:48 AM
Ryan Peden
04/14/2023, 11:59 PM
This is what `allow_failure` is for. In your example code, it would look like:
from prefect import allow_failure, flow, task

@flow
def my_flow():
    results = my_task.map([1,2,3])
    cleanup([allow_failure(r) for r in results])

# ...the rest of your code
And if the `cleanup` task just needs to wait for the task runs to finish but doesn't actually use their results, you could instead do:
cleanup(wait_for=[allow_failure(r) for r in results])
David Salgado
04/17/2023, 3:14 AM