Simon Gasse
08/31/2021, 12:59 PM
… `map` in downstream tasks? Let's say a mapped task skips for some inputs. In this case, I would still like the downstream task that runs on the return values of the mapped task to continue with the results of all successful task runs. An example is in the thread 🙂

```python
import prefect
from prefect import Flow, task
from prefect.triggers import any_successful

LOG = prefect.context.get("logger")


@task
def inc(arg):
    return arg + 1


@task
def skip_conditionally(arg):
    # Skip input 1 to simulate a mapped child that produces no result
    if arg == 1:
        raise prefect.engine.signals.SKIP("Skipped this argument")
    return arg


@task(trigger=any_successful)
def print_args(args):
    LOG.info(args)


with Flow("Mapping Test") as flow:
    filtered_nums = skip_conditionally.map(range(3))
    increased_nums = inc.map(filtered_nums)
    print_args(increased_nums)

flow.run()
```
```
[2021-08-31 14:58:27+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'Mapping Test'
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Starting task run...
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Finished task run for task with final state: 'Mapped'
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Starting task run...
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Finished task run for task with final state: 'Success'
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Starting task run...
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Skipped this argument')
[2021-08-31 14:58:27+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Finished task run for task with final state: 'Skipped'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Finished task run for task with final state: 'Success'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc': Finished task run for task with final state: 'Mapped'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Finished task run for task with final state: 'Success'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Finished task run for task with final state: 'Skipped'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[2]': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'inc[2]': Finished task run for task with final state: 'Success'
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'print_args': Starting task run...
[2021-08-31 14:58:28+0200] INFO - prefect.TaskRunner | Task 'print_args': Finished task run for task with final state: 'Skipped'
[2021-08-31 14:58:28+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
As you can see, three `inc` task instances are created; one of them is skipped, and `print_args` is skipped as well. Instead, I'd like one of the following:
• Only two `inc` task instances are created, and the skipped "input" is ignored in `.map()` (preferred).
• `print_args` is not skipped but runs only on the results of the non-skipped tasks.
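To make the two options concrete, here is a plain-Python model of the data flow with no Prefect involved. Lists stand in for mapped runs, and the `Skipped` class is a hypothetical stand-in for the result of a skipped mapped child; `skip_conditionally` and `inc` mirror the tasks above. It is a sketch of the intended semantics only, not of how Prefect implements mapping.

```python
class Skipped:
    """Hypothetical stand-in for the result of a skipped mapped child."""

    def __init__(self, message):
        self.message = message


def skip_conditionally(arg):
    # Mirrors the task above: input 1 is "skipped"
    if arg == 1:
        return Skipped("Skipped this argument")
    return arg


def inc(arg):
    return arg + 1


upstream = [skip_conditionally(x) for x in range(3)]

# Option 1 (preferred): drop skipped results before mapping,
# so inc is only called for the two remaining inputs.
option1 = [inc(x) for x in upstream if not isinstance(x, Skipped)]

# Option 2: map over everything and propagate the skip,
# then let the downstream step keep only the non-skipped values.
mapped = [x if isinstance(x, Skipped) else inc(x) for x in upstream]
option2 = [x for x in mapped if not isinstance(x, Skipped)]

print(option1)  # [1, 3]
print(option2)  # [1, 3]
```

Both options end with the same values downstream; they differ only in whether the skipped input ever reaches `inc`.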
I might want to extend this to failures at some point, so that failed mapped tasks are also ignored in downstream tasks. Any suggestions?

`FilterTask` will be of use, looking into it...
https://docs.prefect.io/api/latest/tasks/control_flow.html#filtertask

```python
import prefect
from prefect import Flow, task
from prefect.triggers import any_successful
from prefect.tasks.control_flow.filter import FilterTask
from prefect.engine.signals import SKIP

LOG = prefect.context.get("logger")

# Drop results of mapped children that were skipped (SKIP) or failed (exception)
filter_skip_fail = FilterTask(
    filter_func=lambda x: not isinstance(x, SKIP) and not isinstance(x, BaseException)
)


@task
def inc(arg):
    return arg + 1


@task
def skip_conditionally(arg):
    if arg == 1:
        raise RuntimeError
        # raise prefect.engine.signals.SKIP("Skipped this argument")
    return arg


@task(trigger=any_successful)
def print_args(args):
    LOG.info(args)


with Flow("Mapping Test") as flow:
    filtered_nums = filter_skip_fail(skip_conditionally.map(range(3)))
    increased_nums = inc.map(filtered_nums)
    print_args(increased_nums)

flow.run()
```
```
[2021-08-31 15:33:09+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'Mapping Test'
[2021-08-31 15:33:09+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Starting task run...
[2021-08-31 15:33:09+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally': Finished task run for task with final state: 'Mapped'
[2021-08-31 15:33:09+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[0]': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Starting task run...
[2021-08-31 15:33:12+0200] ERROR - prefect.TaskRunner | Task 'skip_conditionally[1]': Exception encountered during task execution!
Traceback (most recent call last):
<redacted>
raise RuntimeError
RuntimeError
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[1]': Finished task run for task with final state: 'Failed'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'skip_conditionally[2]': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'FilterTask': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'FilterTask': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc': Finished task run for task with final state: 'Mapped'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[0]': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'inc[1]': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'print_args': Starting task run...
[2021-08-31 15:33:12+0200] INFO - prefect | [1, 3]
[2021-08-31 15:33:12+0200] INFO - prefect.TaskRunner | Task 'print_args': Finished task run for task with final state: 'Success'
[2021-08-31 15:33:12+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
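The `[1, 3]` in the log can be traced by hand with the same predicate as `filter_skip_fail`, applied in plain Python. This sketch assumes that the failed child's result reaches `FilterTask` as the `RuntimeError` it raised (consistent with the filtered output above); `SKIP` here is a hypothetical stand-in class, not Prefect's own signal.

```python
class SKIP(Exception):
    """Hypothetical stand-in for prefect.engine.signals.SKIP."""


def keep(x):
    # Same predicate as filter_skip_fail above
    return not isinstance(x, SKIP) and not isinstance(x, BaseException)


# Assumed results of skip_conditionally.map(range(3)) in the second run:
# child 1 failed, so its result is the exception it raised.
upstream = [0, RuntimeError(), 2]

filtered = [x for x in upstream if keep(x)]  # what FilterTask passes on
increased = [x + 1 for x in filtered]        # inc.map over the two survivors
print(increased)  # [1, 3]

# A skipped child (the commented-out SKIP line) would be dropped the same way
print([x for x in [0, SKIP("Skipped this argument"), 2] if keep(x)])  # [0, 2]
```

Only two values survive the filter, which is why the log shows just `inc[0]` and `inc[1]`.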
Kevin Kho
Simon Gasse
08/31/2021, 1:52 PM
… `Task.map`? At least this is where I was looking and could not find it.
https://docs.prefect.io/core/concepts/mapping.html
Kevin Kho
Simon Gasse
08/31/2021, 1:58 PM
Kevin Kho