# prefect-community
d
Hi all. Bit confused on how to use prefect with dask dataframes. Does the following stub code make sense:
Copy code
from prefect import Flow, task
import dask.dataframe as dd  # needed for dd.from_sequence below

@task
def read_parquet(src_path):
    df = dd.read_parquet(src_path)
    return df  # returns dask df


@task
def retrieve_img(x):
    return img

@task
def model(img):
    return result

@task
def write_db(result):
    # write to db
    pass

@task
def write_parquet(dst_path, df):
    # write to disk
    pass

with Flow('Test') as flow:
    df = read_parquet(path)
    images = retrieve_img.map(df.iterrows())
    results = model.map(images)
    write_db.map(results)
    write_parquet(dst_path, df.join(dd.from_sequence(results)))
Essentially, each row of my input dask dataframe needs to be processed and the results appended to the dataframe and saved in a database.
j
Hi @Dharhas Pothina, a couple thoughts from a fellow Prefect user:
1. If your parquet file/dask dataframe has thousands of rows, this pattern seems viable. If it has millions of rows, that's probably too large of a set to map over. (If you do need to process millions of rows, you could break it into chunks and then map over the chunks, etc.)
2. You can't do df.iterrows() where df is the result of a Prefect task, but you could return the rows as a list (or other iterable) from your read_parquet() task. You can definitely pass the result of a task to another task, e.g.
Copy code
result_of_read_parquet = read_parquet(path) # returns a list
result_of_retrieve_img = retrieve_img.map(result_of_read_parquet)
(map() requires its parameter(s) to be iterable.) You may have already seen this Prefect example, but if not it's close in concept to what you're doing: https://docs.prefect.io/core/tutorials/advanced-mapping.html#scaling-to-all-episodes
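To make that concrete, here's a rough, untested sketch of how your flow could look if read_parquet() returns plain records instead of a dask dataframe (the img_path column and the stand-in task bodies are placeholders I made up):
Copy code
from prefect import Flow, task
import dask.dataframe as dd

@task
def read_parquet(src_path):
    # materialize the rows as a list of dicts so a downstream task can map over them;
    # fine for thousands of rows, too heavy for millions
    return dd.read_parquet(src_path).compute().to_dict("records")

@task
def retrieve_img(row):
    # stand-in: load the image referenced by this row
    return open(row["img_path"], "rb").read()

@task
def model(img):
    # stand-in for your real model call
    return len(img)

@task
def write_db(result):
    # stand-in for your real database write
    print(result)

with Flow("Test") as flow:
    rows = read_parquet("data.parquet")   # a plain list, so it can be mapped over
    images = retrieve_img.map(rows)       # one mapped child task per row
    results = model.map(images)
    write_db.map(results)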
d
Yeah so we already do a lot of things on a dask cluster using df.apply etc, trying to work out where prefect tasks fit in. Can we pass lazy dataframe references between tasks?
Are there any examples using prefect and Dask dataframes?
j
@Dharhas Pothina gotcha, that makes sense re: you're already using a dask cluster. Yup, you can definitely pass a reference to a dask dataframe between tasks. As an experienced dask user, maybe think about tasks/flows as (1) a way to express the sequence of activity as a unit, i.e. an overall flow with tasks that have dependencies on each other, and (2) a way to distribute those tasks onto your cluster, e.g. mapped tasks can run in parallel on multiple workers, etc. Since you already know dask, I think just writing a few simple Prefect flows and trying them on your cluster will quickly get you up to speed.
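For example, here's a rough untested sketch of one task handing the lazy dataframe to the next, with the flow pointed at an existing cluster (the scheduler address and the "value" column are placeholders, and the executor import path has moved between Prefect versions):
Copy code
import dask.dataframe as dd
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor  # prefect.executors in newer releases

@task
def read_parquet(src_path):
    # no .compute() here; the lazy dask dataframe itself is passed downstream
    return dd.read_parquet(src_path)

@task
def add_score(df):
    # operates on the whole lazy frame; still nothing is computed yet
    return df.assign(score=df["value"] * 2)  # "value" is a made-up column

@task
def write_parquet(df, dst_path):
    # computation is finally triggered here
    df.to_parquet(dst_path)

with Flow("lazy-df") as flow:
    df = read_parquet("in.parquet")
    scored = add_score(df)
    write_parquet(scored, "out.parquet")

# point the flow at your existing scheduler (placeholder address)
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))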
While this example doesn't use a dask dataframe (just a plain pandas dataframe) it might be interesting for you: https://github.com/PrefectHQ/prefect/blob/master/examples/feature_engineering.py (in it read_csv() returns a pandas dataframe)
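And since you already lean on df.apply, one more hedged sketch: you could keep the per-row work inside dask itself (map_partitions here, but apply works the same way) and skip Prefect's map entirely, so Prefect just sequences the coarse steps (process_partition and the column names are stand-ins):
Copy code
import dask.dataframe as dd
import pandas as pd
from prefect import Flow, task

def process_partition(pdf: pd.DataFrame) -> pd.DataFrame:
    # stand-in for retrieve_img + model over the rows of one partition
    pdf = pdf.copy()
    pdf["result"] = pdf["img_path"].str.len()  # "img_path"/"result" are made-up columns
    return pdf

@task
def read_parquet(src_path):
    return dd.read_parquet(src_path)

@task
def score(df):
    # per-row work happens inside dask, partition by partition
    return df.map_partitions(process_partition)

@task
def write_parquet(df, dst_path):
    df.to_parquet(dst_path)

with Flow("dask-df") as flow:
    df = read_parquet("in.parquet")
    scored = score(df)
    write_parquet(scored, "out.parquet")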