Marwan Sarieddine
03/18/2021, 2:53 PM
not_all_skipped trigger. The naming seems a bit unintuitive to me, or I might be missing something; please see the example in the thread (any help would be appreciated).
Marwan Sarieddine
03/18/2021, 2:54 PM
import prefect
from prefect import case, task, Flow, Parameter
from prefect.triggers import not_all_skipped


@task
def conditional_task(param):
    logger = prefect.context.logger
    logger.info(param)


@task(trigger=not_all_skipped)
def unconditional_task(param):
    logger = prefect.context.logger
    logger.info(param)


with Flow("test-flow") as flow:
    param1 = Parameter("param1", False)
    param2 = Parameter("param2", True)
    param3 = Parameter("param3", True)

    # Each branch runs only when its parameter equals True.
    with case(param1, True):
        task1 = conditional_task(param1)

    with case(param2, True):
        task2 = conditional_task(param2)

    task3 = unconditional_task(param3)
    task3.set_upstream([task1, task2])
I expect task3 to run given the not_all_skipped trigger, but instead it is being skipped.
Marwan Sarieddine
03/18/2021, 2:55 PM
In [2]: flow_state = flow.run()
[2021-03-18 10:50:44-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'test-flow'
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param1': Starting task run...
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param1': Finished task run for task with final state: 'Success'
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param3': Starting task run...
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param3': Finished task run for task with final state: 'Success'
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param2': Starting task run...
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param2': Finished task run for task with final state: 'Success'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "False" did not match "True"')
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Skipped'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Success'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'conditional_task': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.conditional_task | True
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'conditional_task': Finished task run for task with final state: 'Success'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'conditional_task': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'conditional_task': Finished task run for task with final state: 'Skipped'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Skipped'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'unconditional_task': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'unconditional_task': Finished task run for task with final state: 'Skipped'
[2021-03-18 10:50:45-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
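In the log above, both 'List' and 'unconditional_task' finish as 'Skipped' without any "SKIP signal raised" line from a trigger. One way to read this is that skip propagation from upstream tasks is checked before the trigger is consulted, so the trigger never gets a say. The plain-Python sketch below models that ordering. It is a simplified illustration and not Prefect's actual implementation: final_state and the two trigger functions are hypothetical stand-ins defined in the snippet, and the task body is assumed to succeed whenever it runs.

```python
# Simplified model of the check order; NOT Prefect's actual code.
SUCCESS, SKIPPED, TRIGGER_FAILED = "Success", "Skipped", "TriggerFailed"


def all_successful(upstream):
    # Model of the default trigger: a skipped upstream counts as successful.
    ok = all(s in (SUCCESS, SKIPPED) for s in upstream)
    return "run" if ok else "fail"


def not_all_skipped(upstream):
    # Model of not_all_skipped: skip only when every upstream was skipped.
    if all(s == SKIPPED for s in upstream):
        return "skip"
    return all_successful(upstream)


def final_state(upstream, trigger=all_successful, skip_on_upstream_skip=True):
    # Check 1: skip propagation, applied before the trigger is consulted.
    if skip_on_upstream_skip and SKIPPED in upstream:
        return SKIPPED
    # Check 2: the trigger decides whether the task runs.
    decision = trigger(upstream)
    if decision == "skip":
        return SKIPPED
    if decision == "fail":
        return TRIGGER_FAILED
    return SUCCESS  # assumption: the task body itself succeeds


task1 = SKIPPED  # case(param1, True) did not match, so this branch is skipped
task2 = SUCCESS  # case(param2, True) matched

# set_upstream([task1, task2]) introduces the intermediate 'List' task.
list_task = final_state([task1, task2])

# task3 depends on the 'List' task and on param3 (modelled as Success).
task3 = final_state([list_task, SUCCESS], trigger=not_all_skipped)

print(list_task, task3)  # prints: Skipped Skipped
```

Under this model the trigger would only matter once skip propagation is turned off for the task in question, for example by calling final_state with skip_on_upstream_skip=False.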
Marwan Sarieddine
03/18/2021, 2:55 PM
Carlos Gutierrez
03/18/2021, 3:17 PM
Carlos Gutierrez
03/18/2021, 3:19 PM
Marwan Sarieddine
03/18/2021, 3:21 PM
unconditional_task to run only if either:
• task 1 runs - i.e. doesn’t get skipped (and completes successfully)
• or task 2 runs - i.e. doesn’t get skipped (and completes successfully)
• or both task 1 and task 2 run and don’t get skipped
Marwan Sarieddine
03/18/2021, 3:22 PM
Marwan Sarieddine
03/18/2021, 3:24 PM
Carlos Gutierrez
03/18/2021, 3:24 PM
Carlos Gutierrez
03/18/2021, 3:28 PM
Marwan Sarieddine
03/18/2021, 3:28 PM
OR operation on param1 and param2 and set that as the upstream and forgo the use of triggers, but it would be nice to get a clarification of the intended use of the not_all_skipped trigger.
Carlos Gutierrez
03/18/2021, 3:31 PM
The not_all_skipped trigger is intended for cases where you want to distinguish successful tasks from skipped tasks, that is, where you want to make sure that all upstream tasks finish successfully without being skipped.
Carlos Gutierrez
03/18/2021, 3:34 PM
Marwan Sarieddine
03/18/2021, 3:37 PM
Marwan Sarieddine
03/18/2021, 3:38 PM
Carlos Gutierrez
03/18/2021, 3:39 PM
Marwan Sarieddine
03/18/2021, 3:44 PM
Carlos Gutierrez
03/18/2021, 3:46 PM
Marwan Sarieddine
03/18/2021, 3:50 PM
Carlos Gutierrez
03/18/2021, 3:57 PM
Carlos Gutierrez
03/18/2021, 4:02 PM
Mariia Kerimova
03/18/2021, 4:20 PM
@task(skip_on_upstream_skip=False), one second, I'll provide you an example
Mariia Kerimova
03/18/2021, 4:21 PM
import prefect
from prefect import case, task, Flow, Parameter
from prefect.triggers import not_all_skipped


@task()
def conditional_task(param):
    logger = prefect.context.logger
    logger.info(param)


# skip_on_upstream_skip=False: do not skip this task just because an upstream task was skipped.
@task(skip_on_upstream_skip=False)
def unconditional_task(param):
    logger = prefect.context.logger
    logger.info(param)


with Flow("skip-onupstreamfalse-flow") as flow:
    param1 = Parameter("param1", False)
    param2 = Parameter("param2", True)
    param3 = Parameter("param3", True)

    with case(param1, True):
        task1 = conditional_task(param1)

    with case(param2, True):
        task2 = conditional_task(param2)

    task3 = unconditional_task(param3)
    task3.set_upstream([task1, task2])
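A point that may be worth checking with the example above: with skip_on_upstream_skip=False and the default trigger, task3 could run even when both conditional branches are skipped, which is looser than the "at least one branch ran" condition described earlier in the thread. The sketch below enumerates the four parameter combinations in plain Python. It assumes that a skipped state is treated as a kind of successful state and that the intermediate 'List' task keeps the default skip propagation. The classes and helper functions are hypothetical stand-ins, not Prefect objects.

```python
from itertools import product


class Success:
    """Stand-in for a successful state."""


class Skipped(Success):
    """Stand-in for a skipped state, modelled as a kind of Success."""


def branch_state(param):
    # A branch under case(param, True) runs only when param is True.
    return Success() if param is True else Skipped()


def list_state(branches):
    # The intermediate 'List' task keeps the default skip propagation.
    if any(isinstance(b, Skipped) for b in branches):
        return Skipped()
    return Success()


def task3_runs(upstream):
    # With skip_on_upstream_skip=False there is no skip propagation, so only
    # the default "all upstream successful" trigger decides.
    return all(isinstance(s, Success) for s in upstream)


results = {}
for p1, p2 in product([False, True], repeat=2):
    collected = list_state([branch_state(p1), branch_state(p2)])
    # The extra Success() models the param3 parameter task.
    results[(p1, p2)] = task3_runs([collected, Success()])

for params, runs in results.items():
    print(params, runs)
```

Under these assumptions every combination reports that task3 runs, including (False, False). If the task should be skipped when both branches are skipped, some additional condition would be needed on top of this setting.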
Marwan Sarieddine
03/18/2021, 5:35 PM