# prefect-community
Hi everyone. Very new to Prefect. I have some large files that I want to put through a pipeline, and after setting up the db tables, etc., I only want to process one row at a time so that I can catch validation errors and skip dodgy rows. Speed isn't a huge concern. Does anyone know of a good strategy to achieve this with Prefect?
Hi Carl, it sounds like you might benefit from Prefect mapping (https://docs.prefect.io/core/concepts/mapping.html). In particular, you could have one task that returns row IDs and a downstream task that maps over them, processing one row per mapped task. When using mapping (especially with large numbers of mapped tasks), I highly recommend a Dask Executor for parallelism (https://docs.prefect.io/core/concepts/engine.html#executors).
Thanks for the hint. I think this comes pretty close to what I need. My next question: since I'm used to pandas DataFrames, how can I keep the column names in each iteration? I.e., is there a way to return a `dict` or something similar? Or perhaps there's an alternative way to make referencing the data in a single row more readable?
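```python
import pandas as pd
from prefect import task, Flow, Parameter

@task()
def extract_file(filename):
    df = pd.read_csv(filename)
    # df.values is a bare NumPy array, so the column names are dropped here
    return df.values

@task()
def transform(df_row):
    # do something with a single row
    # print(df_row['colA'])  # fails on a NumPy row: it has no column labels
    return df_row

@task()
def load(df):
    # receives the list of all mapped transform() results
    print(df)

def build_flow():
    with Flow('Test ETL') as flow:
        # declare filename as a Parameter so flow.run(parameters=...) can fill it
        filename = Parameter('filename')
        df = extract_file(filename)
        df_row = transform.map(df)  # one transform task per row
        r = load(df_row)
    return flow


flow = build_flow()
flow.run(parameters={'filename': 'data/somefile.csv'})
```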
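A minimal sketch of one way to keep the column names, assuming pandas: `DataFrame.to_dict(orient="records")` returns a list with one `{column: value}` dict per row, whereas `df.values` keeps only the raw values. The CSV content, the `colA`/`colB` columns, and the integer check on `colA` are hypothetical; the functions are plain Python so the row handling can be tried without running a flow.

```python
# Sketch assuming pandas. The CSV text, column names, and validation rule
# are hypothetical examples, not taken from the original data.
import io

import pandas as pd

CSV_TEXT = """colA,colB
1,x
not-a-number,y
3,z
"""


def extract_rows(csv_source):
    # Read everything as str, then emit one {column: value} dict per row.
    df = pd.read_csv(csv_source, dtype=str)
    return df.to_dict(orient="records")


def transform_row(row):
    # Hypothetical validation: colA must parse as an int.
    try:
        return {**row, "colA": int(row["colA"])}
    except ValueError:
        return None  # dodgy row: flag it instead of failing the whole run


def load(rows):
    # Keep only the rows that passed validation.
    return [r for r in rows if r is not None]


rows = extract_rows(io.StringIO(CSV_TEXT))
cleaned = load([transform_row(r) for r in rows])
print(cleaned)  # [{'colA': 1, 'colB': 'x'}, {'colA': 3, 'colB': 'z'}]
```

In the flow above, the equivalent change would be for `extract_file` to `return df.to_dict(orient="records")`. `transform.map(...)` would then hand each mapped task a single dict, so `df_row['colA']` works, and `load` would receive the list of transformed rows.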