    Simon Gasse

    1 year ago
    Hi! Can I somehow ignore skipped tasks of a map in downstream tasks? Let's say a mapped task skips for some inputs. In this case, I would still like the downstream task that runs on the return values of the mapped task to continue with the results of all successful task runs. An example is in the thread 🙂
    import prefect
    from prefect import Flow, task
    from prefect.triggers import any_successful
    
    
    LOG = prefect.context.get("logger")
    
    
    @task
    def inc(arg):
        return arg + 1
    
    
    @task
    def skip_conditionally(arg):
        if arg == 1:
            raise prefect.engine.signals.SKIP("Skipped this argument")
        return arg
    
    
    @task(trigger=any_successful)
    def print_args(args):
        LOG.info(args)
    
    
    with Flow("Mapping Test") as flow:
        filtered_nums = skip_conditionally.map(range(3))
        increased_nums = inc.map(filtered_nums)
        print_args(increased_nums)
    
    
    flow.run()
    The output is:
    [2021-08-31 14:58:27+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'Mapping Test'
    [2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Starting task run...
    [2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Finished task run for task with final state: 'Mapped'
    [2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Starting task run...
    [2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Finished task run for task with final state: 'Success'
    [2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Starting task run...
    [2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Skipped this argument')
    [2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Finished task run for task with final state: 'Skipped'
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Starting task run...
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Finished task run for task with final state: 'Success'
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc': Finished task run for task with final state: 'Mapped'
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Starting task run...
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Finished task run for task with final state: 'Success'
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Starting task run...
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Finished task run for task with final state: 'Skipped'
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[2]': Starting task run...
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[2]': Finished task run for task with final state: 'Success'
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'print_args': Starting task run...
    [2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'print_args': Finished task run for task with final state: 'Skipped'
    [2021-08-31 14:58:28+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    As you can see, three inc task instances are created, with one being skipped and print_args also being skipped. Instead, I'd like one of the following:
    • Only two inc task instances are created, the skipped "input" is ignored in .map() (preferred)
    • print_args is not skipped but only runs on the non-skipped tasks.
    I might want to abstract this to failure at some point, so that failed mapped tasks are ignored in downstream tasks. Any suggestions?
    OK so maybe FilterTask will be of use, looking into it... https://docs.prefect.io/api/latest/tasks/control_flow.html#filtertask
    So this serves my purpose (example with an uncaught exception instead of a raised SKIP signal):
    import prefect
    from prefect import Flow, task
    from prefect.triggers import any_successful
    from prefect.tasks.control_flow.filter import FilterTask
    from prefect.engine.signals import SKIP
    
    
    LOG = prefect.context.get("logger")
    
    # Keep only real return values: drop SKIP signals and any other exception objects.
    filter_skip_fail = FilterTask(
        filter_func=lambda x: not isinstance(x, SKIP) and not isinstance(x, BaseException)
    )
    
    
    @task
    def inc(arg):
        return arg + 1
    
    
    @task
    def skip_conditionally(arg):
        if arg == 1:
            raise RuntimeError
            # raise prefect.engine.signals.SKIP("Skipped this argument")
        return arg
    
    
    @task(trigger=any_successful)
    def print_args(args):
        LOG.info(args)
    
    
    with Flow("Mapping Test") as flow:
        filtered_nums = filter_skip_fail(skip_conditionally.map(range(3)))
        increased_nums = inc.map(filtered_nums)
        print_args(increased_nums)
    
    
    flow.run()
    Output:
    [2021-08-31 15:33:09+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'Mapping Test'
    [2021-08-31 15:33:09+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Starting task run...
    [2021-08-31 15:33:09+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Finished task run for task with final state: 'Mapped'
    [2021-08-31 15:33:09+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Starting task run...
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Finished task run for task with final state: 'Success'
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Starting task run...
    [2021-08-31 15:33:12+0200] ERROR - prefect.TaskRunner | Task 'skip_conditionally[1]': Exception encountered during task execution!
    Traceback (most recent call last):
      <redacted>
        raise RuntimeError
    RuntimeError
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Finished task run for task with final state: 'Failed'
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Starting task run...
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Finished task run for task with final state: 'Success'
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'FilterTask': Starting task run...
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'FilterTask': Finished task run for task with final state: 'Success'
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc': Finished task run for task with final state: 'Mapped'
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Starting task run...
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Finished task run for task with final state: 'Success'
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Starting task run...
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Finished task run for task with final state: 'Success'
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'print_args': Starting task run...
    [2021-08-31 15:33:12+0200] INFO - prefect | [1, 3]
    [2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'print_args': Finished task run for task with final state: 'Success'
    [2021-08-31 15:33:12+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
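The filtering step above can be traced in plain Python, without Prefect. This is only a sketch: it assumes that a failed or skipped mapped child reaches the filter as an exception object, which is what the filter function in the thread checks for. `FakeSkip` and `keep_result` are hypothetical names used for illustration and are not part of Prefect.

```python
class FakeSkip(Exception):
    """Hypothetical stand-in for a skip signal; not Prefect's SKIP class."""


def skip_conditionally(arg):
    # Mirrors the task in the thread, but returns the exception object
    # instead of raising it, to imitate what the filter is assumed to see.
    if arg == 1:
        return RuntimeError("failed for this argument")
    return arg


def keep_result(x):
    # Same predicate as filter_func in the thread, with FakeSkip in place of SKIP.
    return not isinstance(x, FakeSkip) and not isinstance(x, BaseException)


def inc(arg):
    return arg + 1


raw = [skip_conditionally(n) for n in range(3)]   # 0, an exception object, 2
filtered = [x for x in raw if keep_result(x)]     # exception object dropped
increased = [inc(x) for x in filtered]
print(increased)  # [1, 3]
```

Because the filtering happens before the second map, only two values are mapped over, which matches the two inc instances and the [1, 3] in the log above.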
    Kevin Kho

    1 year ago
    Glad you figured this out 👍
    Simon Gasse

    1 year ago
    @Kevin Kho would it make sense to link this/mention it in the docs on Task.map? At least this is where I was looking and could not find it. https://docs.prefect.io/core/concepts/mapping.html
    Kevin Kho

    1 year ago
    Yeah I can see that. PRs would be welcome if you’re interested? 🙂
    Simon Gasse

    1 year ago
    Alright, I'll take a look in the next few days 😉
    @Kevin Kho I put it up yesterday night and it was already merged 🙂 https://github.com/PrefectHQ/prefect/pull/4927
    Kevin Kho

    1 year ago
    That was fast!