
    Blake List

    1 year ago
    Hi there! I was wondering what is the best way to create a prefect task out of a function that is applied to each row of a dataframe. E.g. wrap something like
    df = df.apply(my_function, axis=1)
    with prefect. Thanks!

    nicholas

    1 year ago
    Hi @Blake List - it sounds like Apply Map might be useful for you there (it's the last method on the page in case the anchor doesn't work)
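    (For reference, a minimal sketch of the apply_map approach, assuming Prefect 1.x where apply_map is importable from the top-level prefect package; the row-level tasks here are made up for illustration:)
    from prefect import Flow, task, apply_map

    @task
    def add_cols(record):
        # an illustrative row-level task: combine two columns of a single row (dict)
        record['c'] = record['a'] + record['b']
        return record

    @task
    def scale(record):
        # a second illustrative row-level task
        record['c'] = record['c'] * 10
        return record

    def per_row(record):
        # a plain function composing tasks; apply_map maps this whole pipeline over the records
        return scale(add_cols(record))

    with Flow("apply_map_sketch") as flow:
        results = apply_map(per_row, [{'a': 1, 'b': 1}, {'a': 2, 'b': 2}])

    flow.run()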

    Kevin Kho

    1 year ago
    Hey @Blake List, you can do this with the basic map like this:
    import pandas as pd
    from prefect import task, Flow
    from typing import Dict
    import prefect
    
    test = pd.DataFrame({'a': [1,2,3,4,5], 'b': [1,2,3,4,5]})
    
    @task
    def my_func(record: Dict):
        record['c'] = record['a'] + record['b']
        logger = prefect.context.get("logger")
        logger.info(record)
        return record
    
    @task
    def get_records(df: pd.DataFrame):
        return df.to_dict(orient='records')
    
    @task
    def assemble(records):
        df = pd.DataFrame.from_dict(records)
        logger = prefect.context.get("logger")
        logger.info(df.head())
        return df
    
    with Flow("df_test") as flow:
        records = get_records(test)
        processed = my_func.map(records)
        assemble(processed)
    
    flow.run()
    But this is only recommended when you need observability per row. If you don't, you should just use the pandas apply inside a task.
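    (For reference, a minimal sketch of that simpler route, with the whole row-wise apply wrapped in a single task:)
    import pandas as pd
    from prefect import task, Flow

    test = pd.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [1, 2, 3, 4, 5]})

    def my_function(row):
        row['c'] = row['a'] + row['b']
        return row

    @task
    def apply_my_function(df: pd.DataFrame) -> pd.DataFrame:
        # a single task wraps the whole row-wise apply, so there is no per-row observability
        return df.apply(my_function, axis=1)

    with Flow("apply_in_task") as flow:
        result = apply_my_function(test)

    flow.run()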

    Blake List

    1 year ago
    Thank you guys! @Kevin Kho this does exactly what I need 🙂
    If I have multiple functions that need to operate on each row (record), can I somehow parallelize over each row?

    Kevin Kho

    1 year ago
    I don’t think so, because that would create copies of the record and process them independently. It’s already parallelized at the per-record level, so your compute is being utilized anyway. There isn’t really a second level of parallelization, since the first parallel operation already uses the available compute.
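    (For reference, the record-level parallelism comes from running the flow with a parallel executor; a minimal sketch assuming Prefect 1.x's LocalDaskExecutor:)
    from prefect.executors import LocalDaskExecutor

    # the mapped my_func calls from the flow above run in parallel across records
    flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=4))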

    Blake List

    1 year ago
    Great, yes, that was what I was hoping for