https://prefect.io logo
Title
m

Mansour Zayer

07/28/2022, 3:28 PM
Hello. In Prefect 1, is there an easy way to make a task ONLY dependent on the exactly 1 upstream task, not all upstream ones? What I mean is
A -> B (trigger: A finished) -> C (trigger: B successful)
I want
C
to ignore what happens to
A
, but only run if
B
is successful.
B
will run after
A
is finished (no matter fail or success) I looked into state and state_handlers, seemed too complicated for such a (seemingly) simple task. edit: In other words, I don't want the trigger function to aggregate ALL the upstream tasks. Just consider the task that I've explicitly set as upstream Thanks
1
n

Nate

07/28/2022, 3:49 PM
hey @Mansour Zayer, if I'm understanding your example, you could use triggers like this:
from prefect import task, Flow
from prefect.triggers import all_successful, all_finished

@task
def task_A():
    raise AssertionError

@task(trigger=all_finished)
def task_B():
    return 'something'

@task(trigger=all_successful)
def task_C():
    return 'something'

with Flow('My Flow') as flow:
   result_A = task_A() 
   result_B = task_B(upstream_tasks=[result_A]) 
   result_C = task_C(upstream_tasks=[result_B]) 

if __name__ == "__main__":
   flow.run()
m

Mansour Zayer

07/28/2022, 3:55 PM
This doesn't work as you expect. Task C will treat both Task A and B as upstream when your trigger is all_successful. In this case, if A fails but B succeeds, C will not trigger because all_successful will return False (basically, trigger functions aggregate the state of ALL upsteam tasks, not just the one explicitly set as upstream)
n

Nate

07/28/2022, 4:16 PM
if you run the code, raise a fail in A, you'll see that B succeeds and then C does trigger - maybe I'm not understanding what you mean
❯ python triggers.py
[2022-07-28 10:48:34-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'My Flow'
[2022-07-28 10:48:34-0500] INFO - prefect.TaskRunner | Task 'task_A': Starting task run...
[2022-07-28 10:48:34-0500] ERROR - prefect.TaskRunner | Task 'task_A': Exception encountered during task execution!
Traceback (most recent call last):
  File "triggers.py", line 6, in task_A
    raise AssertionError
AssertionError
[2022-07-28 10:48:34-0500] INFO - prefect.TaskRunner | Task 'task_A': Finished task run for task with final state: 'Failed'
[2022-07-28 10:48:34-0500] INFO - prefect.TaskRunner | Task 'task_B': Starting task run...
[2022-07-28 10:48:34-0500] INFO - prefect.TaskRunner | Task 'task_B': Finished task run for task with final state: 'Success'
[2022-07-28 10:48:34-0500] INFO - prefect.TaskRunner | Task 'task_C': Starting task run...
[2022-07-28 10:48:34-0500] INFO - prefect.TaskRunner | Task 'task_C': Finished task run for task with final state: 'Success'
[2022-07-28 10:48:34-0500] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
and from your original description
B
will run after
A
is finished (no matter fail or success)
means that A is necessarily an upstream task of B, even if B doesn't require a successful state from A to run
👀 1