Kyle McChesney

    Kyle McChesney

    3 months ago
    is there a trigger like “at least one successful, and the rest skipped”. I have some logic like so
    @task
    def required():
        pass
    
    
    @task
    def optional():
        pass
    
    
    @task
    def final():
        pass
    
    
    with Flow('example'):
        res = required()
        with case(some_case, True):
            optional_res = optional()
    
        final(res, upstream_tasks=[optional_res])
    If optional is skipped, its okay to run final. If required fails, or optional fails, I dont want to run final
    Kevin Kho

    Kevin Kho

    3 months ago
    I don’t normally recommend this, but I have faith in you that you can make your own trigger lol
    the code is not terribly hard
    Kyle McChesney

    Kyle McChesney

    3 months ago
    I guess the other option is to provide some fake implementation for
    optional
    to use in another
    with case(..., False)
    and then merge.
    Kevin Kho

    Kevin Kho

    3 months ago
    Yes merge can get you out of that I think
    I digested this I think what you want is
    @task(skip_on_upstream_skip=False)
    def final():
        pass
    
    ...
        final(res, upstream_tasks=[optional_res, required])
    Kyle McChesney

    Kyle McChesney

    3 months ago
    I am looking at the trigger code, and also have a slow brain day. But
    all_successful
    seems to count skipped as successful. So I think it actually just does what I want out of the box
    Kevin Kho

    Kevin Kho

    3 months ago
    It does but SKIP propagates to
    final
    will be skipped
    Kyle McChesney

    Kyle McChesney

    3 months ago
    skip_on_upstream_skip seems to do it.
    required
    doesn’t seem to be needed in upstream tasks
    from prefect import task, Flow, Parameter, case
    
    @task
    def case_value(param):
        return str(param).strip().lower() == 'true'
    
    
    @task
    def required():
        return [1, 2, 3]
    
    
    @task
    def optional():
        return [4, 5, 6]
    
    
    @task(skip_on_upstream_skip=Fals)
    def final(res):
        print(res)
    
    
    with Flow('example') as flow:
        case_param = Parameter('case')
        case_val = case_value(case_param)
        res = required()
    
        with case(case_val, True):
            optional_res = optional()
    
        final(res, upstream_tasks=[optional_res])
    prefect run -p test.py --param case=true
    Retrieving local flow... Done
    Configured local flow run
    └── Parameters: {'case': True}
    Running flow locally...
    └── 16:51:57 | INFO    | Beginning Flow run for 'example'
    └── 16:51:57 | INFO    | Task 'case': Starting task run...
    └── 16:51:57 | INFO    | Task 'case': Finished task run for task with final state: 'Success'
    └── 16:51:57 | INFO    | Task 'required': Starting task run...
    └── 16:51:57 | INFO    | Task 'required': Finished task run for task with final state: 'Success'
    └── 16:51:57 | INFO    | Task 'case_value': Starting task run...
    └── 16:51:57 | INFO    | Task 'case_value': Finished task run for task with final state: 'Success'
    └── 16:51:57 | INFO    | Task 'case(True)': Starting task run...
    └── 16:51:57 | INFO    | Task 'case(True)': Finished task run for task with final state: 'Success'
    └── 16:51:57 | INFO    | Task 'optional': Starting task run...
    └── 16:51:57 | INFO    | Task 'optional': Finished task run for task with final state: 'Success'
    └── 16:51:57 | INFO    | Task 'final': Starting task run...
    [1, 2, 3]
    └── 16:51:57 | INFO    | Task 'final': Finished task run for task with final state: 'Success'
    └── 16:51:57 | INFO    | Flow run SUCCESS: all reference tasks succeeded
    Flow run succeeded!
    prefect run -p test.py --param case=false
    Retrieving local flow... Done
    Configured local flow run
    └── Parameters: {'case': False}
    Running flow locally...
    └── 16:51:51 | INFO    | Beginning Flow run for 'example'
    └── 16:51:51 | INFO    | Task 'case': Starting task run...
    └── 16:51:52 | INFO    | Task 'case': Finished task run for task with final state: 'Success'
    └── 16:51:52 | INFO    | Task 'required': Starting task run...
    └── 16:51:52 | INFO    | Task 'required': Finished task run for task with final state: 'Success'
    └── 16:51:52 | INFO    | Task 'case_value': Starting task run...
    └── 16:51:52 | INFO    | Task 'case_value': Finished task run for task with final state: 'Success'
    └── 16:51:52 | INFO    | Task 'case(True)': Starting task run...
    └── 16:51:52 | INFO    | SKIP signal raised: SKIP('Provided value "False" did not match "True"')
    └── 16:51:52 | INFO    | Task 'case(True)': Finished task run for task with final state: 'Skipped'
    └── 16:51:52 | INFO    | Task 'optional': Starting task run...
    └── 16:51:52 | INFO    | Task 'optional': Finished task run for task with final state: 'Skipped'
    └── 16:51:52 | INFO    | Task 'final': Starting task run...
    [1, 2, 3]
    └── 16:51:52 | INFO    | Task 'final': Finished task run for task with final state: 'Success'
    └── 16:51:52 | INFO    | Flow run SUCCESS: all reference tasks succeeded
    Flow run succeeded!
    thank you @Kevin Kho