Aliza Rayman
11/21/2019, 8:13 AM
with Flow('aggregate alerts') as flow:
    ids = Parameter("ids")
    response1 = task1.map(ids)  # May return {"error": "error message"} or other data
    response2 = task2.map(response1)  # May return {"error": "error message"} or other data
    ...
    result_file = taskX(responseX-1)  # Aggregates all of the error messages with corresponding ids into 1 file
josh
11/21/2019, 2:47 PM
The result of a mapped task is an iterable which can be passed to downstream tasks. So if you have a bunch of mapped tasks you could pass them to the final aggregation task in a way like:
result_file = taskX([response1, response2, etc...])
Where taskX would take in a list of lists and could perform your aggregation over the elements in those lists, e.g. grab all of the error messages with corresponding ids.
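A minimal plain-Python sketch of what such an aggregation step might look like; the helper name and the shape of the responses (a dict with an "error" key on failure) are assumptions based on the comments in the flow above, not a real Prefect API:

```python
# Hypothetical helper: collects error messages (with their positional ids)
# from several lists of mapped-task responses.
def collect_errors(response_lists):
    errors = []
    for responses in response_lists:
        for idx, resp in enumerate(responses):
            # A mapped task may return {"error": "error message"} on failure
            if isinstance(resp, dict) and "error" in resp:
                errors.append((idx, resp["error"]))
    return errors

# Example with two fake response lists:
r1 = [{"data": 1}, {"error": "timeout"}]
r2 = [{"error": "bad input"}, {"data": 2}]
print(collect_errors([r1, r2]))  # → [(1, 'timeout'), (0, 'bad input')]
```

Inside the final task you could then write these (id, message) pairs to your result file.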
Now in your final aggregation task you could return a result with your desired final custom message (or write them to file, judging by your comment).

Aliza Rayman
11/21/2019, 2:50 PM

josh
11/21/2019, 2:58 PM
You can set the aggregation task's trigger to any_successful so it can still run whether or not any of the upstream mapped children fail.
e.g.
from prefect import task, Flow
from prefect.triggers import any_successful

@task
def divide(x):
    return 1 / x

@task(trigger=any_successful)
def aggregate(results):
    print(results)

with Flow("divide-fail") as flow:
    results = divide.map([0, 1, 2])
    aggregate(results)

flow.run()
The aggregate task will still run and it will produce output like [ZeroDivisionError('division by zero'), 1.0, 0.5].
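Since the results list then mixes exceptions (failed mapped children) with successful values, the aggregate task may want to split them apart. A plain-Python sketch of that idea (the function name is hypothetical):

```python
def split_results(results):
    # Separate exceptions (failed mapped children) from successful values
    failures = [r for r in results if isinstance(r, Exception)]
    successes = [r for r in results if not isinstance(r, Exception)]
    return failures, successes

# Example with output like the one described above:
fails, oks = split_results([ZeroDivisionError("division by zero"), 1.0, 0.5])
print(len(fails), oks)  # → 1 [1.0, 0.5]
```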
Aliza Rayman
11/24/2019, 8:53 AM