
Jacob Blanco

09/04/2020, 9:02 AM
Hey folks, wondering what the right approach is here. We are feeding the output of a task into a FilterTask, and if the upstream task fails, the FilterTask gets a signals.FAIL instead of the expected dict. The task upstream of the FilterTask just raises signals.FAIL, but the FilterTask still runs and complains that
@task
def some_task_that_can_fail():
    if something_bad:
        raise signals.FAIL()
    return dict()

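# Keep only configs whose 'some_parameter' equals 'today' (== is a comparison, not an assignment)
filter_out_stuff = FilterTask(filter_func=lambda config: config.get('some_parameter') == 'today')

with Flow('test') as flow:  # Flow requires a name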
    .....
    configurations = some_task_that_can_fail.map(some_inputs)
    useable_configs = filter_out_stuff(configurations)

Jeremiah

09/04/2020, 1:45 PM
Hi @Jacob Blanco - I think this is working as expected. I tweaked your code to run standalone:
from prefect import Flow, task
from prefect.tasks.control_flow.filter import FilterTask
from prefect.engine import signals

@task
def some_task_that_can_fail(i):
    if i == 1:
        raise signals.FAIL()
    return dict()
filter_out_stuff = FilterTask(filter_func=lambda config: config.get('some_parameter') == 'today')

some_inputs = [1,2,3]
with Flow('test') as flow:
    configurations = some_task_that_can_fail.map(some_inputs)
    useable_configs = filter_out_stuff(configurations)

state = flow.run()

state.result[configurations].result # [FAIL, {}, {}]
The FilterTask is receiving a list consisting of the FAIL signal and two (expected) dictionaries, but since its filter_func requires the input (config) to have a .get() method, I think it's failing on the FAIL signal.
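A minimal plain-Python sketch of that failure, and of the type guard Jacob asks about, may help. It assumes the failed child shows up in the reduced list as an exception object, as in the [FAIL, {}, {}] output above; UpstreamFail is a hypothetical stand-in for signals.FAIL so the snippet runs without Prefect, and original_filter / guarded_filter are illustrative names.

```python
class UpstreamFail(Exception):
    """Hypothetical stand-in for prefect.engine.signals.FAIL."""


# What the reducing FilterTask receives: one failure object plus two dicts.
configurations = [UpstreamFail(), {"some_parameter": "today"}, {}]


def original_filter(config):
    # Same logic as the lambda passed as filter_func in the question.
    return config.get("some_parameter") == "today"


def guarded_filter(config):
    # Only dicts are inspected; anything else (e.g. a failure object) is dropped.
    return isinstance(config, dict) and config.get("some_parameter") == "today"


try:
    [c for c in configurations if original_filter(c)]
except AttributeError as exc:
    # The exception object has no .get(), so the unguarded filter blows up.
    print("original filter breaks:", exc)

useable = [c for c in configurations if guarded_filter(c)]
print(useable)  # [{'some_parameter': 'today'}]
```

The guarded function could then be passed as FilterTask(filter_func=guarded_filter), so the filter and the business logic both only ever act on dicts.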

Jacob Blanco

09/05/2020, 3:45 AM
Right, so the correct approach is to require the element to be a dict in the FilterTask's filter function as well as in the business logic? Is this unique to FilterTask? I've never had to explicitly handle upstream failures.

Jeremiah

09/05/2020, 1:37 PM
If you have a task that reduces over a map (as your FilterTask does), it receives a list containing both the upstream failures AND the results of the successful mapped children, so that you can decide what to do with that mix of success and failure. The correct approach therefore depends on whether your Python code can work with those failure objects or not. If you don't want to deal with the failures at all and can safely ignore them, you could use one FilterTask to remove them (FilterTask(filter_func=lambda x: not isinstance(x, Exception))) and then use your existing FilterTask to process the remaining dicts. Alternatively, you might want to pause or abort your flow if any of the mapped children failed, which you could do by creating a task that raises an error when it finds a failure in the list. This is part of the nuance of bringing mapped children back into the logic of your "main" flow.
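The two options in that reply can be sketched in plain Python as the functions each Prefect task would wrap. This is an illustration under assumptions: UpstreamFail again stands in for signals.FAIL, failures are detected with isinstance(x, Exception) because the reduced list holds the failure objects themselves (as in the [FAIL, {}, {}] output earlier in the thread), and drop_failures, keep_today and fail_if_any_failed are hypothetical names.

```python
from typing import Any, Dict, List


class UpstreamFail(Exception):
    """Hypothetical stand-in for prefect.engine.signals.FAIL."""


def drop_failures(results: List[Any]) -> List[Any]:
    # Option 1, step 1: keep only results that are not failure objects.
    return [r for r in results if not isinstance(r, Exception)]


def keep_today(configs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Option 1, step 2: the existing business filter, which now only sees dicts.
    return [c for c in configs if c.get("some_parameter") == "today"]


def fail_if_any_failed(results: List[Any]) -> List[Any]:
    # Option 2: raise if any mapped child failed. Inside a Prefect task,
    # raising here marks that task as failed, so downstream work that
    # depends on it will not run successfully.
    failures = [r for r in results if isinstance(r, Exception)]
    if failures:
        raise RuntimeError(f"{len(failures)} mapped child(ren) failed")
    return results


mixed = [UpstreamFail(), {"some_parameter": "today"}, {}]
print(keep_today(drop_failures(mixed)))  # [{'some_parameter': 'today'}]

try:
    fail_if_any_failed(mixed)
except RuntimeError as exc:
    print(exc)  # 1 mapped child(ren) failed
```

In a flow, option 1 corresponds to chaining two FilterTasks as described above, while option 2 would be a separate task placed between the mapped task and everything that consumes its results.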

Jacob Blanco

09/07/2020, 1:32 AM
Got it, thanks!