Jacob Blanco
09/04/2020, 9:02 AM
@task
def some_task_that_can_fail():
    if something_bad:
        raise signals.FAIL()
    return dict()
filter_out_stuff = FilterTask(filter_func=lambda config: config.get('some_parameter') == 'today')
with Flow() as flow:
    .....
    configurations = some_task_that_can_fail.map(some_inputs)
    useable_configs = filter_out_stuff(configurations)
Jeremiah
09/04/2020, 1:45 PM
from prefect import Flow, task
from prefect.tasks.control_flow.filter import FilterTask
from prefect.engine import signals

@task
def some_task_that_can_fail(i):
    if i == 1:
        raise signals.FAIL()
    return dict()

filter_out_stuff = FilterTask(filter_func=lambda config: config.get('some_parameter') == 'today')
some_inputs = [1, 2, 3]

with Flow('test') as flow:
    configurations = some_task_that_can_fail.map(some_inputs)
    useable_configs = filter_out_stuff(configurations)

state = flow.run()
state.result[configurations].result  # [FAIL, {}, {}]
The FilterTask is receiving a list consisting of the FAIL state and two (expected) dictionaries, but since its filter_func requires the input (config) to have a .get() method, I think it’s failing on the FAIL state.
Jacob Blanco
09/05/2020, 3:45 AM
Jeremiah
09/05/2020, 1:37 PM
FilterTask(filter_func=lambda x: not isinstance(x, prefect.engine.state.Failed)) and then use your existing FilterTask to process the remaining dicts. Alternatively, you might want to pause / abort your flow if there are any errors in the mapped children, which you could do by creating a task that raises an error if any of the mapped children failed. This is part of the nuance of bringing mapped children back into the logic of your “main” flow.
Jacob Blanco
09/07/2020, 1:32 AM
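
The two approaches Jeremiah describes in his 09/05 message can be sketched in plain Python, without Prefect, to show the logic in isolation. This is only an illustration under stated assumptions: `FakeFailed` is a hypothetical stand-in for whatever failure object a mapped child passes downstream (the thread suggests checking against `prefect.engine.state.Failed`), and the helper names `is_not_failed`, `is_for_today`, and `raise_if_any_failed` are made up for this example.

```python
# Plain-Python sketch; it runs without Prefect installed.

class FakeFailed:
    """Hypothetical stand-in for the failure marker left by a failed mapped child."""


def is_not_failed(item):
    # First filter: drop failure markers so later filters only see dicts.
    return not isinstance(item, FakeFailed)


def is_for_today(config):
    # Second filter: the original predicate, safe once every item is a dict.
    return config.get("some_parameter") == "today"


def raise_if_any_failed(items):
    # Alternative: abort instead of filtering when any mapped child failed.
    if any(isinstance(item, FakeFailed) for item in items):
        raise RuntimeError("at least one mapped child failed")
    return items


# Simulated output of the mapped task: one failure and two dictionaries.
mapped_results = [
    FakeFailed(),
    {"some_parameter": "today"},
    {"some_parameter": "tomorrow"},
]

# Approach 1: filter in two stages.
dicts_only = [item for item in mapped_results if is_not_failed(item)]
useable_configs = [config for config in dicts_only if is_for_today(config)]
print(useable_configs)
```

In an actual flow, the two predicates would presumably be passed as `filter_func` to two chained `FilterTask` instances, and `raise_if_any_failed` would be wrapped in a task placed between the mapped task and the rest of the flow.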