Garret Cook

07/19/2021, 4:09 PM
I have a mapped task that returns a list of lists. I then flatten it and iterate over the results in another mapped task, as recommended in the docs. However, if the first task returns TRIGGERFAIL instead of a list of lists, I get an error when I run flatten:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 520, in get_flow_run_state
    mapped_children=children, executor=executor
  File "/usr/local/lib/python3.6/site-packages/prefect/utilities/executors.py", line 588, in flatten_mapped_children
    [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
  File "/usr/local/lib/python3.6/site-packages/prefect/utilities/executors.py", line 588, in <listcomp>
    [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
  File "/usr/local/lib/python3.6/site-packages/prefect/executors/local.py", line 28, in submit
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/prefect/utilities/executors.py", line 588, in <lambda>
    [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
TypeError: object of type 'TRIGGERFAIL' has no len()

Kevin Kho

07/19/2021, 4:11 PM
Hey @Garret Cook, maybe you can filter out the exceptions from your list with the FilterTask?

Garret Cook

07/19/2021, 4:12 PM
ah, I see
is TRIGGERFAIL filtered in the default function?
the default filter function inside FilterTask, I mean

Kevin Kho

07/19/2021, 4:30 PM
It filters out exceptions by default, and TRIGGERFAIL is an exception, so yes, I'd expect it to.

Garret Cook

07/19/2021, 4:34 PM
I have this array:
[TRIGGERFAIL('Trigger was "all_successful" but some of the upstream tasks failed.',), TRIGGERFAIL('Trigger was "all_successful" but some of the upstream tasks failed.',), TRIGGERFAIL('Trigger was "all_successful" but some of the upstream tasks failed.',)]
filterFailures = FilterTask()
filtered_download_file_from_azure_cmds = filterFailures(<the array>)
and it returns the same thing

Kevin Kho

07/19/2021, 4:35 PM
I’ll give this a try.

Garret Cook

07/19/2021, 4:35 PM
Thank you

Kevin Kho

07/19/2021, 4:38 PM
Yep, I see the same thing. Here's a working example:
from prefect import Flow, task
import prefect
from prefect.engine.signals import TRIGGERFAIL, FAIL
from prefect.tasks.control_flow.filter import FilterTask

fil = FilterTask(lambda r: not isinstance(r, BaseException))

@task
def abc(x):
    if x == 2:
        raise TRIGGERFAIL("test")
    if x == 5:
        raise FAIL("test")
    return x

@task
def log(x):
    logger = prefect.context.get("logger")
    logger.info(x)
    return x

with Flow(" ") as flow:
    x = abc.map([1,2,3,4,5,6,7,8,9])
    x = fil(x)
    log(x)
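
A likely reason the default FilterTask() returned the array unchanged while the BaseException lambda above works: this sketch assumes Prefect 1.x signals such as TRIGGERFAIL derive from BaseException rather than Exception, and that the default filter only checks against Exception. The classes and function names below are hypothetical stand-ins in plain Python, not Prefect's own code, so treat this as an illustration of the isinstance behaviour rather than a description of the library internals.

```python
# Hypothetical stand-ins; these are NOT Prefect's real classes.
class StateSignal(BaseException):
    """Assumed to mirror a signal base class that derives from BaseException."""


class TriggerFail(StateSignal):
    """Stand-in for a signal like TRIGGERFAIL."""


def keep_unless_exception(result):
    # Narrow check: only matches the Exception branch of the hierarchy.
    return not isinstance(result, Exception)


def keep_unless_base_exception(result):
    # Broad check: same predicate as the FilterTask lambda in the example above.
    return not isinstance(result, BaseException)


mapped_results = [1, TriggerFail("upstream failed"), ValueError("boom"), 4]

narrow = [r for r in mapped_results if keep_unless_exception(r)]
broad = [r for r in mapped_results if keep_unless_base_exception(r)]

print([type(r).__name__ for r in narrow])  # the TriggerFail stand-in survives
print(broad)                               # only the plain values remain
```

With the narrow check the signal stand-in slips through, which matches the "returns the same thing" behaviour reported earlier; the broad check drops both kinds of failure.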

Garret Cook

07/26/2021, 2:41 AM
So, after trying a few things, I made a task that runs as part of the flow on an all_successful trigger and sends the success message to the users. This task has easy access to all previous tasks, and I can provide plenty of metadata to users this way. Should the flow fail, I have a handler that sends a general 'failed, contact support for more info' email, which doesn't need any additional information from the flow.
In production now, working like a champ. Thanks for the suggestions and help.

Kevin Kho

07/26/2021, 3:01 AM
That sounds awesome! Sounds like a great solution. Thanks for circling back and mentioning it.