David Norrish 09/13/2019, 1:14 AM
```python
# return a list of file paths
filepaths = get_filepath(directory)
# return a corresponding list of paths to write to after cleaning
outpaths = get_output_paths(filepaths)
# read files into memory in parallel
dataframes = read_file.map(filepaths)
# clean dataframes in parallel
cleaned_dfs = clean_dataframe.map(dataframes)
# write dataframes to file in parallel
```
I have 16 cores and ~400 files and am using the DaskExecutor. During execution, ALL files are read into memory before any of the cleaning child tasks start, then all cleaning tasks are performed before any writing to file commences. This floods memory, since each file is ~2GB.

It's essentially a breadth-first approach, whereas I'd like it to run depth-first: read 16 files into memory at once, clean them and write them to file, then a core can free the memory and read in the next file. Is there a way to achieve this, possibly with triggers?

The only workaround I've been able to think of is to write a single big task that does the reading/cleaning/writing steps - but that feels like it defeats the point of Prefect. Huge thanks in advance if anyone can help!
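For concreteness, that single-big-task workaround might look roughly like the sketch below (the process_file task and its pandas/CSV details are illustrative stand-ins, not the actual code):

```python
# Minimal sketch of the "one big mapped task" workaround, assuming CSV files
# and pandas; process_file and the paths are hypothetical stand-ins.
import glob
import pandas as pd
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor

@task
def get_filepaths(directory):
    return sorted(glob.glob(f"{directory}/*.csv"))

@task
def process_file(filepath):
    # read, clean and write inside one task, so a Dask worker only ever
    # holds a single ~2GB dataframe at a time
    df = pd.read_csv(filepath)
    cleaned = df.dropna()  # stand-in for the real cleaning
    outpath = filepath + ".cleaned.csv"
    cleaned.to_csv(outpath, index=False)
    return outpath

with Flow("read-clean-write") as flow:
    filepaths = get_filepaths("/path/to/data")
    process_file.map(filepaths)

flow.run(executor=DaskExecutor())
```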
Joe Schmid 09/13/2019, 2:46 AM
One idea: break your list of filepaths into 16-item chunks, then map over the first chunk with your 3 tasks, next map over the second 16-item chunk with your 3 tasks, and so on. So essentially it would be a loop and then a map:
1. Loop over the 16-item chunks
2. Map over the items within each chunk with your 3 tasks
I don't have experience with task looping in Prefect but I think it's a relatively new feature: https://docs.prefect.io/core/tutorials/task-looping.html Anyway, hope this helps! I'm curious to hear how it works out.
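A rough sketch of that chunk-then-map approach (task bodies and file paths here are illustrative; the upstream_tasks link is what makes each chunk wait for the previous one):

```python
# Sketch: process ~400 files in 16-item chunks so at most ~16 dataframes
# are in memory at once. Task names mirror the original flow; the bodies
# and paths are illustrative stand-ins.
import glob
import pandas as pd
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor

@task
def read_file(path):
    return pd.read_csv(path)

@task
def clean_dataframe(df):
    return df.dropna()  # stand-in for the real cleaning logic

@task
def write_dataframe(df, outpath):
    df.to_csv(outpath, index=False)

# build the chunks at flow-construction time with plain Python
all_filepaths = sorted(glob.glob("/path/to/data/*.csv"))
chunk_size = 16
chunks = [all_filepaths[i:i + chunk_size]
          for i in range(0, len(all_filepaths), chunk_size)]

with Flow("chunked-read-clean-write") as flow:
    previous = None
    for chunk in chunks:
        outpaths = [p.replace(".csv", ".cleaned.csv") for p in chunk]
        dfs = read_file.map(chunk,
                            upstream_tasks=[previous] if previous else None)
        cleaned = clean_dataframe.map(dfs)
        written = write_dataframe.map(cleaned, outpaths)
        previous = written  # the next chunk's reads wait on this chunk's writes

flow.run(executor=DaskExecutor())
```

One caveat worth noting: the file list has to be known when the flow is built (hence the plain glob call outside any task), since the chunked mapping is wired up at construction time.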
David Norrish 09/13/2019, 2:51 AM
Joe Schmid 09/13/2019, 2:54 AM
David Norrish 09/13/2019, 5:13 AM
It's strange that the cleaning step doesn't begin until all the read tasks have finished - even when there are cores sitting around doing nothing. Thanks for the loop suggestion, I'll look into that now!
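For reference, the task-looping pattern from the docs linked above works roughly as follows (the chunk-processing body is illustrative, not from this thread):

```python
# Gist of Prefect's task-looping pattern: a task re-runs itself by raising
# LOOP, passing state forward via prefect.context. What happens to each
# chunk inside the loop body is purely illustrative.
import prefect
from prefect import task, Flow
from prefect.engine.signals import LOOP

@task
def process_next_chunk(chunks):
    payload = prefect.context.get("task_loop_result", {})
    index = payload.get("index", 0)
    if index >= len(chunks):
        return index  # all chunks processed
    for path in chunks[index]:
        pass  # read/clean/write one file here
    raise LOOP(message=f"processed chunk {index}", result=dict(index=index + 1))

with Flow("looped-chunks") as flow:
    process_next_chunk([["a.csv", "b.csv"], ["c.csv", "d.csv"]])

flow.run()
```

Note that the loop itself runs serially inside a single task, so it would need to be combined with mapping (as in the chunked sketch above) to keep all 16 cores busy within each chunk.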
Chris White 09/13/2019, 4:03 PM
David Norrish 09/15/2019, 11:27 PM