Kyle McChesney

06/21/2022, 10:42 PM
Is there a trigger like “at least one successful, and the rest skipped”? I have some logic like this:
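from prefect import task, Flow, case


@task
def required():
    pass


@task
def optional():
    pass


@task
def final(res):
    pass


with Flow('example') as flow:
    res = required()
    # some_case is defined elsewhere (e.g. a Parameter or an upstream task result)
    with case(some_case, True):
        optional_res = optional()

    final(res, upstream_tasks=[optional_res])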
If optional is skipped, it's okay to run final. If required fails, or optional fails, I don't want to run final.

Kevin Kho

06/21/2022, 10:43 PM
I don’t normally recommend this, but I have faith that you can write your own trigger lol
The code is not terribly hard.
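A minimal sketch of such a trigger, assuming Prefect 1.x, where a trigger is a callable that receives a mapping of upstream edges to states and returns a bool. The name one_success_rest_skipped is hypothetical, and the states are only assumed to expose is_successful() and is_skipped(), as Prefect 1.x State objects do.

```python
from typing import Any, Dict


def one_success_rest_skipped(upstream_states: Dict[Any, Any]) -> bool:
    """Pass only if no upstream failed and at least one upstream really ran."""
    states = list(upstream_states.values())
    # In Prefect 1.x a Skipped state also reports is_successful() == True,
    # so skipped states are separated out explicitly.
    ran_ok = [s for s in states if s.is_successful() and not s.is_skipped()]
    skipped = [s for s in states if s.is_skipped()]
    # Every upstream must be either a real success or a skip (nothing failed),
    # and at least one must be a real success.
    return len(ran_ok) >= 1 and len(ran_ok) + len(skipped) == len(states)
```

It would presumably be attached with @task(trigger=one_success_rest_skipped, skip_on_upstream_skip=False); without the second argument, the skip check is expected to short-circuit before the trigger is consulted.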

Kyle McChesney

06/21/2022, 10:44 PM
I guess the other option is to provide some fake implementation for optional to use in another with case(..., False) block, and then merge.

Kevin Kho

06/21/2022, 10:44 PM
Yes, merge can get you out of that, I think.
Now that I’ve digested this, I think what you want is:
@task(skip_on_upstream_skip=False)
def final(res):
    pass

...
    final(res, upstream_tasks=[optional_res, required])

Kyle McChesney

06/21/2022, 10:47 PM
I am looking at the trigger code, and also having a slow brain day. But all_successful seems to count skipped as successful, so I think it actually just does what I want out of the box.

Kevin Kho

06/21/2022, 10:48 PM
It does, but SKIP propagates by default, so final will be skipped too.
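To illustrate the ordering, here is a simplified pure-Python model (not Prefect code) of how the skip check and an all_successful-style trigger are assumed to interact in Prefect 1.x. The State enum and downstream_outcome function are hypothetical names used only for this sketch.

```python
from enum import Enum
from typing import Iterable


class State(Enum):
    SUCCESS = "Success"
    SKIPPED = "Skipped"
    FAILED = "Failed"


def downstream_outcome(upstream: Iterable[State],
                       skip_on_upstream_skip: bool = True) -> str:
    """Model what happens to a task given the states of its upstream tasks."""
    states = list(upstream)
    # Step 1: the skip check comes first, before the trigger is evaluated.
    if skip_on_upstream_skip and State.SKIPPED in states:
        return "Skipped"
    # Step 2: all_successful-style trigger, where Skipped counts as successful.
    if all(s in (State.SUCCESS, State.SKIPPED) for s in states):
        return "Run"
    return "TriggerFailed"
```

With the default setting, one skipped upstream is enough to skip the task; with skip_on_upstream_skip=False, the trigger alone decides, so a skipped optional no longer blocks final while a failure still does.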

Kyle McChesney

06/21/2022, 10:52 PM
skip_on_upstream_skip seems to do it. required doesn’t seem to be needed in upstream tasks:
from prefect import task, Flow, Parameter, case

@task
def case_value(param):
    return str(param).strip().lower() == 'true'


@task
def required():
    return [1, 2, 3]


@task
def optional():
    return [4, 5, 6]


@task(skip_on_upstream_skip=False)
def final(res):
    print(res)


with Flow('example') as flow:
    case_param = Parameter('case')
    case_val = case_value(case_param)
    res = required()

    with case(case_val, True):
        optional_res = optional()

    final(res, upstream_tasks=[optional_res])
prefect run -p test.py --param case=true
Retrieving local flow... Done
Configured local flow run
└── Parameters: {'case': True}
Running flow locally...
└── 16:51:57 | INFO    | Beginning Flow run for 'example'
└── 16:51:57 | INFO    | Task 'case': Starting task run...
└── 16:51:57 | INFO    | Task 'case': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO    | Task 'required': Starting task run...
└── 16:51:57 | INFO    | Task 'required': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO    | Task 'case_value': Starting task run...
└── 16:51:57 | INFO    | Task 'case_value': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO    | Task 'case(True)': Starting task run...
└── 16:51:57 | INFO    | Task 'case(True)': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO    | Task 'optional': Starting task run...
└── 16:51:57 | INFO    | Task 'optional': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO    | Task 'final': Starting task run...
[1, 2, 3]
└── 16:51:57 | INFO    | Task 'final': Finished task run for task with final state: 'Success'
└── 16:51:57 | INFO    | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!
prefect run -p test.py --param case=false
Retrieving local flow... Done
Configured local flow run
└── Parameters: {'case': False}
Running flow locally...
└── 16:51:51 | INFO    | Beginning Flow run for 'example'
└── 16:51:51 | INFO    | Task 'case': Starting task run...
└── 16:51:52 | INFO    | Task 'case': Finished task run for task with final state: 'Success'
└── 16:51:52 | INFO    | Task 'required': Starting task run...
└── 16:51:52 | INFO    | Task 'required': Finished task run for task with final state: 'Success'
└── 16:51:52 | INFO    | Task 'case_value': Starting task run...
└── 16:51:52 | INFO    | Task 'case_value': Finished task run for task with final state: 'Success'
└── 16:51:52 | INFO    | Task 'case(True)': Starting task run...
└── 16:51:52 | INFO    | SKIP signal raised: SKIP('Provided value "False" did not match "True"')
└── 16:51:52 | INFO    | Task 'case(True)': Finished task run for task with final state: 'Skipped'
└── 16:51:52 | INFO    | Task 'optional': Starting task run...
└── 16:51:52 | INFO    | Task 'optional': Finished task run for task with final state: 'Skipped'
└── 16:51:52 | INFO    | Task 'final': Starting task run...
[1, 2, 3]
└── 16:51:52 | INFO    | Task 'final': Finished task run for task with final state: 'Success'
└── 16:51:52 | INFO    | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!
thank you @Kevin Kho