Stephen Lloyd

03/16/2023, 7:38 AM
I'm unclear on how to gather/reduce results from using `.submit()`. I want to make a bunch of API calls in parallel and build a list of results. Can anyone help me with this?
With these tasks:
```python
from prefect import task, flow, get_run_logger
import pandas as pd

from datetime import datetime, timedelta, timezone
from typing import List, Dict


@task
def dividetwo(num: int):
    return num/2

@task
def addem(nums: List):
    for i in nums:
        total += i
    return total
```
I can gather the results into a list:
```python
@flow
def my_flow():
    logger = get_run_logger()
    data = {'id':[2,4,6,8,10,12,14,16]}
    df = pd.DataFrame(data)

    failures = []
    for index, row in df.iterrows():
        failures.append(dividetwo.submit(row['id']))

    return failures

print(my_flow())

# output
# [Completed(message=None, type=COMPLETED, result=1.0), Completed(message=None, type=COMPLETED, result=2.0), Completed(message=None, type=COMPLETED, result=3.0), Completed(message=None, type=COMPLETED, result=4.0), Completed(message=None, type=COMPLETED, result=5.0), Completed(message=None, type=COMPLETED, result=6.0), Completed(message=None, type=COMPLETED, result=7.0), Completed(message=None, type=COMPLETED, result=8.0)]
```
But I can't seem to aggregate them successfully:
```python
@task
def addem(nums: List):
    for i in nums:
        total += i.result()
    return total

@flow
def my_flow():
    logger = get_run_logger()
    data = {'id':[2,4,6,8,10,12,14,16]}
    df = pd.DataFrame(data)

    failures = []
    for index, row in df.iterrows():
        failures.append(dividetwo.submit(row['id']))

    final_num = addem(failures)
    return final_num

print(my_flow())

# output
# UnboundLocalError: cannot access local variable 'total' where it is not associated with a value
```

Zanie

03/16/2023, 3:12 PM
You need to initialize `total` in `addem`, i.e. add `total = 0` before the loop.
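A minimal plain-Python sketch of why the error happens and what the fix looks like. The `@task` decorator is left off so it runs without Prefect, and `broken_addem` is a hypothetical name used only to contrast the two versions. Because `total += i` assigns to `total`, Python treats `total` as a local variable, so reading it on the first iteration fails until it is given a starting value.

```python
from typing import List


def broken_addem(nums: List[float]) -> float:
    # `total += i` reads `total` before it has ever been assigned,
    # so the first loop iteration raises UnboundLocalError.
    for i in nums:
        total += i
    return total


def addem(nums: List[float]) -> float:
    total = 0  # initialize the accumulator before the loop
    for i in nums:
        total += i
    return total  # equivalent to the built-in sum(nums)


try:
    broken_addem([1.0, 2.0])
except UnboundLocalError as exc:
    print("broken_addem failed:", type(exc).__name__)

print(addem([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))  # 36.0
```

The built-in `sum(nums)` does the same accumulation in one call.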

Stephen Lloyd

03/17/2023, 11:50 AM
Thanks! That solved this specific error. I'm still having trouble gathering the results from asynchronous tasks into a single downstream task, but now I can work on that. 🙂
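The remaining fan-out/fan-in problem can be sketched without Prefect using the standard library's `concurrent.futures`: submit every call, collect each future's `.result()` into a list, then hand that plain list to the reducing step. This is an analogy, not Prefect's API. `gather_and_reduce` is a hypothetical name, and `dividetwo`/`addem` here are plain functions that only mirror the thread's tasks. As far as I know, in Prefect 2.x passing the list of futures straight into a downstream task (as in `addem(failures)` above) makes Prefect wait for them and pass in their return values. In that case the items inside `addem` are already floats and `i.result()` should not be needed. Verify this against the docs for your Prefect version.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def dividetwo(num: int) -> float:
    # Stand-in for a slow API call.
    return num / 2


def addem(nums: List[float]) -> float:
    total = 0.0  # accumulator initialized before the loop
    for n in nums:
        total += n
    return total


def gather_and_reduce(ids: List[int]) -> float:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Fan out: each submit() returns a Future immediately.
        futures = [pool.submit(dividetwo, i) for i in ids]
        # Gather: block on each future, keeping submission order.
        results = [f.result() for f in futures]
    # Reduce: the downstream step receives plain floats, not futures.
    return addem(results)


print(gather_and_reduce([2, 4, 6, 8, 10, 12, 14, 16]))  # 36.0
```

Calling `.result()` in the driver keeps `addem` unaware of futures, so the same reducer works whether its input came from parallel calls or a plain list.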