An Hoang
08/26/2019, 8:51 PM
Should we discuss dask in this channel and then open issues on github? Is there going to be a Dask-Prefect FAQ page to synthesize all the common problems/tips and tricks when using both?

Chris White
08/26/2019, 9:14 PM

An Hoang
08/27/2019, 1:14 AM
I have a do_permutation(df) function that loads the dataframe from a file, does the permutation, then outputs the result. Would I do:
```python
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def do_permutation(csv_file):
    df = ...  # load data
    for i in range(1_000_000):
        # permute column A
        # fit model
        # get coefficient
        # compare result with unpermuted coefficient
        ...

with Flow("permutation-flow") as flow:
    results = do_permutation.map(csv_file_list)

executor = DaskExecutor(address="")
flow.run(executor=executor)
```
?
That would submit each dataframe to a dask worker, right? Which means I have parallelized at the dataframe level. How would I structure sub-tasks of do_permutation to also split the permutations into chunks and submit them to the Dask executor? I'd imagine it would have to be one process that holds the dataframe, and then multiple threads that use the same dataframe in memory to repeatedly run chunks of permutations. Should I restructure the do_permutation function by breaking it down into smaller tasks?
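
A rough sketch of that structure in Prefect, assuming the 0.x API used in the snippets above; the task names (load_df, permute_chunk, combine_chunks), the 10k chunk size, and the import paths are illustrative, and Dask may still copy the loaded dataframe between workers rather than pinning it to a single process:

```python
from prefect import Flow, task, unmapped
from prefect.engine.executors import DaskExecutor

N_PERMUTATIONS = 1_000_000
CHUNK_SIZE = 10_000                 # illustrative chunk size
chunk_starts = list(range(0, N_PERMUTATIONS, CHUNK_SIZE))

csv_file_list = [...]               # the csv paths from the snippet above

@task
def load_df(csv_file):
    # read the csv once; the resulting dataframe is passed to every chunk task
    ...

@task
def permute_chunk(df, start):
    # permute column A, fit the model, and collect coefficients for
    # permutations [start, start + CHUNK_SIZE)
    ...

@task
def combine_chunks(chunk_results):
    # merge the per-chunk coefficients and compare against the unpermuted fit
    ...

with Flow("permutation-flow") as flow:
    for csv_file in csv_file_list:
        df = load_df(csv_file)
        chunks = permute_chunk.map(unmapped(df), chunk_starts)
        combine_chunks(chunks)

flow.run(executor=DaskExecutor(address=""))
```

Mapping over chunk offsets with unmapped(df) keeps the fan-out at the chunk level, so each file is read once per dataframe rather than once per permutation.
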
pipeline.py is currently run by a custom orchestrator.py script I have written that processes all the files, submits chunks of 10k permutations as separate jobs (100 jobs per file), monitors the jobs, and then combines the results. The permutations are also done in stages to limit wasted computational resources. We first do 1k permutations for all the files. After checking the results, some files need more permutations and some files are done. The files that need more permutations then go through 10k, 100k, 1 million, and 10 million permutation steps, with the number of files needing more permutations shrinking at each step. The current orchestrator has the following disadvantages:
1. It is too tightly coupled with the cluster and the current workflow, since it needs to run shell commands and parse their output in a specific way to check job status/completion.
2. It hits disk I/O pretty hard, because all the output files are written concurrently.
3. It isn't aware of the dynamically available resources that change over time (e.g. we could split into smaller permutation chunks when more nodes are available, especially at the later permutation steps, where the number of permutations is high and there are fewer files).
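
For reference, the staged escalation described above boils down to a loop along these lines; run_permutations and needs_more_permutations are hypothetical stand-ins for the real job-submission and result-checking logic in orchestrator.py:

```python
# Hypothetical sketch of the staged escalation: each stage runs more
# permutations, and only files whose results are still inconclusive
# move on to the next stage.
STAGES = [1_000, 10_000, 100_000, 1_000_000, 10_000_000]

def run_permutations(csv_file, n_permutations):
    # stand-in for submitting n_permutations worth of jobs for one file
    # and combining their output
    ...

def needs_more_permutations(result):
    # stand-in for the check that decides whether a file's result is conclusive
    ...

def staged_permutations(csv_files):
    results = {}
    remaining = list(csv_files)
    for n_permutations in STAGES:
        if not remaining:
            break
        for csv_file in remaining:
            results[csv_file] = run_permutations(csv_file, n_permutations)
        remaining = [f for f in remaining if needs_more_permutations(results[f])]
    return results
```
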
Chris White
08/27/2019, 6:12 PM

An Hoang
08/27/2019, 6:18 PM

Chris White
08/27/2019, 7:41 PM
results = do_permutation.map(csv_file_list) will parallelize at the dataframe level. To naively parallelize further to the permutation level, you could do:
results = do_single_permutation.map(csv_file_list, upstream_tasks=[Constant(list(range(1_000_000)))])
but this would involve reading the csv file for every single permutation, which would quickly eat up your disk I/O.
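
Spelled out, the naive per-permutation mapping Chris describes would look roughly like the sketch below, with the same assumed imports as the earlier sketch. It is written here as an explicit (file, permutation index) product rather than the Constant upstream task in his message, and do_single_permutation is a hypothetical task that re-reads the csv on every call, which is exactly the disk I/O problem he points out:

```python
from itertools import product

from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

csv_file_list = [...]  # the csv paths from earlier in the thread

@task
def do_single_permutation(csv_file, i):
    # re-read the csv, run permutation i, and return its coefficient;
    # the file is read once per permutation, hence the disk I/O concern
    ...

# one (file, index) pair per permutation: len(csv_file_list) * 1_000_000 children
pairs = list(product(csv_file_list, range(1_000_000)))
files = [f for f, _ in pairs]
indices = [i for _, i in pairs]

with Flow("naive-permutation-flow") as flow:
    results = do_single_permutation.map(files, indices)

flow.run(executor=DaskExecutor(address=""))
```
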
An Hoang
08/27/2019, 9:51 PM

Chris White
08/27/2019, 9:51 PM