# prefect-community
David Norrish
Apologies for a naive question! I have a flow that is roughly doing:

```python
filepaths = get_filepath(directory)            # returns a list of file paths
outpaths = get_output_paths(filepaths)         # returns a corresponding list of paths to write to after cleaning
dataframes = read_file.map(filepaths)          # read files into memory in parallel
cleaned_dfs = clean_dataframe.map(dataframes)  # clean dataframes in parallel
write_dataframe.map(cleaned_dfs, outpaths)     # write dataframes to file in parallel
```

I have 16 cores and ~400 files and am using the DaskExecutor. During execution, ALL files are read into memory before any of the cleaning child tasks start, then all cleaning tasks are performed before any writing to file commences. Since each file is ~2GB, memory gets flooded. It's kind of like a breadth-first approach, whereas I'd like it to run depth-first: read 16 files into memory at once, clean them and write them to file, then each core can free its memory and read in the next file. Is there a way to achieve this, possibly with triggers? The only workaround I've been able to think up is to write a single big task that does the reading/cleaning/writing steps - but I feel like that defeats the point of Prefect. Huge thanks in advance if anyone can help!
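For context, a minimal sketch of how the snippet above would be assembled and run, assuming the Prefect 0.x ("Core") API; the directory, file pattern, and task bodies here are placeholders, not from the original question:

```python
from pathlib import Path

import pandas as pd
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor  # Prefect 0.x import path

@task
def get_filepath(directory):
    # placeholder: collect the files to be cleaned
    return [str(p) for p in Path(directory).glob("*.csv")]

@task
def get_output_paths(filepaths):
    # placeholder: derive an output path per input path
    return [p.replace(".csv", ".cleaned.csv") for p in filepaths]

@task
def read_file(path):
    return pd.read_csv(path)

@task
def clean_dataframe(df):
    return df.dropna()  # placeholder cleaning logic

@task
def write_dataframe(df, outpath):
    df.to_csv(outpath, index=False)

with Flow("clean-files") as flow:
    filepaths = get_filepath("/data/raw")  # hypothetical directory
    outpaths = get_output_paths(filepaths)
    dataframes = read_file.map(filepaths)
    cleaned_dfs = clean_dataframe.map(dataframes)
    write_dataframe.map(cleaned_dfs, outpaths)

flow.run(executor=DaskExecutor())
```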
Joe Schmid
@David Norrish I'm sure the Prefect folks will have a good answer for you, but since I'm online and waiting for something I'll take a crack at it... One approach that might get you close is to split `filepaths` into 16-item chunks, then map over the first chunk with your 3 tasks, then map over the second 16-item chunk with your 3 tasks, and so on. So essentially it would be a loop and then a map:
1. Loop over the 16-item chunks
2. Map over the items within each chunk with your 3 tasks

I don't have experience with task looping in Prefect, and I think it's a relatively new feature: https://docs.prefect.io/core/tutorials/task-looping.html Anyway, hope this helps! I'm curious to hear how it works out.
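A rough sketch of the loop-over-chunks idea, using the task-looping API from the linked tutorial. Assumptions here: Prefect 0.x, a chunk size of 16, and hypothetical plain-Python helpers `read_file_fn`, `clean_fn`, and `write_fn` doing the per-file work (they are not from the original thread):

```python
import prefect
from prefect import task
from prefect.engine.signals import LOOP

CHUNK_SIZE = 16

@task
def process_in_chunks(filepaths, outpaths):
    # On the first run task_loop_result is absent; start at file 0.
    payload = prefect.context.get("task_loop_result", {"start": 0})
    start = payload["start"]

    # Handle this chunk only, so at most CHUNK_SIZE files are
    # touched per loop iteration.
    for path, outpath in zip(filepaths[start:start + CHUNK_SIZE],
                             outpaths[start:start + CHUNK_SIZE]):
        df = read_file_fn(path)   # hypothetical helpers, not Prefect
        df = clean_fn(df)         # tasks: a looping task can't launch
        write_fn(df, outpath)     # mapped child tasks from inside itself

    # Raising LOOP re-runs this task with the new payload until done.
    if start + CHUNK_SIZE < len(filepaths):
        raise LOOP(message=f"processed {start + CHUNK_SIZE} files",
                   result={"start": start + CHUNK_SIZE})
```

One caveat worth flagging: because looping happens inside a single task, the per-file work in each iteration runs in that task rather than as mapped child tasks, so this trades some mapping parallelism for bounded memory.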
David Norrish
@Joe Schmid Thanks for the suggestion! I've also never tried this kind of multi-level mapping in Prefect, but that sounds like it makes a lot of sense 🙂
Joe Schmid
Just edited the answer a bit -- I think the first iteration is better done with a loop so that the 16-item chunks get processed sequentially rather than in parallel.
But definitely update here once you've had a chance to try it!
David Norrish
I think there's a deeper issue with my approach, because the `write_dataframe` step doesn't begin until all the `clean_dataframe` tasks have finished - even when there are cores sitting around doing nothing. Thanks for the loop suggestion, I'll look into that now!
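For reference, the "single big task" workaround mentioned in the original question would look something like the sketch below (placeholder pandas read/clean/write logic): mapping one fused task over the file list bounds memory to roughly one ~2GB file per Dask worker, at the cost of losing the separate task stages.

```python
import pandas as pd
from prefect import task

@task
def read_clean_write(path, outpath):
    # Fusing the three steps into one mapped task means each file is
    # read, cleaned, and written before the worker picks up the next
    # one, so memory stays bounded at about one file per core.
    df = pd.read_csv(path)
    df = df.dropna()  # placeholder cleaning logic
    df.to_csv(outpath, index=False)

# inside the flow, replacing the three .map() calls:
#     read_clean_write.map(filepaths, outpaths)
```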
Chris White
Hi @David Norrish - this is a great question; we have really struggled to support depth-first execution, but there are some limitations we keep running into that are described here: https://stories.dask.org/en/latest/prefect-workflows.html#pain-points-when-using-dask In the meantime, I think @Joe Schmid's suggestion is a good one!
David Norrish
Thanks a bunch @Chris White! Will have a read