Viet Nguyen
07/29/2022, 6:25 AM
For an `xarray` dataset conversion using `DaskTaskRunner`, the Dask client created by the task runner doesn't automatically pick up `xarray` functions such as `xarray.dataset.concat()` or `xarray.dataset.to_zarr` and spread the work over all CPU threads, as it does without Prefect. To bring the work to the Dask client with Prefect, I have to explicitly wrap it in a function with the `@task` decorator and call `.submit()`. This, however, keeps the `xarray` functions from running on all CPU threads: only the wrapped task function runs, in a single thread, which is not ideal. If anyone has come across this issue before and has a solution, it would be much appreciated. Thanks a lot.
```python
some_xarray_dataset.to_zarr(store, mode='w', consolidated=True)
```
With the plain Python statement above and Dask without Prefect, Dask automatically picks up the work and performs it in parallel across multiple processes and all threads. With Prefect, the task runner (`DaskTaskRunner`) isn't able to, so I have to wrap it in a task function that looks like:
```python
from prefect import task

@task
def xr_to_zarr(dataset):
    # `store` is the target Zarr store, defined elsewhere
    dataset.to_zarr(store, mode='w', consolidated=True)
```
This way, the `xr_to_zarr` task is added to the Dask graph; however, it is executed in a single CPU thread, even though the many tasks generated by `.to_zarr()` could be performed in all threads.
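One possible way around the single-thread limitation, sketched under the assumption that the task body runs on a `distributed` worker (which is how `DaskTaskRunner` executes tasks): inside the task, open `distributed.worker_client()`, which connects back to the same scheduler, and send the lazy graph through it so its chunk tasks fan out over every worker thread. In this sketch `client.submit` stands in for the task runner, `fan_out_sum` is a hypothetical name, and a dask array stands in for the xarray dataset (with xarray, the analogous step would presumably be passing the result of `to_zarr(..., compute=False)` to `client.compute`).

```python
import dask.array as da
from distributed import Client, LocalCluster, worker_client


def fan_out_sum(n: int) -> float:
    """Hypothetical task body: runs on one worker thread but farms out its graph."""
    # worker_client() secedes this thread from the worker's pool and yields a
    # client connected to the same scheduler, so the nested graph can use all threads
    with worker_client() as client:
        arr = da.ones((n, n), chunks=(n // 4, n // 4))  # 16 chunks -> 16 tasks
        return float(client.compute(arr.sum()).result())


# In-process cluster: 2 workers x 2 threads each
with LocalCluster(n_workers=2, threads_per_worker=2, processes=False) as cluster, Client(cluster) as client:
    # Stand-in for DaskTaskRunner submitting a Prefect task to the cluster
    result = client.submit(fan_out_sum, 1000).result()

print(result)  # 1000000.0
```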
Another attempt I tried:
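```python
import dask

@dask.delayed
def xr_to_zarr(dataset):
    # compute=False makes to_zarr return a lazy write instead of writing immediately
    z = dataset.to_zarr(store, mode='w', consolidated=True, compute=False)
    return z
```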
Then in the flow I'd call it like `z = xr_to_zarr(dataset)`, which returns a Dask delayed object, and then run `z.compute()`. It eventually runs, and I can see the worker's memory filling up, but nothing shows up in the Task Stream or Progress panels of the Dask dashboard created by `DaskTaskRunner`. So I wonder: is this a bug, or is `z.compute()` running somewhere else?
Anna Geller
07/29/2022, 12:13 PM
Viet Nguyen
07/29/2022, 12:37 PM
`to_zarr` generating when it's executed depends on the datasets, the number of variables, etc.
Anna Geller
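07/29/2022, 12:46 PM
```python
from distributed import Client
from prefect import flow

def inc(x):
    return x + 1

@flow
def run_some_dask():
    # Connect to an already-running Dask scheduler at this address
    client = Client('127.0.0.1:8786')
    x = client.submit(inc, 10)
    return x.result()  # wait for the remote result
```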
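A self-contained variant of the snippet above that can be run locally, under two stated assumptions: the `@flow` decorator is dropped so that only `dask` and `distributed` are needed, and a throwaway `LocalCluster` replaces the scheduler at `127.0.0.1:8786`. It also illustrates that a newly created `Client` registers itself as dask's default scheduler, so a plain `.compute()` on a lazy collection (a dask array standing in for an xarray dataset) runs on that cluster rather than locally.

```python
import dask.array as da
from distributed import Client, LocalCluster


def inc(x):
    return x + 1


def run_some_dask(address: str):
    # Mirrors the flow body above, minus the @flow decorator (Prefect omitted)
    with Client(address) as client:
        # Explicit submission, as in the snippet above
        incremented = client.submit(inc, 10).result()
        # The new Client is now the default scheduler, so this plain .compute()
        # is split into per-chunk tasks on the cluster
        total = da.ones((1000, 1000), chunks=(250, 250)).sum().compute()
    return incremented, float(total)


with LocalCluster(n_workers=2, threads_per_worker=2, processes=False) as cluster:
    incremented, total = run_some_dask(cluster.scheduler_address)

print(incremented, total)  # 11 1000000.0
```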
Viet Nguyen
07/29/2022, 12:52 PM
`DaskTaskRunner`
thank you @Anna Geller
Anna Geller
07/29/2022, 1:44 PM
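Regarding the `@dask.delayed` attempt earlier in the thread, one plausible explanation (an assumption, not a confirmed diagnosis): when a dask collection is passed as an argument to a `dask.delayed` function, dask computes it first and hands the function the concrete result. If the same applies to the xarray dataset, `xr_to_zarr(dataset)` would pull the whole dataset into memory before `to_zarr` even runs, which would fit the memory growth described. The sketch below shows this behaviour with a small dask array; `describe` is a hypothetical helper, and no `distributed.Client` is created, so everything runs on dask's local scheduler.

```python
import dask
import dask.array as da

lazy = da.arange(10, chunks=5)  # a lazy, chunked collection (2 chunks)


@dask.delayed
def describe(x):
    # Hypothetical helper: by the time it runs, dask has already computed `x`
    return type(x).__name__, int(x.sum())


name, total = describe(lazy).compute()
print(name, total)  # ndarray 45

# Computing the lazy graph directly instead keeps every chunk as its own task
direct = int(lazy.sum().compute())
print(direct)  # 45
```

Under that assumption, calling `to_zarr(..., compute=False)` directly on the dataset, without the `@dask.delayed` wrapper, and computing the returned object through a client connected to the cluster would keep each chunk write as a separate task.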