
An Hoang

08/26/2019, 8:51 PM
Hi, to read all the related material on Dask's interaction with Prefect, do we just search for `dask` in this channel and then look through issues on GitHub? Is there going to be a Dask-Prefect FAQ page to synthesize all the common problems/tips and tricks when using both?

Chris White

08/26/2019, 9:14 PM
Hi @An Hoang! We have this document which covers some things (but probably not everything): https://docs.prefect.io/guide/tutorials/dask-cluster.html; I’d be happy to host something like an FAQ if we have enough content for it. Is there a particular question you have in mind?

An Hoang

08/27/2019, 1:14 AM
Hi @Chris White! Sorry if this is too vague of a question; I can provide more details if needed. I'm a scientist trying to incorporate Prefect into my workflow. At our scientific institute we have an HPC system (LSF). The HPC's API for checking job status/success/failure is not very good, so I want to use Dask + Prefect to bypass having to submit many jobs to the cluster and monitor them from the shell, and use only Python instead.

One of my main pipelines is essentially a permutation test script that needs to be run on about 1000 dataframes, and each dataframe needs 10 million permutations. I want to make it parallel at both the dataframe level and the permutation level (do permutations in 1k chunks and then combine the results). Let's say I have a function `do_permutation(df)` that loads a dataframe from file, does the permutations, then outputs the result. Would I do the following?
```python
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor

@task
def do_permutation(csv_file):
    df = ...  # load data from csv_file
    for i in range(1_000_000):
        ...  # permute column A, fit model, get coefficient,
             # compare result with the unpermuted coefficient
    ...  # combine and return the results

# csv_file_list: the ~1000 csv paths, defined elsewhere
with Flow("permutation-flow") as flow:
    results = do_permutation.map(csv_file_list)

executor = DaskExecutor(address="")
flow.run(executor=executor)
```
That would submit each dataframe to a dask worker, right? Which means I have parallelized at the dataframe level. How would I structure sub-tasks of `do_permutation` to also split the permutations into chunks and submit them to the Dask executor? I'd imagine it would have to be one process that has the dataframe, and then multiple threads that use the same dataframe in memory for repeatedly doing chunks of permutations.
This is for the case of running one flow for all 1k files. Should I instead run 1k flows, one per file, against the same Dask cluster, and then parallelize the permutations from within the `do_permutation` function by breaking it down into smaller tasks?
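For concreteness, here is roughly what I imagine the one-flow-per-file variant would look like (the `Parameter` usage and the driver loop are just my assumptions, not code I have running):

```python
from prefect import task, Flow, Parameter
from prefect.engine.executors import DaskExecutor

@task
def do_permutation(csv_file):
    ...  # same per-file permutation logic as above

with Flow("single-file-permutation") as flow:
    csv_file = Parameter("csv_file")
    do_permutation(csv_file)

# one flow run per file, all pointed at the same Dask cluster
executor = DaskExecutor(address="")
for f in csv_file_list:
    flow.run(parameters={"csv_file": f}, executor=executor)
```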
The workflow, `pipeline.py`, is currently run by a custom `orchestrator.py` script I have written that processes all the files, submits chunks of 10k permutations as separate jobs (100 jobs per file), monitors the jobs, and then combines the results. The permutations are also done in stages to limit wasting computational resources: we first do 1k permutations for all the files; after checking the results, some files are done and some need more permutations. The files that need more permutations then go through 10k, 100k, 1 million, and 10 million permutation steps, with the number of files needing more permutations shrinking at each step (a rough sketch of this staging logic is below). The current orchestrator has the following disadvantages:
1. It is too tightly coupled to the cluster and the current workflow, since it needs to run shell commands and parse their output in a specific way to check job status/completion.
2. It hits disk I/O pretty hard, due to all the output files being written concurrently.
3. It isn't aware of the dynamically available resources, which change over time (e.g. we could split into smaller permutation chunks when more nodes are available, especially at the later permutation steps, where the number of permutations is high and there are fewer files).
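The staging logic, roughly, looks like this (`do_permutations_for_file` and `is_conclusive` are placeholder names, not my actual functions):

```python
# Staged escalation: start small for every file, keep only the files whose
# results are not yet conclusive, and escalate the permutation count.
stages = [1_000, 10_000, 100_000, 1_000_000, 10_000_000]

remaining = list(all_csv_files)
final_results = {}
for n_perm in stages:
    # in the real orchestrator each file's permutations are split into
    # chunked cluster jobs that run in parallel
    results = {f: do_permutations_for_file(f, n_perm) for f in remaining}
    done = {f: r for f, r in results.items() if is_conclusive(r)}
    final_results.update(done)
    remaining = [f for f in remaining if f not in done]
    if not remaining:
        break
```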
@Chris White I know this is very specific to my workflow; I just want to find out whether and how Prefect and Dask can help me with this. I have read most of Dask's documentation and played around with it, less so for Prefect. Any high-level tips are very much appreciated!

Chris White

08/27/2019, 6:12 PM
Hi An, very sorry for the delay - I'm having a very busy day but will respond to you shortly!

An Hoang

08/27/2019, 6:18 PM
No worries!!! So sorry, I didn't mean to rush you; just wanted to add a ping.
👍 1

Chris White

08/27/2019, 7:41 PM
Ok yea, your use case is very interesting; I have a few ideas. First off, you are correct that `results = do_permutation.map(csv_file_list)` will parallelize at the dataframe level. To naively parallelize further, down to the permutation level, you could do
`results = do_single_permutation.map(csv_file_list, upstream_tasks=[Constant(list(range(1_000_000)))])`
but this would involve reading the csv file for every single permutation, which would quickly eat up your disk I/O.
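Spelled out at chunk granularity, that naive pattern might look something like this (the `list_work_items` helper, the chunk sizes, and `csv_file_list` are placeholders for illustration, not a recommendation):

```python
import pandas as pd
from prefect import task, Flow, unmapped

@task
def list_work_items(csv_file_list, n_chunks):
    # one (csv_file, chunk_id) pair per unit of mapped work
    return [(f, i) for f in csv_file_list for i in range(n_chunks)]

@task
def do_permutation_chunk(item, chunk_size):
    csv_file, chunk_id = item
    df = pd.read_csv(csv_file)  # re-read for every chunk: the disk I/O cost mentioned above
    ...  # run `chunk_size` permutations on df and return the partial result

with Flow("naive-permutation-flow") as flow:
    items = list_work_items(csv_file_list, n_chunks=1_000)
    results = do_permutation_chunk.map(items, chunk_size=unmapped(10_000))
```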
To get smarter about it, you could use some dask-specific things:
- create an upstream task that reads in all the dataframes and persists (or scatters) them to your dask cluster using a worker client
- directly call dask from within your permutation task (e.g., dask dataframe)
- just remember that this task is already running on a dask worker, so you'll need to use a worker client instead of a normal client
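A minimal sketch of that worker-client pattern, assuming a chunked permutation helper (`run_permutation_chunk`) and a reducer (`combine`) that aren't spelled out here:

```python
import pandas as pd
from distributed import worker_client
from prefect import task

def run_permutation_chunk(df, n_perm):
    ...  # run n_perm permutations on the in-memory dataframe, return a partial result

@task
def do_permutation(csv_file, n_total=10_000_000, chunk=10_000):
    df = pd.read_csv(csv_file)
    # this task is already running on a dask worker, so use worker_client
    # (not a plain Client) to submit sub-work back to the same cluster
    with worker_client() as client:
        df_future = client.scatter(df)  # ship the dataframe to the cluster once
        futures = [
            client.submit(run_permutation_chunk, df_future, chunk, pure=False)
            for _ in range(n_total // chunk)
        ]
        partials = client.gather(futures)
    return combine(partials)  # hypothetical reducer that merges the chunk results
```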
Just remember that all Prefect will see is whatever you specify as a Prefect Task --> you could wrap this whole workflow into a single task, or have each permutation be an individual task; it just depends on the level of granularity you want in your status reporting.

An Hoang

08/27/2019, 9:51 PM
Hi @Chris White, thanks so much for the suggestions! I'll play with this until I get a minimal example and see what happens.

Chris White

08/27/2019, 9:51 PM
Anytime, please check back in if you run into any issues!