# prefect-community
g
Hi All ... prefect newbie here. Sorry if the answer to my question is obvious and documented somewhere, but I couldn't find it. I am struggling to figure out how to access the state of a task (task A) within another task (task B). I want task B to always run whether task A succeeds or fails. I want task B to do stuff if task A fails, or skip if task A succeeds. Sort of like this:
```python
from prefect import task, Flow
from prefect.triggers import always_run

@task
def a():
    ...  # do something that might succeed or fail

@task(trigger=always_run)
def b(a_result):
    ...  # if task a's status == FAILED: do something
    ...  # if task a's status == SUCCESS: skip

with Flow("example") as flow:
    res1 = a()
    b(res1)

flow.run()
```
This is part of a larger flow with other tasks before and after a and b. I tried `trigger=any_failed` on task b, but that causes task b to fail if task a succeeds (because the trigger isn't satisfied), which is not what I want. I want task b to always show success. Again, sorry if this is obvious and it's just a newbie thing. Feel free to just point me to the right place in the docs. Thanks in advance!
z
Hi @goodsonr, welcome to the Prefect club! There are a couple of different ways to achieve what you're aiming for:
• If all you want is to take an action when Task A transitions to a failed state, a state handler would do the trick.
• If you want to treat Task B as its own unit of work, things become a bit more complex. You'd likely need to create a custom trigger: if the upstream succeeds or skips, raise a SKIP signal; if the upstream fails, go ahead.
g
More interested in option 2. Was hoping there was some way to detect the state of task A within task B and raise `signals.SKIP` within B, as opposed to developing a new/custom trigger. If not, oh well... Thanks for the response.
z
Unfortunately, I don't think there's a plug-and-play solution using just Prefect Core at the moment. If you want to use an orchestration layer, you could make a GraphQL call in Task B to achieve the same results. Otherwise a custom trigger will be your best bet.
j
@goodsonr I think @Zachary Hughes’s suggestion of either querying the API for the upstream state or using a trigger to handle logic is probably the “correct” way in Prefect. Tasks use triggers to decide how to react to upstream states, and then use their logic to work with the results of those upstream states. I think your use case — take an action if upstream fails and skip otherwise — is actually common enough that we may introduce built-in triggers to handle it. However, here’s what a custom trigger would look like, if it’s helpful:
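```python
import prefect
from prefect import task, Flow, Parameter

def run_on_failed_or_skip_trigger(upstream_states):
    # upstream_states maps each incoming Edge to that upstream task's State;
    # this example assumes b has exactly one upstream task (a)
    a_state = list(upstream_states.values())[0]
    if a_state.is_failed():
        # a failed: let b run
        return True
    else:
        # a succeeded (or skipped): mark b as Skipped instead of failing it
        raise prefect.engine.signals.SKIP()

@task
def a(outcome):
    if outcome == 'succeed':
        return True
    else:
        raise ValueError(outcome)

@task(trigger=run_on_failed_or_skip_trigger)
def b(a):
    print(a)  # when a fails, this is the result attached to a's Failed state
    print("Running B!")

with Flow("example") as flow:
    outcome = Parameter('outcome')
    a_result = a(outcome)
    b_result = b(a_result)

flow.run(outcome='fail')  # b runs; use outcome='succeed' to see b skipped
```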
The custom trigger pulls `a`’s state out of the upstream states dictionary and proceeds (returns True) if it has failed, or raises a SKIP signal otherwise (which I think is your desired outcome; modify if not!).
The reason I wanted to share this code, though, is that it illustrates something (advanced) about Prefect: even failed states have results that are passed to downstream tasks. By default, I think it’s the contents of the error, but it can be arbitrary if you use a Prefect signal. So if you examine the `print(a)` statement inside `b`, you’ll see that we can examine the failed “result” of `a`. This is another way you could build your logic, though I confess it didn’t occur to me until I was putting this together: B can have an `always_run` trigger, examine the returned value of A, and take action appropriately.
g
Wow... thanks very much for that. Most helpful for my understanding, which is always a good start! I'll see if I can make it work. Thanks again.
j
👌 I may have slightly over-complicated things versus your actual use case, but hopefully this gives you a jumping-off point for exploring!
g
@Jeremiah - thanks again. This works... almost. I'm finding that `list(upstream_states.values())` is not always coming back in the same order over multiple runs, so the most recent task is not always first in the list. Perhaps I'm still missing something... or are these values coming from a `dict`?
Edit (adding to my own comment): in my test script I am passing more than just `a` to `def b(a)`. I am also passing a couple of parameters, and I can see these are causing the ordering grief. I can work around that, so all is AOK for now.
j
Got it - yes, my example only shows this working with one upstream task. However, `upstream_states` is a dictionary of the form `{Edge: State}`, so if you look at each key edge’s `upstream_task` you can see which task you’re working with. For example, in my code, I could have asserted `list(upstream_states)[0].upstream_task is a_result`; in other words, the first key of the `upstream_states` dict is an edge whose `upstream_task` was `a_result`. You could also create logic based on task name, slug, or any other attribute.