    Aliza Rayman

    2 years ago
    What is the best way to run a bunch of mapped tasks and aggregate the errors (with custom messages) to a final result? Right now my code looks something like:
    with Flow('aggregate alerts') as flow:
        ids = Parameter("ids")
        response1 = task1.map(ids)  # May return {"error": "error message"} or other data
        response2 = task2.map(response1)  # May return {"error": "error message"} or other data
        ...
    result_file = taskX(responseX-1)  # Aggregates all of the error messages with corresponding ids into 1 file

    josh

    2 years ago
    Hey @Aliza Rayman! You’re on the right path, and I don’t think there is one best way to aggregate the results; it’s entirely up to your preferences! Mapping returns an iterable which can be passed to downstream tasks, so if you have a bunch of mapped tasks you could pass them to the final aggregation task like:
    result_file = taskX([response1, response2, etc...])
    Here taskX would take in a list of lists and could perform your aggregation over the elements in those lists, e.g. grab all of the error messages with corresponding ids. Then in your final aggregation task you could return a result with your desired final custom message (or write them to a file, judging by your comment).
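    For example, a rough sketch of that aggregation task could look something like this (assuming each mapped response is a dict that may carry "id" and "error" keys; the task and file names here are just illustrative, not from your code):
    from prefect import task


    @task
    def aggregate_errors(all_responses):
        # all_responses is a list of lists, one inner list per upstream mapped task
        lines = []
        for responses in all_responses:
            for item in responses:
                # keep only the entries that reported an error
                if isinstance(item, dict) and "error" in item:
                    lines.append(f"{item.get('id')}: {item['error']}")
        # write all error messages (with their ids) into a single file
        with open("errors.txt", "w") as f:
            f.write("\n".join(lines))
        return "errors.txt"
    You would then call it as result_file = aggregate_errors([response1, response2, ...]) inside the flow.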
    Aliza Rayman

    2 years ago
    Hi @josh, thanks for your answer. Just trying to understand: if some tasks fail in a map, are only the non-failed task results returned? And can they be passed as input to the next task?

    josh

    2 years ago
    If one of the mapped children fails it will still return all of the results. In this case you will want to set your downstream aggregation task’s trigger to something like any_successful so it can still run whether or not any of the upstream mapped children fail, e.g.:
    from prefect import task, Flow
    from prefect.triggers import any_successful
    
    
    @task
    def divide(x):
        return 1 / x
    
    
    @task(trigger=any_successful)
    def aggregate(results):
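        # results contains one entry per mapped child; failed children appear as exception objects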
        print(results)
    
    
    with Flow("divide-fail") as flow:
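        # x=0 makes the first mapped child fail with ZeroDivisionError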
        results = divide.map([0, 1, 2])
        aggregate(results)
    
    flow.run()
    The aggregate task will still run and it will produce some output like:
    [ZeroDivisionError('division by zero'), 1.0, 0.5]
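    And if you only care about the error entries in the aggregation, one rough sketch (assuming, as above, that failed children show up as exception objects in the results list; this is just one way to do it) is to split on isinstance inside the aggregation task:
    @task(trigger=any_successful)
    def aggregate(results):
        # separate failed children (exception objects) from successful results
        errors = [r for r in results if isinstance(r, BaseException)]
        values = [r for r in results if not isinstance(r, BaseException)]
        print(f"{len(errors)} failures, {len(values)} successes")
        return values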
    Aliza Rayman

    2 years ago
    Thanks, I was considering something like that and was wondering about mapping on an error. I'll try that