
David Salgado

04/12/2023, 6:51 AM
I have a Prefect (version 2.10.3) flow structured like the following example, where the `cleanup` task must run after all the `my_task` tasks have completed. To make that happen, I'm passing the results of the mapped `my_task` tasks to `cleanup`, even though `cleanup` doesn't do anything with them:
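```python
from prefect import flow, task

@flow
def my_flow():
    results = my_task.map([1, 2, 3])
    # results are passed only so that cleanup runs after every my_task
    cleanup(results)

@task
def my_task(x):
    # ...do whatever
    value = x * 2  # placeholder for the real work
    return value

@task
def cleanup(dummy):
    # ...do something essential (without using `dummy`)
    pass
```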
Sometimes, one of the `my_task` tasks fails, and the `cleanup` task stays stuck in the `NotReady` state, according to the Prefect Cloud web UI. Is this expected behaviour? If so, what's the recommended way to ensure that `cleanup` always runs after all the `my_task` tasks have finished, whether they succeeded or not?

Louis Vines

04/12/2023, 8:48 AM
Hi David 👋 I think I have an answer for this one. Have you come across Prefect futures? https://docs.prefect.io/latest/api-ref/prefect/futures/#prefect.futures.PrefectFuture.wait `PrefectFuture.wait()` lets you wait until all the previous tasks have reached a terminal state, and then you can run your final task.

Ryan Peden

04/14/2023, 11:59 PM
David, that's expected behavior: by default, Prefect assumes that a task which accepts other tasks' output shouldn't run if any of those upstream tasks fail, so `cleanup` is left in `NotReady`. But sometimes, as in your case, you want to run the task anyway. Fortunately, that's what `allow_failure` is for. In your example code, it would look like:
```python
from prefect import allow_failure, flow, task

@flow
def my_flow():
    results = my_task.map([1, 2, 3])
    # allow_failure lets cleanup run even if some my_task runs failed
    cleanup([allow_failure(r) for r in results])

# ...the rest of your code
```
And if the `cleanup` task just needs to wait for the task runs to finish but doesn't actually use their results, you could drop its `dummy` parameter and instead do:

```python
cleanup(wait_for=[allow_failure(r) for r in results])
```

David Salgado

04/17/2023, 3:14 AM
Got it. Many thanks.