Dharhas Pothina
02/01/2020, 4:53 PM
from prefect import Flow, task
import dask.dataframe as dd

@task
def read_parquet(src_path):
    df = dd.read_parquet(src_path)
    return df  # returns dask df

@task
def retrieve_img(x):
    img = ...  # fetch the image for row x
    return img

@task
def model(img):
    result = ...  # run the model on img
    return result

@task
def write_db(result):
    pass  # write to db

@task
def write_parquet(dst_path, df):
    pass  # write to disk

with Flow('Test') as flow:
    df = read_parquet(path)
    images = retrieve_img.map(df.iterrows())
    results = model.map(images)
    write_db.map(results)
    # note: from_sequence lives in dask.bag, not dask.dataframe
    write_parquet(df.join(dd.from_sequence(results)))
Essentially, each row of my input dask dataframe needs to be processed, and the results appended to the dataframe and saved in a database.
Joe Schmid
02/01/2020, 6:56 PM
You can't call df.iterrows() where df is the result of a Prefect task (inside the Flow block, df is a Task, not a dataframe), but you could return the rows as a list (or other iterable) from your read_parquet() task. You can definitely pass the result of a task to another task, e.g.
result_of_read_parquet = read_parquet(path) # returns a list
result_of_retrieve_img = retrieve_img.map(result_of_read_parquet)
(map() requires its parameter(s) to be iterable.)
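Here is a minimal sketch of the shape suggested above. It assumes pandas and leaves Prefect out so that it runs on its own. read_rows, retrieve_img, model and attach_results are hypothetical stand-ins, not the thread's real tasks. The read step hands back a plain list of row dicts. The per-row functions are what .map() would call once per element. A final step joins the results back onto the dataframe by position. In Prefect Core, each function would carry @task, and the two list comprehensions would become retrieve_img.map(rows) and model.map(images).

```python
import pandas as pd

# Hypothetical stand-ins for the thread's tasks. In Prefect Core each would be
# decorated with @task, and the per-row steps would be invoked via .map().

def read_rows(df: pd.DataFrame) -> list[dict]:
    # Materialize the rows as a plain list so they can be mapped over.
    return df.to_dict("records")

def retrieve_img(row: dict) -> str:
    # Placeholder: pretend the image is identified by the row's id.
    return f"img-{row['id']}"

def model(img: str) -> int:
    # Placeholder "model": the length of the image identifier.
    return len(img)

def attach_results(df: pd.DataFrame, results: list) -> pd.DataFrame:
    # Results are assumed to be in input-row order, so they align by position.
    return df.assign(result=results)

df = pd.DataFrame({"id": [1, 22, 333]})
rows = read_rows(df)
images = [retrieve_img(r) for r in rows]  # one call per row, like retrieve_img.map(rows)
results = [model(i) for i in images]      # one call per image, like model.map(images)
out = attach_results(df, results)
print(out)
```

Joining by position relies on the mapped results keeping the order of the input rows. That is why attach_results can use assign rather than a key-based merge. If rows could be dropped or reordered along the way, carrying an id through each step and merging on it would be safer.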
You may have already seen this Prefect example, but if not, it's close in concept to what you're doing: https://docs.prefect.io/core/tutorials/advanced-mapping.html#scaling-to-all-episodes
Dharhas Pothina
02/01/2020, 7:50 PM
Joe Schmid
02/01/2020, 8:22 PM
read_csv() returns a pandas dataframe)
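This small pandas-only sketch shows the eager behaviour mentioned above. pandas.read_csv returns an in-memory pandas.DataFrame, so turning its rows into a list for mapping is immediate. The CSV contents here are made up for illustration.

```python
import io

import pandas as pd

# Made-up CSV contents standing in for a real file.
csv_text = "id,url\n1,a.png\n2,b.png\n"

# pandas.read_csv parses eagerly and returns a pandas.DataFrame.
df = pd.read_csv(io.StringIO(csv_text))

# The data is already in memory, so the rows can be listed right away.
# A dask DataFrame is lazy and only materializes rows when computed.
rows = df.to_dict("records")
print(type(df).__name__, rows)
```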