Hi, I’m new to Prefect. I’m trying to get a pandas...
# ask-community
l
Hi, I’m new to Prefect. I’m trying to get a pandas dataframe out of a mapped task. Here query_list is a list of str, each of which is a valid query. Let’s say there are 3 queries and each query gives me back 10 rows. This task works fine.
data = execute_test_query.map(query=query_list)
Now I want to transform a concatenated dataframe in its entirety.
@task
def n_rows(df):
    rows = df.shape[0]
    print(f"There are {rows} rows!")
    return rows
I was expecting data_2 = n_rows.map(flatten(data)) to give "There are 30 rows!", but I get key errors. Any idea what I need to do to flatten data?
j
Hi @Leanna Morinishi - I think you're trying to "reduce" the data from the mapped function, right?
I have something like
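from prefect import Flow, task

@task
def some_task(value: int) -> int:
    return value + 42

@task
def some_reduce_task(value_list: list[int]) -> int:
    # intentionally very obvious reduce
    total = 0  # named total to avoid shadowing the built-in sum()
    for value in value_list:
        total = total + value
    return total

with Flow("map-reduce-example") as flow:  # Flow needs a name; this one is a placeholder
    some_list = [1, 2, 3, 4, 5, 42]
    # map runs some_task on each element of the list separately
    some_task_result_list = some_task.map(some_list)
    # passing the mapped result to a non-mapped task reduces it
    reduced_result = some_reduce_task(some_task_result_list)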
I'm also using dataframes, so my reduce function/task is just
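import pandas as pd
from prefect import task

@task
def my_reduce_task(df_list: list[pd.DataFrame]) -> pd.DataFrame:
    # combine the per-query dataframes into a single dataframe
    df = pd.concat(df_list)
    return df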
k
Hi @Leanna Morinishi, welcome! You need to use a reduce task that takes in List[pd.DataFrame] and then use pd.concat to combine your list of dataframes before calling shape. You should not need to use flatten. You can also just return the row count of each dataframe from a mapped task, then sum the counts in a reduce function and return the total. I would suggest this.
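Both options can be sketched roughly as below. This is only an illustration with plain functions: fake_query_result is a hypothetical stand-in for execute_test_query, and concat_and_count, count_rows and sum_counts are made-up names. In a real flow each function would carry the @task decorator and the list would come from .map.

```python
from typing import List

import pandas as pd


def fake_query_result(query: str) -> pd.DataFrame:
    # Hypothetical stand-in for execute_test_query: 10 rows per query.
    return pd.DataFrame({"query": [query] * 10, "row": range(10)})


def concat_and_count(df_list: List[pd.DataFrame]) -> int:
    # Option 1: reduce by concatenating, then call shape on the whole dataframe.
    combined = pd.concat(df_list, ignore_index=True)
    return combined.shape[0]


def count_rows(df: pd.DataFrame) -> int:
    # Option 2, step 1: per-dataframe row count (this would be the mapped task).
    return df.shape[0]


def sum_counts(counts: List[int]) -> int:
    # Option 2, step 2: reduce the per-dataframe counts to a total.
    return sum(counts)


query_list = ["q1", "q2", "q3"]
# The list comprehension plays the role of execute_test_query.map(query=query_list).
frames = [fake_query_result(q) for q in query_list]

total_via_concat = concat_and_count(frames)
total_via_counts = sum_counts([count_rows(df) for df in frames])
print(f"There are {total_via_concat} rows!")
```

The second option avoids building the combined dataframe when only the count is needed; the first is the one to use if later tasks need the concatenated data itself.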
l
Excellent, thank you both! Let me try it and get back to you.