# ask-community
Blake List
Hi there! I was wondering what the best way is to create a Prefect task out of a function that is applied to each row of a DataFrame, e.g. to wrap something like
df = df.apply(my_function, axis=1)
with Prefect. Thanks!
n
Hi @Blake List - it sounds like Apply Map might be useful for you there (it's the last method on the page in case the anchor doesn't work)
Kevin Kho
Hey @Blake List, you can do this with the basic map like this:
import pandas as pd
from prefect import task, Flow
from typing import Dict
import prefect

test = pd.DataFrame({'a': [1,2,3,4,5], 'b': [1,2,3,4,5]})

@task
def my_func(record: Dict):
    record['c'] = record['a'] + record['b']
    logger = prefect.context.get("logger")
    logger.info(record)
    return record

@task
def get_records(df: pd.DataFrame):
    return df.to_dict(orient='records')

@task
def assemble(records):
    df = pd.DataFrame.from_dict(records)
    logger = prefect.context.get("logger")
    logger.info(df.head())
    return df

with Flow("df_test") as flow:
    records = get_records(test)
    processed = my_func.map(records)
    assemble(processed)

flow.run()
But this is only recommended when you need observability per row. If you don't, you should just use pandas apply inside a task.
Blake List
Thank you guys! @Kevin Kho this does exactly what I need 🙂
If I have multiple functions that need to operate on each row (record), can I somehow parallelize over each row?
Kevin Kho
I don't think so, because that would create copies of the record and process them independently. It's already parallelized at the per-record level, so your compute is being used anyway. There aren't really two levels of parallelization, since the first parallel operation already uses the available compute.
Blake List
Great, yes, that was what I was hoping for.