Is there a way for a running task to know whether ...
# ask-community
b
Is there a way for a running task to know whether its upstream tasks succeeded or failed? The idea is I want a task whose trigger is
all_finished
but that does different things depending on whether it would have been triggered by
all_successful
or
some_failed
.
n
Hi @Ben Fogelson - you can supply a custom trigger function which passes in all the upstream states for your task. Your function should take the following signature:
Copy code
custom_trigger(upstream_states: Set[State]) -> bool
b
Thanks @nicholas. I’m not sure I completely follow. I see how the trigger knows the upstream states, but I don’t see how the task knows
I.e. if I used a custom trigger, I’d have something like this:
Copy code
@task(trigger=custom_trigger)
def my_task():
    do_something_based_on_return_value_of_custom_trigger()
Maybe a bit more motivation would be helpful. I’m essentially trying to make a try/except/finally block distributed over tasks. I think I can do this with the
resource_manager
decorator, if the cleanup task can do different things depending on the state of the tasks in the resource manager block
n
Ah sorry, I misunderstood what you were asking.
Hm, there are a few ways you could do this; one simple way to handle this (and fairly atomic) would be to use 3 downstream tasks instead of a try/catch/finally, something like this:
Copy code
from prefect.triggers import all_successful, some_failed, all_finished

@task(trigger=some_failed)
def failed_cleanup():
  # do something

@task(trigger=all_successful)
def successful_cleanup():
  # do something

@task(trigger=all_finished)
def finish_cleanup():
  # do something

with Flow("my flow") as flow:
  task_1 = task1()
  
  failed_cleanup()
  successful_cleanup()
  finish_cleanup()
(i didn't include anything re: resource manager for brevity)
b
Thanks @nicholas! Totally could do it like that. I thought it would be nice though to have the automatic downstream-cleanup-task-setting of the resource manager. I think I can see how to modify the resource manager and resource context themselves to achieve this, but was hoping that wouldn’t be necessary
z
Are you looking for this?
Copy code
import prefect

FAIL = True


@prefect.task()
def fail_task():
    if FAIL:
        raise ValueError("Hello")
    
    # otherwise
    return "Hello"


@prefect.task()
def succeed_task():
    return "world"
    
@prefect.task(log_stdout=True, trigger=prefect.triggers.all_finished)
def message(a, b):

    if isinstance(a, ValueError):
        print(f"Oh no! Upstream `a` failed with {a}")

    else:
        print(f"{a} {b}")


with prefect.Flow("test") as flow:
    a, b = fail_task(), succeed_task()
    message(a, b)


flow.run()
b
Not quite @Zanie. I’d like to be able to have arbitrary tasks upstream of the fail and succeed tasks, without having to pass them all in as explicit arguments to those tasks. I actually took a stab at modifying
resource_manager.py
to allow my desired behavior, and have a draft PR up. I was hoping someone from the prefect team could take a quick look and tell me if this would be a welcome feature before I invest the time to write tests, update docstrings, etc. CC @Jim Crist-Harif since you wrote the resource manager. https://github.com/PrefectHQ/prefect/pull/4305