# prefect-community
s
For Resource Managers, is there some way for the `cleanup()` method to detect if the tasks in the context succeeded or failed? I see the comment "A Task is automatically added to call the cleanup method (closing the Dask cluster) after all tasks under the context have completed." in the documentation, but can't see a way to access that created `Task`. (I have a situation in which it would be convenient to execute only part of my `cleanup` task when something in that context fails.)
j
Not built in, but you could do this with a custom trigger. You could configure this in the `resource_manager` decorator using `cleanup_task_kwargs={"trigger": your_trigger}`.
We don't seem to have a good docs page on custom triggers, let me see if I can find a better resource.
Sorry, rereading your question. IIUC you want the cleanup task to run in all cases, but want to detect if a specific upstream task fails and do something different in the cleanup based on that?
s
Yes exactly!
I had found the part with the `cleanup_task_kwargs` but it seemed one or the other (I know my case is a bit odd).
Allowing it to run via the trigger is the first step to be sure, but within `cleanup` I'd like to know whether those context tasks all succeeded or one or more failed.
j
Yeah, triggers are mainly for determining if a task should run at all. With the current design of resource managers, I don't think we can satisfy your use case.
One sec, I might have a hack, need to pull up the code.
Ok, here's a hack. I don't advise it, but it'll work. You can add a custom trigger that stashes the upstream states for the cleanup task in the context, then the cleanup task can access those later on to make decisions. We don't explicitly support this, so it might break in the future, but I also don't expect this part of the codebase to change soon so you may be fine. Example:
```python
import prefect
from prefect import Flow, task, resource_manager
from prefect.tasks.core.resource_manager import resource_cleanup_trigger


def my_fancy_trigger(upstream_states):
    # Defer to the default cleanup trigger to decide whether cleanup runs.
    if resource_cleanup_trigger(upstream_states):
        # Stash the upstream states in the context so cleanup() can read them.
        prefect.context.update(upstream_states=upstream_states)
        return True
    return False


@resource_manager(
    cleanup_task_kwargs={"trigger": my_fancy_trigger}
)
class MyResource:
    def setup(self):
        return 1

    def cleanup(self, resource):
        # The states stashed by the trigger are available here.
        print(prefect.context.upstream_states)


@task
def inc(x):
    return x + 1


@task
def fail():
    raise ValueError


with Flow("example") as flow:
    with MyResource() as x:
        y = inc(x)
        inc(y)
        fail()

flow.run()
```
Note that the cleanup task doesn't have every task in the context set as an upstream state, only the ones that don't have immediate downstream tasks also in the context. The edges into the cleanup task are there to prevent it from running before all tasks in the context have completed, and we only add them where we need to.
You might be better served by creating the pattern you want explicitly, and passing in the tasks you care about as direct arguments. It won't give you the same api, but it might be cleaner than hacking around the design limitations here.
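A minimal sketch of what the explicit version might look like, written as a plain function so it runs on its own. The assumption (worth verifying against your Prefect version) is that you'd wrap it in a task with an always-run trigger such as `prefect.triggers.all_finished`, and that a failed upstream task shows up as an exception value in the inputs. The name `cleanup_resource` and the step names are hypothetical.

```python
def cleanup_resource(resource, outcomes):
    """Decide which cleanup steps to run from the upstream outcomes.

    `outcomes` maps a task name to its result. Assumption: a failed task
    is represented by an exception instance instead of a normal value.
    """
    failed = sorted(
        name for name, value in outcomes.items()
        if isinstance(value, BaseException)
    )

    # Always release the resource itself (hypothetical step name).
    steps = ["close_connection"]
    if not failed:
        # Only do the rest of the cleanup when everything succeeded.
        steps.append("delete_scratch_data")
    return steps, failed


steps, failed = cleanup_resource(
    "conn", {"inc": 3, "fail": ValueError("boom")}
)
print(steps, failed)
```

The tradeoff is that you list the tasks you care about yourself instead of getting them from the context manager, but nothing here depends on unsupported internals.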
s
Ok so first thing, that's genius 😉
But I hear you re: using design patterns that aren't intentional. I think I'll stick with a more explicit design as you suggest. Thank you for the effort and insight!
j
Glad to be helpful!