
Aliza Rayman

11/21/2019, 8:13 AM
What is the best way to run a bunch of mapped tasks and aggregate the errors (with custom messages) to a final result? Right now my code looks something like:
with Flow('aggregate alerts') as flow:
    ids = Parameter("ids")
    response1 = task1.map(ids)  # May return {"error": "error message"} or other data
    response2 = task2.map(response1)  # May return {"error": "error message"} or other data
    ...
    result_file = taskX(responseX-1)  # Aggregates all of the error messages with corresponding ids into 1 file

josh

11/21/2019, 2:47 PM
Hey @Aliza Rayman! You’re on the right path. I don’t think there is a single best way to aggregate the results; it is entirely up to your preferences. Mapping returns an iterable which can be passed to downstream tasks. So if you have a bunch of mapped tasks, you could pass them to the final aggregation task like this:
result_file = taskX([response1, response2, etc...])
Here taskX would take in a list of lists and could perform your aggregation over the elements in those lists, e.g. grab all of the error messages with corresponding ids. In your final aggregation task you could then return a result with your desired final custom message (or write the messages to a file, judging by your comment).
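As a rough sketch of what such an aggregation step might look like, here is a plain-Python version that runs without Prefect. The function name collect_errors, the extra ids argument, and the sample data are all hypothetical; it also assumes each inner list is in the same order as ids. In a real flow the function would be decorated with @task and receive the mapped results.

```python
def collect_errors(ids, stage_results):
    """Pair each id with the error messages it produced in any stage.

    ids: list of ids, in the order they were mapped (assumed).
    stage_results: list of lists, one inner list per mapped stage,
        each assumed to be aligned with ids.
    """
    errors = {}
    for results in stage_results:
        for id_, result in zip(ids, results):
            # The original question uses {"error": "..."} to signal a failure.
            if isinstance(result, dict) and "error" in result:
                errors.setdefault(id_, []).append(result["error"])
    return errors


# Hypothetical sample data standing in for response1 and response2.
ids = [1, 2, 3]
response1 = [{"value": 10}, {"error": "not found"}, {"value": 30}]
response2 = [{"value": 11}, {"error": "skipped"}, {"error": "timeout"}]

report = collect_errors(ids, [response1, response2])
print(report)
```

The returned dictionary could then be formatted into custom messages or written to a file inside the same task.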

Aliza Rayman

11/21/2019, 2:50 PM
Hi @josh thanks for your answer. Just trying to understand. If some tasks fail in a map then only the non-failed task results are returned? And can be passed as input to the next task?

josh

11/21/2019, 2:58 PM
If one of the mapped children fails, the map will still return all of the results. In this case you will want to set your downstream aggregation task’s trigger to something like any_successful so it can still run even when some of the upstream mapped children fail, e.g.:
from prefect import task, Flow
from prefect.triggers import any_successful


@task
def divide(x):
    return 1 / x


@task(trigger=any_successful)
def aggregate(results):
    print(results)


with Flow("divide-fail") as flow:
    results = divide.map([0, 1, 2])
    aggregate(results)

flow.run()
The aggregate task will still run and will produce output like:
[ZeroDivisionError('division by zero'), 1.0, 0.5]
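Given a result list like the one above, the body of the aggregation task could separate failures from successes by checking for exception instances. This is only a sketch in plain Python: split_results and the way the inputs are passed alongside the results are hypothetical, and it assumes failed children show up as exception objects, as in the output shown.

```python
def split_results(inputs, results):
    """Split mapped results into failures and successes, keyed by input."""
    failures, successes = {}, {}
    for x, r in zip(inputs, results):
        if isinstance(r, Exception):
            # Build a custom message from the exception type and text.
            failures[x] = f"{type(r).__name__}: {r}"
        else:
            successes[x] = r
    return failures, successes


# Sample data mirroring the divide example.
inputs = [0, 1, 2]
results = [ZeroDivisionError("division by zero"), 1.0, 0.5]

failures, successes = split_results(inputs, results)
print(failures)
print(successes)
```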

Aliza Rayman

11/24/2019, 8:53 AM
Thanks, I was considering something like that and was wondering about mapping on an error. I'll try that.