# ask-community
e
Maybe this is more than a Prefect question. I use Prefect to run a batch job every day. Usually I have a lot of files like file_1, file_2, … file_100, and processing each file is independent of the others. To parallelize the computation, I have considered several options:
• Option 1: Single flow + single agent + Dask + Dask workers. One flow processes all the files, with a 1-to-1 mapping between task and file:
from prefect import task, Flow

@task
def process_file(x):
    ...  # process a single file

# file_names is the list of input files, e.g. ["file_1", ..., "file_100"]
with Flow("xxx") as flow:
    [process_file(x) for x in file_names]
It works fine with LocalRun.
◦ pro: easy to set up, and Dask has a friendly API for controlling resources.
◦ con: I have to set up the same running environment on every worker added to the Dask cluster.
• Option 2: Multiple flows + multiple agents. 1-to-1 mapping between flow and file, e.g. I could create 10 Docker agents running on 10 hosts, then in a script create and run 100 flows, each processing one file, and let Prefect distribute the flows for me.
◦ pro: the computation module is shipped with the Docker image, so there is no per-worker setup.
◦ con: not sure whether Prefect is supposed to handle workload distribution. Even if yes, it is hard to control resource consumption.
• Option 3: Single flow + K8s. Build an image for my computation module and register it with K8s first. Within one flow, create K8s tasks that request 100 pods to process the files.
◦ pro:
  ▪︎ K8s can handle the workload distribution, and adding nodes is easy.
  ▪︎ Any agent would be fine as long as it can talk to the K8s API.
◦ con: the complexity of setting up K8s?
Appreciate any thoughts and input here!
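A rough sketch of Option 2 that avoids registering 100 separate flows: register a single parameterized flow and trigger one run per file from a script, letting the agents pick the runs up. This assumes the Prefect 1.x Client API; the flow ID and the "file_name" parameter are placeholders for illustration.

from prefect import Client

client = Client()

# ID of the registered, parameterized flow (placeholder value);
# it is printed at registration time and shown in the UI.
FLOW_ID = "00000000-0000-0000-0000-000000000000"

file_names = [f"file_{i}" for i in range(1, 101)]

# One flow run per file; whichever agents are available pick them up.
for name in file_names:
    client.create_flow_run(flow_id=FLOW_ID, parameters={"file_name": name})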
k
Hi @Enda Peng! I think the easiest approach is to use Dask parallelization with Prefect's map. Create a task that handles processing one file and use .map() to parallelize it across a DaskExecutor. Getting a Dask cluster up is a separate issue, but my personal experience has been that it's very easy to get one running with Coiled (and they're in a free trial at the moment). Either pass the cluster-creation function to DaskExecutor or connect by passing the scheduler address.
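A minimal sketch of both ways to wire up the executor, assuming Prefect 1.x and the Coiled integration; the software environment name and scheduler address below are placeholders.

from prefect.executors import DaskExecutor

# Either: let Prefect create a temporary Coiled cluster for each flow run
# ("my-account/prefect-env" is a placeholder software environment name).
flow.executor = DaskExecutor(
    cluster_class="coiled.Cluster",
    cluster_kwargs={"n_workers": 10, "software": "my-account/prefect-env"},
)

# Or: connect to a Dask cluster that is already running.
flow.executor = DaskExecutor(address="tcp://my-dask-scheduler:8786")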
j
from prefect import task, Flow

@task
def get_file_list():
    # build and return the list of files to process
    ...
    return file_list

# process_file and update_db are @task-decorated functions defined elsewhere
with Flow(...) as flow:
    file_list = get_file_list()
    processed = process_file.map(file_list)
    update_db(processed)
I've been using this sort of pattern, and it works great: a LocalDaskExecutor for local testing, and spinning up Fargate tasks with the DaskExecutor for production.
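A rough sketch of that split, assuming Prefect 1.x and dask-cloudprovider for the Fargate cluster; the environment switch, image name, and worker count are all placeholders.

import os

from prefect.executors import DaskExecutor, LocalDaskExecutor

if os.environ.get("PREFECT_ENV") == "production":
    # Spin up an ephemeral Dask cluster on AWS Fargate for the run.
    flow.executor = DaskExecutor(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={
            "image": "my-registry/flow-image:latest",  # placeholder image
            "n_workers": 10,
        },
    )
else:
    # Local multiprocessing is enough for testing.
    flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)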
e
I agree that Dask is easy to set up, and it makes the programming easy. My experience, though, is that I have to install the same packages on every worker to make it work.
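If the cluster comes from Coiled, as suggested above, one way around installing packages on every worker by hand is to build a named software environment once and have every worker start from it. A hedged sketch, assuming the coiled package; the environment name and dependency list are placeholders.

import coiled

# Build (or update) a named software environment once; every worker in a
# cluster created with software="prefect-batch-env" starts from this image.
coiled.create_software_environment(
    name="prefect-batch-env",                      # placeholder name
    pip=["prefect", "dask", "my-processing-lib"],  # placeholder dependencies
)

A self-managed Dask cluster needs the equivalent done by hand, e.g. a shared worker image, which is exactly the per-worker setup described above.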