# prefect-community
v
hi everyone, I'm using Prefect 2.0 to orchestrate xarray dataset conversion with the DaskTaskRunner. The Dask client created by the task runner doesn't automatically pick up xarray functions such as xarray.dataset.concat() or xarray.dataset.to_zarr() and spread the work across all CPU threads the way it does without Prefect. To get the work onto the Dask client with Prefect, I have to explicitly wrap it with the @task decorator and call .submit(). This way, however, the xarray functions are not executed across all CPU threads; only the wrapped task function runs, each in a single thread, which is not ideal. If anyone has come across this issue before and has a solution, it would be much appreciated. Thanks a lot.
For example, here's what I'm trying to say:
Copy code
some_xarray_dataset.to_zarr(store, mode='w', consolidated=True)
With the plain Python statement above, when using Dask without Prefect, Dask automatically picks up the work and runs it in parallel across multiple processes and all CPU threads. But with Prefect, the task_runner (DaskTaskRunner) isn't able to do that. So I have to wrap it in a task function that looks like:
Copy code
from prefect import task

@task
def xr_to_zarr(dataset):
    # store is defined elsewhere
    dataset.to_zarr(store, mode='w', consolidated=True)
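And in the flow I call it roughly like this (just a sketch; the flow name and how the datasets are produced are placeholders, and it assumes DaskTaskRunner comes from the prefect_dask collection):
Copy code
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def convert_flow(datasets):
    for ds in datasets:
        # each .submit() sends the whole wrapped function to the Dask cluster
        # as a single unit of work
        xr_to_zarr.submit(ds)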
This way, the xr_to_zarr task gets added to the Dask graph, but it executes in a single CPU thread, even though .to_zarr() should produce many tasks that could run across all threads.
Another attempt I tried:
Copy code
import dask

@dask.delayed
def xr_to_zarr(dataset):
    # compute=False builds the zarr write as a lazy Dask graph
    # instead of executing it immediately
    z = dataset.to_zarr(store, mode='w', consolidated=True, compute=False)
    return z
Then in the flow I call it like z = xr_to_zarr(dataset), which returns a Dask delayed object, and then I run z.compute(). It eventually runs and I can see the workers' memory fill up, but nothing shows up in the Task Stream and Progress panels of the Dask dashboard created by DaskTaskRunner, so I wonder whether this is a bug or z.compute() is running somewhere else?
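End to end, the second attempt looks roughly like this (just a sketch; the flow name is a placeholder and the DaskTaskRunner import is assumed to come from prefect_dask):
Copy code
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def convert_flow(dataset):
    # returns a Dask delayed object rather than running anything yet
    z = xr_to_zarr(dataset)
    # this runs the work, but it doesn't appear on the DaskTaskRunner's dashboard
    z.compute()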
a
can you move the functionality into the task?
you can also use the Dask client directly within Prefect 2.0 - you can run any Python there, including Dask code
v
@Anna Geller hi there, thanks for replying. I don't think I can; it's just a bunch of work that to_zarr generates when it's executed, depending on the datasets, number of variables, etc.
Can you please provide a dummy example of the proper way to call the Dask client with Prefect 2.0? That would work, I assume. Thanks a lot.
a
It's literally the same as what you would do in plain Python
Copy code
from distributed import Client
from prefect import flow

def inc(x):
    return x + 1

@flow
def run_some_dask():
    # connect to an already running Dask scheduler
    client = Client('127.0.0.1:8786')
    # submit any Python callable straight to the Dask cluster
    x = client.submit(inc, 10)
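and for your xarray case it would be something along these lines (just a sketch; the flow name, path, and store are placeholders, and it assumes a Dask scheduler is already running at that address):
Copy code
from distributed import Client
from prefect import flow
import xarray as xr

@flow
def convert_to_zarr(path, store):
    # creating the client makes it the default Dask scheduler, so the
    # to_zarr work runs on its workers and shows up in its dashboard
    client = Client('127.0.0.1:8786')
    ds = xr.open_dataset(path, chunks={})
    ds.to_zarr(store, mode='w', consolidated=True)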
v
perfect, will try and let you know the outcome, thanks @Anna Geller
works very nicely, I didn't know the Dask client could be called directly rather than through DaskTaskRunner
thank you @Anna Geller
a
nice work!