# ask-community
m
Hello everyone! Has anyone worked with Dask `futures` within the context of Prefect flows? I basically want to perform some future computations in order to parallelize parquet reading and dataframe concatenation. The following snippet shows an MWE using Dask futures and a client.
import dask.dataframe as dd
from dask.distributed import Client
from s3fs import S3FileSystem

s3 = S3FileSystem()  # S3 filesystem handle (dd.read_parquet uses s3fs under the hood)
client = Client()    # local Dask cluster + client
folder_list = [
    "file1",
    "file2",
    "file3",
    "file4",
    "file5",
    "file6",
    "file7",
    "file8",
]
file_list = list(
    map(lambda folder: f"s3://my-bucket/parquet/{folder}/*.parquet", folder_list)
)
# Submit one read per folder, then concatenate and reduce on the cluster
dataframe_list = client.map(dd.read_parquet, file_list, gather_statistics=False)

dataframe = client.submit(dd.concat, dataframe_list)

mean_value = client.submit(lambda x: x["some_data_column"].mean(), dataframe)

mean_compute = client.submit(lambda x: x.compute(), mean_value)

print(mean_compute.result())
a
Can you describe your question/problem to be solved a bit more? I understood it this way: you have a working prototype using just the Dask client, and you want to translate it into a Prefect flow with mapping - is that correct?
m
Yes, that's correct. As a matter of fact, once I implement this using `prefect.task` (I understand these are delayed objects), job performance drops drastically, since it only uses one worker to read each path in the folder list (`folder_list` in the snippet above). That's why I'm trying to use `futures` instead of `delayed`.
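For reference, a minimal sketch (not from this thread) of what futures inside a single Prefect task could look like - assuming the flow runs on a DaskExecutor and `file_list` holds concrete parquet file paths rather than globs:
import pandas as pd
from dask.distributed import worker_client
from prefect import task


@task
def read_and_concat(file_list):
    # Runs on a Dask worker when the flow uses DaskExecutor; worker_client()
    # returns a client to the same cluster, so the task can submit its own futures.
    with worker_client() as client:
        futures = client.map(pd.read_parquet, file_list)  # one read per file (s3fs assumed)
        dfs = client.gather(futures)  # block until all reads finish
    return pd.concat(dfs, ignore_index=True)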
a
Would something like this work for you?
import awswrangler as wr
import pandas as pd
from prefect import task, Flow
from prefect.executors import DaskExecutor
from typing import List


@task
def get_folder_list() -> List[str]:
    return [
        "file1",
        "file2",
        "file3",
        "file4",
        "file5",
        "file6",
        "file7",
        "file8",
    ]


@task
def process_parquet_file(file_name: str) -> pd.DataFrame:
    return wr.s3.read_parquet(f"s3://my-bucket/parquet/{file_name}/*.parquet")


@task
def concat_dfs(list_of_dfs: List[pd.DataFrame]) -> None:
    final_df = pd.concat(list_of_dfs, ignore_index=True)
    print(final_df["some_data_column"].mean())


with Flow("flow_run_test", executor=DaskExecutor()) as flow:
    folder_list = get_folder_list()
    list_of_dfs = process_parquet_file.map(folder_list)
    concat_dfs(list_of_dfs)
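As a side note, a hypothetical way to run the flow above locally (in practice you might register it with a Prefect backend instead):
if __name__ == "__main__":
    flow.run()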
k
If you use the DaskExecutor, it should use all available workers. The Prefect task uses `client.submit()` to execute. There is also some optimization so that consecutive mapped tasks are submitted together, so it might be better. But if you really want to do `client.submit` or work with a Dask dataframe, you can follow this. You shouldn't need `client.submit` when working with a Dask dataframe.
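To illustrate (a sketch - the addresses and worker counts are illustrative, not from this thread), the DaskExecutor can either connect to an existing cluster or spin up a temporary one with a fixed number of workers:
from prefect.executors import DaskExecutor

# Option 1: connect to an already-running Dask scheduler (address is hypothetical)
executor = DaskExecutor(address="tcp://dask-scheduler:8786")

# Option 2: spin up a temporary local cluster for the duration of the flow run
executor = DaskExecutor(
    cluster_class="dask.distributed.LocalCluster",
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 1},
)

flow.executor = executor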
m
@Anna Geller That'd work; however, I'm working with Dask dataframes (i.e. `from dask.dataframe.core import DataFrame` objects). I tried something similar, although I couldn't get parallel reading while fetching the parquet files.
@Kevin Kho I found something while checking Dask issues; it looks like it isn't possible to read a set of parquet folders in a parallel fashion: https://github.com/dask/dask/issues/3892
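For reference, a rough sketch of the dataframe-only route being discussed - assuming this Dask version accepts a list of glob paths and still supports `gather_statistics`:
import dask.dataframe as dd

# Hypothetical paths; a single lazy dataframe is built from the whole list,
# and compute() then runs the partition reads as Dask tasks.
paths = [f"s3://my-bucket/parquet/{folder}/*.parquet" for folder in folder_list]
ddf = dd.read_parquet(paths, gather_statistics=False)
print(ddf["some_data_column"].mean().compute())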
a
With mapping you can read and process your parquet files in parallel without Dask dataframes. If this didn't work in parallel, then your executor is not configured correctly. Perhaps you could start by using `LocalDaskExecutor()` to confirm that it processes your files in parallel?
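A minimal sketch of that check (the worker count is illustrative):
from prefect.executors import LocalDaskExecutor

# Local multi-threaded scheduler; switch to scheduler="processes" for CPU-bound work
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)
flow.run()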