# ask-community
m
Hi folks, I am trying to understand how to make use of the `not_all_skipped` trigger. The naming seems a bit unintuitive to me, or I might be missing something; please see the example in the thread (any help would be appreciated).
This is my example:
```python
import prefect
from prefect import case, task, Flow, Parameter
from prefect.triggers import not_all_skipped


@task
def conditional_task(param):
    logger = prefect.context.logger
    logger.info(param)


@task(trigger=not_all_skipped)
def unconditional_task(param):
    logger = prefect.context.logger
    logger.info(param)


with Flow("test-flow") as flow:
    param1 = Parameter("param1", False)
    param2 = Parameter("param2", True)
    param3 = Parameter("param3", True)

    # task1 only runs when param1 is True (it is False here, so task1 is skipped)
    with case(param1, True):
        task1 = conditional_task(param1)

    # task2 only runs when param2 is True
    with case(param2, True):
        task2 = conditional_task(param2)

    task3 = unconditional_task(param3)
    # passing a list wraps task1 and task2 in an auto-generated "List" task (see the logs)
    task3.set_upstream([task1, task2])
```
I expect `task3` to run given the `not_all_skipped` trigger, but instead it is being skipped.
Here are the flow run logs:
```
In [2]: flow_state = flow.run()
[2021-03-18 10:50:44-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'test-flow'
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param1': Starting task run...
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param1': Finished task run for task with final state: 'Success'
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param3': Starting task run...
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param3': Finished task run for task with final state: 'Success'
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param2': Starting task run...
[2021-03-18 10:50:44-0400] INFO - prefect.TaskRunner | Task 'param2': Finished task run for task with final state: 'Success'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "False" did not match "True"')
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Skipped'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Success'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'conditional_task': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.conditional_task | True
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'conditional_task': Finished task run for task with final state: 'Success'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'conditional_task': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'conditional_task': Finished task run for task with final state: 'Skipped'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'List': Finished task run for task with final state: 'Skipped'
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'unconditional_task': Starting task run...
[2021-03-18 10:50:45-0400] INFO - prefect.TaskRunner | Task 'unconditional_task': Finished task run for task with final state: 'Skipped'
[2021-03-18 10:50:45-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
Here is a visualization of the run: [flow run visualization]
c
As far as I understand, the `not_all_skipped` trigger will fire if none of the upstream tasks are skipped. Therefore, if one of the conditional tasks is skipped, the _unconditional_task_ will be skipped as well.
So if I understand you correctly, you want the _unconditional_task_ to run no matter what, right? But you want it to run after the other two tasks finish their work (whether they succeed or fail)?
m
Sorry, my naming is also perhaps not so clear. I want `unconditional_task` to run only if either:
• task 1 runs, i.e. doesn't get skipped (and completes successfully)
• or task 2 runs, i.e. doesn't get skipped (and completes successfully)
• or both task 1 and task 2 run and don't get skipped
If both get skipped, I don't want it to run.
I just need it to run if at least one of the upstream tasks is successful and not skipped…
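The rule described above (run when at least one upstream task actually ran and succeeded, skip when both were skipped) can be pinned down as a plain predicate. This is a minimal sketch: `should_run` is a hypothetical helper over string outcomes, not a Prefect trigger, and it exists only to make the desired logic explicit.

```python
from typing import Iterable


def should_run(upstream_outcomes: Iterable[str]) -> bool:
    """Hypothetical rule: True if at least one upstream outcome is "Success".

    Here "Skipped" is treated as distinct from "Success", which is exactly
    the distinction a plain "did it succeed?" check does not make.
    """
    return any(outcome == "Success" for outcome in upstream_outcomes)


print(should_run(["Skipped", "Success"]))  # True: task 2 ran
print(should_run(["Success", "Success"]))  # True: both ran
print(should_run(["Skipped", "Skipped"]))  # False: both were skipped
```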
c
OK, I get it now.
The bad thing here is that skipped tasks are also considered successes 😞
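The point about skipped tasks counting as successes can be illustrated with a tiny plain-Python model. It assumes, as we understand Prefect 1.x's state hierarchy, that `Skipped` is a subclass of `Success` and that `is_successful()` is an `isinstance` check; the classes below are stand-ins written for illustration, not imports from Prefect.

```python
class State:
    def is_successful(self) -> bool:
        # assumption: modeled on Prefect 1.x, where this is an isinstance check
        return isinstance(self, Success)

    def is_skipped(self) -> bool:
        return isinstance(self, Skipped)


class Success(State):
    pass


class Skipped(Success):  # a skipped task is modeled as a special kind of success
    pass


upstream = [Skipped(), Skipped()]

# Every upstream "succeeded" even though none of them actually ran,
# so a success-based check alone cannot tell skipped tasks apart.
print(all(s.is_successful() for s in upstream))  # True
print(any(s.is_skipped() for s in upstream))     # True
```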
m
My workaround is to perform a boolean `OR` operation on `param1` and `param2`, set that as the upstream, and forgo the use of triggers. But it would be nice to get a clarification of the intended use of `not_all_skipped`.
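The OR workaround can be modeled in plain Python: instead of relying on a trigger, `task3` is gated on `param1 or param2`, the same way each conditional task is gated by its own `case`. In Prefect this would presumably be a small task computing the OR whose result feeds a `case(..., True)` block; the sketch below does not use Prefect at all, and `case_matches` and `run_flow` are hypothetical names.

```python
from typing import List


def case_matches(value: bool, expected: bool) -> bool:
    """Model of a `case` block: downstream work runs only if value == expected."""
    return value == expected


def run_flow(param1: bool, param2: bool) -> List[str]:
    """Return the names of the tasks that would run, in order."""
    ran = []
    if case_matches(param1, True):
        ran.append("task1")
    if case_matches(param2, True):
        ran.append("task2")
    # Workaround: gate task3 on the OR of both conditions instead of a trigger
    if case_matches(param1 or param2, True):
        ran.append("task3")
    return ran


print(run_flow(False, True))   # ['task2', 'task3']
print(run_flow(False, False))  # []
```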
c
I believe the `not_all_skipped` trigger is intended for cases in which you want to distinguish successful tasks from skipped tasks, so that you can make sure all upstream tasks finish successfully without being skipped.
I wish I knew the answer straight away, but I'm afraid I'm new to Prefect and I'm learning on the fly as well 😅
m
I see, I guess you are probably right about this interpretation (it should be easy to verify; I'll follow up when I do).
Thanks for chiming in!
c
No problem! I also learn a lot from these problems as I have very similar situations with my code!
m
Looks like you are right: when none are skipped, `not_all_skipped` makes the downstream task run…
c
I was looking at the trigger docs to see if any trigger matches your situation, but I'm afraid I can't find one, because skipped tasks are considered successful tasks.
m
yep …
c
Now that I think of it, I realise I have pretty much the same case in my flow lol
What happens if you remove the trigger from the unconditional task? I'm trying to understand the default behavior of upstream dependencies.
m
Hello Marwan! I believe you can use `@task(skip_on_upstream_skip=False)`. One second, I'll provide you an example.
```python
import prefect
from prefect import case, task, Flow, Parameter
from prefect.triggers import not_all_skipped

@task()
def conditional_task(param):
    logger = prefect.context.logger
    logger.info(param)

# skip_on_upstream_skip=False: do not automatically skip this task
# just because one of its upstream tasks was skipped
@task(skip_on_upstream_skip=False)
def unconditional_task(param):
    logger = prefect.context.logger
    logger.info(param)

with Flow("skip-onupstreamfalse-flow") as flow:
    param1 = Parameter("param1", False)
    param2 = Parameter("param2", True)
    param3 = Parameter("param3", True)
    with case(param1, True):
        task1 = conditional_task(param1)
    with case(param2, True):
        task2 = conditional_task(param2)
    task3 = unconditional_task(param3)
    task3.set_upstream([task1, task2])
```
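Why does `skip_on_upstream_skip=False` matter even though `task3` already has a trigger? Below is a rough plain-Python model, assuming Prefect 1.x behaves as we understand it: the task runner checks `skip_on_upstream_skip` (default `True`) before evaluating the trigger, so a single skipped upstream ends the run before `not_all_skipped` is ever consulted. The state classes, the two trigger functions, and `decide` are hypothetical stand-ins, not Prefect's API.

```python
from typing import Callable, List


class State:
    pass


class Success(State):
    pass


class Skipped(Success):  # assumption: mirrors Prefect 1.x, where Skipped subclasses Success
    pass


def all_successful(upstream: List[State]) -> str:
    """Model of the default trigger: run if every upstream counts as a success."""
    return "Run" if all(isinstance(s, Success) for s in upstream) else "TriggerFailed"


def not_all_skipped(upstream: List[State]) -> str:
    """Model of not_all_skipped: all upstream successful, but not all skipped."""
    if not all(isinstance(s, Success) for s in upstream):
        return "TriggerFailed"
    if all(isinstance(s, Skipped) for s in upstream):
        return "Skipped"
    return "Run"


def decide(
    upstream: List[State],
    trigger: Callable[[List[State]], str] = all_successful,
    skip_on_upstream_skip: bool = True,
) -> str:
    """Hypothetical model of the task runner's order of checks."""
    # 1. The upstream-skip check runs first and short-circuits the trigger.
    if skip_on_upstream_skip and any(isinstance(s, Skipped) for s in upstream):
        return "Skipped"
    # 2. Only then is the trigger evaluated.
    return trigger(upstream)


one_skipped = [Skipped(), Success()]
both_skipped = [Skipped(), Skipped()]

print(decide(one_skipped, not_all_skipped))                               # Skipped
print(decide(one_skipped, not_all_skipped, skip_on_upstream_skip=False))  # Run
print(decide(both_skipped, not_all_skipped, skip_on_upstream_skip=False)) # Skipped
print(decide(both_skipped, skip_on_upstream_skip=False))                  # Run
```

In this model, combining `skip_on_upstream_skip=False` with `trigger=not_all_skipped` matches the rule Marwan described, while `skip_on_upstream_skip=False` with the default trigger still runs when both upstream tasks are skipped. The model only looks at the states it is given: in the real flow, `task3` also depends on `param3` (never skipped) and on the auto-generated `List` task, so the states it actually sees differ and are worth checking against an actual run.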
m
Hi @Mariia Kerimova, thanks for the suggestion. Yes, this could work in my use case 🙂