# ask-community
t
A question on the usage of Prefect and Dask. Say I rely on an external package that is capable of some heavy computation, and that package strictly uses Dask. It has an entry point like:
```python
def predict_with_dask(..., dask_client):
    # Submits one batch of work, then a dependent batch, via the caller's client.
    results = dask_client.compute([big_func(a) for a in range(200)])
    more_results = dask_client.compute([bigger_func(r) for r in results])
```
Now let's say I have a Prefect task/flow like:
```python
from prefect import flow, task
from prefect_dask import get_dask_client


@task
def my_task(ms):
    with get_dask_client() as client:
        return predict_with_dask(
            ms=ms,
            dask_client=client,
        )


@flow
def my_flow():
    biggest = [my_task.submit(blah) for blah in range(200)]
```
I am running my flow with a `dask.distributed` cluster that is deployed on a large HPC of 500+ nodes. What is the expected scaling here? Does a single `.compute()` from any one of the `my_task` functions cause all other Prefect tasks to block? Does this setup allow parallel `.compute()` calls to run, or should there be some other way of invoking the parallelism of the many delayed objects in the `predict_with_dask` workflow?
b
In your Prefect code, we will use the `ThreadPoolTaskRunner` by default when you call `.submit`. This fires up a local thread pool inside the Prefect runner for that flow, so each task gets executed inside a thread pool with the max threads specified by your settings (or `sys.maxsize` by default, which basically lets us go as fast as the single worker will go). In your code, we'd dispatch all of the tasks to Dask immediately because we don't need to wait. Blocking only happens when we `wait` for or `gather` results. I'd recommend switching up your Prefect code slightly to use the `DaskTaskRunner` instead of calling `get_dask_client` inside each task, which spawns a new instance of the client per task and is somewhat wasteful. Here's a sample of what I think you could run:
```python
from prefect import flow, task
from prefect.futures import wait
from prefect_dask.task_runners import DaskTaskRunner
from dask.distributed import worker_client


@task
def crystalball_predict(ms):
    # worker_client() reuses the Dask worker's connection to the scheduler
    # instead of spinning up a new client per task.
    with worker_client() as dask_client:
        results = dask_client.compute([big_func(a) for a in range(200)])
        more_results = dask_client.compute([bigger_func(r) for r in results])

        return dask_client.gather(more_results)


@flow(task_runner=DaskTaskRunner())
def my_flow():
    biggest = [crystalball_predict.submit(blah) for blah in range(200)]
    wait(biggest)
```
Some docs that might be useful:
• https://docs.prefect.io/v3/how-to-guides/workflows/run-work-concurrently
• https://distributed.dask.org/en/latest/api.html#distributed.get_worker
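For reference, here is a minimal sketch (not from the thread) of what the default thread-pool behaviour looks like when made explicit, assuming Prefect 3's `ThreadPoolTaskRunner`; the `max_workers` value and the `small_job` task are just illustrative:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner


@task
def small_job(n: int) -> int:
    # Placeholder for whatever lightweight work each task does.
    return n * n


# Explicitly cap the thread pool; omitting max_workers falls back to the default.
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def threaded_flow():
    futures = [small_job.submit(n) for n in range(20)]
    return [f.result() for f in futures]
```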
t
Thanks for the detailed reply Brendon. So, my initial code example was both poorly formatted and incomplete. I am currently using a `DaskTaskRunner` with a `dask_jobqueue.SLURMCluster` as the cluster implementation. So as far as that goes I think I am OK for the moment, and my example is closer to your example than I first wrote; the only thing I had missing was the `client.gather` call, and I mean this in both my example as well as the external package. I think I understand everything so far in your reply. I think my question boils down further to: how many `Client.compute()` and/or `Client.gather()` calls can be executing at a time in a single distributed Dask cluster instance? If parallel `my_task`s are running via `.submit()`, would the `Client.compute()` that they each hit allow all of them to run at once, or does it go in a FIFO-like manner?
b
If you're using `task_runner=DaskTaskRunner()` and calling `.submit` inside that flow, we will go as fast as Dask allows for parallelism 🙂 Your `.compute`/`.gather` calls shouldn't block the main flow loop, assuming they get dispatched to Dask.
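To illustrate that claim, here is a toy sketch (not from the thread) where several submitted tasks each drive their own Dask computations concurrently; `slow_square` and `run_batch` are hypothetical stand-ins for the heavy per-task work:
```python
import time

from dask import delayed
from dask.distributed import worker_client
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


def slow_square(x):
    time.sleep(1)  # pretend this is expensive
    return x * x


@task
def run_batch(batch_id: int):
    # worker_client() secedes from the worker's thread pool, so waiting on
    # gather() here does not tie up a Dask worker thread.
    with worker_client() as client:
        futures = client.compute([delayed(slow_square)(batch_id + i) for i in range(10)])
        return sum(client.gather(futures))


@flow(task_runner=DaskTaskRunner())
def overlap_demo():
    results = [run_batch.submit(b) for b in range(4)]
    return [r.result() for r in results]
```
With a multi-worker cluster, the batches overlap rather than running one `.compute()` at a time.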
t
On the Prefect `.submit` side of things, indeed I think the submission of each task goes as fast as it can. I am currently on Prefect 2 and need to move to Prefect 3; I believe the engine there would be even faster. The behaviour I am seeing is confusing. I created a separate Prefect task that constructs a single Dask DAG across all inputs, and I modified the external package to return the delayed objects rather than calling `Client.compute` itself. Once I collected all these delayed objects myself and called `Client.compute()` once across all objects, I saw about a 2x speed-up. The allocated workers were all running much closer to 100% CPU. It felt like the Dask `Client` I am using (retrieved within each Prefect task via a `get_dask_client` call) only allows a single `.compute` to be running at a time. There is some interplay with this external package as well that I am trying to separate.
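For concreteness, a rough sketch of that "one big graph" variant (not the actual code): `predict_delayed` below is a hypothetical stand-in for the modified external package that returns delayed objects instead of calling `Client.compute()` itself:
```python
from dask import delayed
from prefect import flow, task
from prefect_dask import get_dask_client
from prefect_dask.task_runners import DaskTaskRunner


@delayed
def predict_delayed(ms):
    # Hypothetical placeholder for the external package's per-input prediction.
    return ms * 2


@task
def predict_all(all_inputs):
    # Build one Dask graph across every input, then trigger a single compute.
    delayed_objects = [predict_delayed(ms) for ms in all_inputs]
    with get_dask_client() as client:
        futures = client.compute(delayed_objects)
        return client.gather(futures)


@flow(task_runner=DaskTaskRunner())
def my_flow(all_inputs):
    return predict_all.submit(all_inputs).result()
```
The single `compute()` hands the scheduler the whole graph at once, which matches the behaviour described above where workers stay much closer to 100% CPU.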