# ask-community
g
Hello everyone! After some experiments, we are definitively switching to Prefect, and we need your help. Is there a way to use the temporary Dask cluster that Prefect creates with a `DaskExecutor` inside a task, in order to execute arbitrary workloads on that Dask cluster? In our tasks we have to deal with huge pandas dataframes, and we'd like to use the same Dask cluster to parallelize our computation. Any tips?
upvote 1
a
I think I have the exact same setup as you. You can use `distributed.worker_client` to fetch a client for the cluster your task is running on. Here's a redacted piece of code from my task. The important part is passing the client to `dask_df.compute`:
```python
# Redacted snippet from the task body; `df`, `some_func`, `logger`, and
# DEFAULT_CHUNKSIZE are defined elsewhere in the (redacted) task.
import dask.dataframe as dd
from distributed import worker_client

with worker_client() as client:
    dask_df = dd.from_pandas(df, chunksize=DEFAULT_CHUNKSIZE)
    logger.debug('Universe size: %d', len(df))
    dask_df['new_col'] = dask_df.apply(some_func, axis=1)
    return dask_df.compute(scheduler=client)
```
👍 1
g
Thanks Avi. And how does `worker_client()` get hold of the Dask cluster you created earlier with the `DaskExecutor`?
a
`worker_client` is a Dask function:

```python
from distributed import worker_client
```

but it'll work only when this piece of code is executed as a task on a Dask cluster. For me that poses a problem when I want to test my code locally; I couldn't find a way around it (but maybe I should just run local tests on a local Dask cluster).
g
I’ll give it a try 🙏
a
np. lmk what you find out
k
Hi @Giovanni Giacco! @Avi A’s suggestion looks great, but I’ll just throw out another idea (though it’s not quite inside a task): you can have an outer flow orchestrating smaller flows, each with its own executor. You can then pass a cluster-creation function to the `DaskExecutor` and have that subflow run on it. You might also find this Doc helpful.
🚀 2
g
Thanks Kevin. Having a “master” flow that orchestrates smaller flows is exactly the idea I’m working on. Thanks for the link, too.