goodsonr
06/10/2020, 8:22 PM
@task
def a():
    # do something that might succeed or fail
    ...

@task(trigger=always_run)
def b(res1):
    # if task a status == FAIL: do something
    # if task a status == SUCCESS: skip
    ...

with Flow("example") as flow:
    res1 = a()
    b(res1)

flow.run()
This is part of a larger flow with other tasks before and after a and b. I tried trigger=any_failed on task b, but that causes task b to fail if task a succeeds (because the trigger is not satisfied), which is not what I want. I want task b to always show success. Again, sorry if this is obvious and it's just a newbie thing. Feel free to just point me to the right place in the docs. Thanks in advance.
Zachary Hughes
06/10/2020, 8:41 PM
goodsonr
06/10/2020, 8:50 PM
Zachary Hughes
06/10/2020, 9:16 PM
Jeremiah
06/10/2020, 11:21 PM
import prefect
from prefect import task, Flow, Parameter

def run_on_failed_or_skip_trigger(upstream_states):
    # upstream_states maps each incoming Edge to the State of its upstream task
    a_state = list(upstream_states.values())[0]
    if a_state.is_failed():
        return True
    else:
        # skip B (instead of failing it) when A did not fail
        raise prefect.engine.signals.SKIP()

@task
def a(outcome):
    if outcome == 'succeed':
        return True
    else:
        raise ValueError(outcome)

@task(trigger=run_on_failed_or_skip_trigger)
def b(a):
    print(a)
    print("Running B!")

with Flow("example") as flow:
    outcome = Parameter('outcome')
    a_result = a(outcome)
    b_result = b(a_result)
Failed tasks still have results that are passed to downstream tasks. By default, I think it's the contents of the error, but it can be arbitrary if you use a Prefect signal. So if you look at the print(a) statement inside b, you'll see that we can examine the failed "result" of a. This is another way that you could build your logic, though I confess it didn't occur to me until I was putting this together: B can have an always_run trigger, examine the returned value of A, and take action appropriately.
goodsonr
06/11/2020, 12:22 AM
Jeremiah
06/11/2020, 12:54 AM
goodsonr
06/11/2020, 3:41 AM
list(upstream_states.values()) is not always coming back in the same order over multiple runs, so the most recent task is not always first in the list. Perhaps I'm still missing something, or are these values coming from a dict?
Edit, adding to my own comment: in my test script I am passing more than just a to def b(a). I am also passing a couple of parameters, and I can see these are causing the ordering grief. I can work around that, so all is OK for now.
Jeremiah
06/11/2020, 11:20 AM
upstream_states is a dictionary of the form {Edge: State}, so if you look at each key's upstream_task (each key is an Edge) you can see which task you're working with. For example, in my code, I could have asserted list(upstream_states)[0].upstream_task is a_result; in other words, the first key of the upstream_states dict is an edge whose upstream_task was a_result. You could also create logic based on task name, slug, or any other attribute.
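To make the last point concrete, here is a minimal sketch of trigger logic that picks the upstream state by the edge's upstream_task name instead of by position. So that it runs without Prefect installed, FakeTask, FakeEdge, FakeState and FakeSkip are hypothetical stand-ins for Prefect's Task, Edge, State and signals.SKIP; only the upstream_task, name and is_failed() attributes discussed above are mimicked. The helper names state_of and run_if_a_failed are also made up for illustration.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for Prefect objects (assumptions, not Prefect's classes).
@dataclass(frozen=True)
class FakeTask:
    name: str


@dataclass(frozen=True)
class FakeEdge:
    upstream_task: FakeTask
    key: str  # name of the downstream argument this edge feeds


@dataclass
class FakeState:
    failed: bool

    def is_failed(self) -> bool:
        return self.failed


class FakeSkip(Exception):
    """Stand-in for prefect.engine.signals.SKIP."""


def state_of(upstream_states, task_name):
    """Return the state whose edge comes from the task with this name."""
    for edge, state in upstream_states.items():
        if edge.upstream_task.name == task_name:
            return state
    raise KeyError(task_name)


def run_if_a_failed(upstream_states):
    """Trigger logic: run only when task 'a' failed, otherwise skip."""
    if state_of(upstream_states, "a").is_failed():
        return True
    raise FakeSkip("task a did not fail")


# The parameter edge comes first here, so position 0 is NOT task a.
states = {
    FakeEdge(FakeTask("outcome"), "outcome"): FakeState(failed=False),
    FakeEdge(FakeTask("a"), "a"): FakeState(failed=True),
}
should_run = run_if_a_failed(states)
```

With a real flow, the same lookup would go inside the custom trigger in place of list(upstream_states.values())[0], so extra parameters passed to b no longer affect which state is inspected.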