
Simon Gasse

08/31/2021, 12:59 PM
Hi! Can I somehow ignore the skipped runs of a mapped task in downstream tasks? Say a mapped task skips some of its inputs. In that case, I would still like the downstream task that consumes the mapped task's return values to continue with the results of all successful runs. An example is in the thread 🙂
import prefect
from prefect import Flow, task
from prefect.triggers import any_successful


LOG = prefect.context.get("logger")


@task
def inc(arg):
    return arg + 1


@task
def skip_conditionally(arg):
    if arg == 1:
        raise prefect.engine.signals.SKIP("Skipped this argument")
    return arg


@task(trigger=any_successful)
def print_args(args):
    LOG.info(args)


with Flow("Mapping Test") as flow:
    filtered_nums = skip_conditionally.map(range(3))
    increased_nums = inc.map(filtered_nums)
    print_args(increased_nums)


flow.run()
The output is:
[2021-08-31 14:58:27+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'Mapping Test'
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Starting task run...
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Finished task run for task with final state: 'Mapped'
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Starting task run...
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Finished task run for task with final state: 'Success'
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Starting task run...
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Skipped this argument')
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Finished task run for task with final state: 'Skipped'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Finished task run for task with final state: 'Success'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc': Finished task run for task with final state: 'Mapped'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Finished task run for task with final state: 'Success'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Finished task run for task with final state: 'Skipped'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[2]': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[2]': Finished task run for task with final state: 'Success'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'print_args': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'print_args': Finished task run for task with final state: 'Skipped'
[2021-08-31 14:58:28+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
As you can see, three inc task instances are created, one of which is skipped, and print_args is skipped as well. Instead, I'd like one of the following:
• Only two inc task instances are created, because the skipped input is ignored by .map() (preferred).
• print_args is not skipped, but runs only on the results of the non-skipped tasks.
I might want to generalize this to failures at some point, so that failed mapped tasks are also ignored by downstream tasks. Any suggestions?
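To pin down the two behaviours being asked for, here is a plain-Python model of the first flow. It is not Prefect code: SKIPPED, mapped and reduce_step are hypothetical stand-ins that only imitate what the log above shows, namely that a skipped skip_conditionally child leads to a skipped inc child, and that print_args ends up skipped as well.

```python
# Plain-Python model of the first flow (NOT Prefect code).
# SKIPPED is a hypothetical sentinel standing in for a child task run
# that finished in a 'Skipped' state.
SKIPPED = object()


def skip_conditionally(arg):
    # Mirrors the task in the question: input 1 is "skipped".
    return SKIPPED if arg == 1 else arg


def inc(arg):
    return arg + 1


def mapped(fn, inputs):
    # Imitates the logged behaviour: a child whose input was skipped is skipped too.
    return [SKIPPED if x is SKIPPED else fn(x) for x in inputs]


def reduce_step(results):
    # Imitates the logged behaviour: the non-mapped downstream task
    # (print_args) is skipped as soon as any upstream child was skipped.
    return SKIPPED if any(r is SKIPPED for r in results) else results


# What the log shows: inc[1] is Skipped, and so is print_args.
observed = reduce_step(mapped(inc, mapped(skip_conditionally, range(3))))

# Preferred behaviour: drop skipped inputs before mapping inc,
# so only two inc children exist and the reduce step receives [1, 3].
filtered = [x for x in mapped(skip_conditionally, range(3)) if x is not SKIPPED]
preferred = reduce_step(mapped(inc, filtered))
```

The key point of the model is where the filtering happens: removing skipped results between the two mapped steps is what turns three inc children into two.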
OK, so maybe FilterTask will be of use; looking into it... https://docs.prefect.io/api/latest/tasks/control_flow.html#filtertask
This does what I need (in this example, the mapped task raises an uncaught RuntimeError instead of a SKIP signal):
import prefect
from prefect import Flow, task
from prefect.triggers import any_successful
from prefect.tasks.control_flow.filter import FilterTask
from prefect.engine.signals import SKIP


LOG = prefect.context.get("logger")

filter_skip_fail = FilterTask(
    filter_func=lambda x: not isinstance(x, SKIP) and not isinstance(x, BaseException)
)


@task
def inc(arg):
    return arg + 1


@task
def skip_conditionally(arg):
    if arg == 1:
        raise RuntimeError
        # raise prefect.engine.signals.SKIP("Skipped this argument")
    return arg


@task(trigger=any_successful)
def print_args(args):
    LOG.info(args)


with Flow("Mapping Test") as flow:
    filtered_nums = filter_skip_fail(skip_conditionally.map(range(3)))
    increased_nums = inc.map(filtered_nums)
    print_args(increased_nums)


flow.run()
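As a quick check of the filter predicate, here is a plain-Python trace of the data passing through this flow. It is not Prefect code and rests on an assumption: that a failed or skipped child hands its exception or signal object downstream, which is what the isinstance checks in filter_func rely on. FakeSkip is a hypothetical stand-in for prefect.engine.signals.SKIP.

```python
# Plain-Python trace of the FilterTask flow (NOT Prefect code).
# FakeSkip is a hypothetical stand-in for prefect.engine.signals.SKIP.
class FakeSkip(Exception):
    pass


def keep(x):
    # Same logic as the filter_func passed to FilterTask, with FakeSkip for SKIP.
    return not isinstance(x, FakeSkip) and not isinstance(x, BaseException)


def inc(arg):
    return arg + 1


# Assumed results of skip_conditionally.map(range(3)) in the run above:
# child 1 raised RuntimeError, so its "result" is the exception object.
child_results = [0, RuntimeError(), 2]

filtered_nums = [x for x in child_results if keep(x)]  # FilterTask step
increased_nums = [inc(x) for x in filtered_nums]       # inc.map step
print(increased_nums)  # [1, 3], the same list print_args logs above
```

In this model the first isinstance check is redundant, because FakeSkip is itself an exception; keeping both checks, as the original filter_func does, is harmless and makes the intent explicit.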
Output:
[2021-08-31 15:33:09+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'Mapping Test'
[2021-08-31 15:33:09+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Starting task run...
[2021-08-31 15:33:09+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Finished task run for task with final state: 'Mapped'
[2021-08-31 15:33:09+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Starting task run...
[2021-08-31 15:33:12+0200] ERROR - prefect.TaskRunner | Task 'skip_conditionally[1]': Exception encountered during task execution!
Traceback (most recent call last):
  <redacted>
    raise RuntimeError
RuntimeError
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Finished task run for task with final state: 'Failed'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'FilterTask': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'FilterTask': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc': Finished task run for task with final state: 'Mapped'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'print_args': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect | [1, 3]
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'print_args': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

Kevin Kho

08/31/2021, 1:42 PM
Glad you figured this out 👍

Simon Gasse

08/31/2021, 1:52 PM
@Kevin Kho, would it make sense to link or mention this in the docs on Task.map? At least that is where I was looking, and I could not find it there. https://docs.prefect.io/core/concepts/mapping.html

Kevin Kho

08/31/2021, 1:53 PM
Yeah, I can see that. PRs would be welcome if you're interested 🙂

Simon Gasse

08/31/2021, 1:58 PM
Alright, I'll take a look in the next few days 😉
@Kevin Kho I put it up last night and it was already merged 🙂 https://github.com/PrefectHQ/prefect/pull/4927

Kevin Kho

09/01/2021, 1:57 PM
That was fast!