    Josselin Girault

    1 year ago
    Hi. I'm curious about how Dask and Prefect behave together. If I have a dask.dataframe as the input of my Prefect flow and want to perform both narrow and wide operations on it, how do I go about implementing that? Since all Prefect tasks are submitted as dask.delayed objects, I can't directly use the Dask API on my dask.dataframe inside the tasks, as nesting Dask calls inside Dask calls is unreliable. If I arrange things so that the narrow operations are mapped tasks, how do I write a groupby efficiently? The results of the mapped tasks will be reduced, and the groupby will run on a single node. Neither the Dask documentation (https://examples.dask.org/applications/prefect-etl.html) nor the Prefect documentation (https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html) addresses this. I haven't found an example of how to use Dask objects inside a Prefect task, or of how to implement wide operations in pure Prefect. It seems that by using Dask as its engine, Prefect prevents the use of the Dask API for transformations.
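The concern above can be illustrated with a minimal pure-Python sketch (no Prefect or Dask; all names are illustrative stand-ins): narrow operations map cleanly onto one parallel task per partition, but a naive reduce gathers every partition and runs the groupby in one place.

```python
# Stand-in for the pattern in question: narrow ops are mapped over
# partitions in parallel, but the wide op (groupby) only runs after
# everything has been gathered on a single node.
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict

rows = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("c", 6)]
partitions = [rows[i::3] for i in range(3)]  # pretend dask.dataframe partitions

def narrow_op(partition):
    # Narrow: touches one partition at a time, so it maps cleanly
    # onto one Prefect mapped task per partition.
    return [(key, value * 10) for key, value in partition]

def wide_groupby_sum(all_partitions):
    # Wide: needs rows from *all* partitions, so a plain Prefect reduce
    # task gathers everything and aggregates on a single worker.
    totals = defaultdict(int)
    for partition in all_partitions:
        for key, value in partition:
            totals[key] += value
    return dict(totals)

with ThreadPoolExecutor() as pool:
    mapped = list(pool.map(narrow_op, partitions))  # parallel narrow stage
result = wide_groupby_sum(mapped)                   # single-node wide stage
print(result)  # {'a': 90, 'b': 60, 'c': 60}
```

This is exactly the inefficiency described: the mapped stage parallelizes, while the groupby stage does not.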
    Chris White

    1 year ago
    Hi Josselin - check out this discussion for some partial recommendations: https://github.com/PrefectHQ/prefect/discussions/3022
    Josselin Girault

    1 year ago
    Seems like removing checkpointing could work for me, thanks. Then I'm guessing it's impossible to use the DaskExecutor to parallelize mapped tasks while also using dd.worker_client() to parallelize aggregate operations inside another task? I'm not sure. I don't think it's something you would want to do anyway, but I'd appreciate a definitive answer if you have it 🙂
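The pattern being asked about here, an outer task that fans its aggregate work back out to the pool it is running on, can be sketched with the stdlib's concurrent.futures as a stand-in for the worker_client approach (this is not the Dask API; all names are illustrative):

```python
# Stdlib stand-in for a task that re-submits per-group subtasks to a
# shared executor -- roughly the shape of what worker_client() enables
# from inside an already-running Dask task.
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)

def sum_group(values):
    # Inner subtask: aggregate a single group.
    return sum(values)

def aggregate_task(grouped):
    # Outer task: fans per-group aggregation back out to the shared pool
    # instead of looping serially. (If this function itself occupied a
    # pool slot, waiting on its own subtasks could deadlock -- which is
    # the problem worker_client exists to avoid in Dask.)
    futures = {key: pool.submit(sum_group, values)
               for key, values in grouped.items()}
    return {key: future.result() for key, future in futures.items()}

grouped = {"a": [1, 3, 5], "b": [2, 4], "c": [6]}
result = aggregate_task(grouped)
print(result)  # {'a': 9, 'b': 6, 'c': 6}
```

Whether this composes safely with a DaskExecutor-driven Prefect flow is exactly the open question in the thread; the sketch only shows the shape of the pattern.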