# prefect-community
m
Hey everyone, I just upgraded from Prefect 1 to 2 and I'm enjoying a lot of the new features, especially the `ConcurrentTaskRunner`! However, I'm noticing that `prefect_dask.DaskTaskRunner` is slower than `prefect.executors.DaskExecutor` (Prefect 1). In both cases I was using processes rather than threads. Speed depends on the task, but some tasks that read from S3 are taking a little over twice as long with the `DaskTaskRunner`. Has anyone faced this or something similar, or does anyone have pointers as to why I may be experiencing this? Thanks!
👀 1
a
Do you happen to use `dask.dataframe` or other dask collections? If so, this may help: https://github.com/PrefectHQ/prefect-dask#distributing-dask-collections-across-workers
m
Thanks! I'll have to try this out. I do read parquet files from S3 with dask, perform some transformations, and then compute the result as a pandas DataFrame, so this may help.
🚀 1
Ah, that actually made it take quite a bit longer. When I get a moment, I can provide simplified versions of my code for Prefect 1 vs. Prefect 2, so it's clear what I'm doing and perhaps you'll notice where I'm doing something off.
👀 1
Well, I cut down my scripts for the sake of this example. I recognize you don't have access to the S3 files I'm pointing to, but I wonder if simply seeing the scripts will trigger any thoughts about performance.

Task (common between both versions):
```python
import functools
import os
import time

import dask.dataframe as dd
from prefect import task


def time_execution(func):
    # Print the wall-clock duration of every call to the wrapped function
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        print(f"{func.__name__} ({duration:.3f} seconds)")
        return result

    return wrapper


@task
@time_execution
def load_data(part: int) -> None:
    # One parquet part per day for the first four days of January 2022
    files = [
        f"s3://my_bucket/2022/01/{d:02d}/part-{part:04}.parquet" for d in range(1, 5)
    ]

    # Read lazily with dask, then materialize as a pandas DataFrame
    df = dd.read_parquet(
        files,
        storage_options={
            "key": os.environ["AWS_ACCESS_KEY_ID"],
            "secret": os.environ["AWS_SECRET_ACCESS_KEY"],
        },
    ).compute()

    # Actual pipeline does something with DataFrame, but we'll just print the head
    print(df.head())
```
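To confirm, rather than assume, which worker process and thread each mapped task actually ran in (both setups were meant to use processes), a variant of `time_execution` could also print `os.getpid()` and the current thread name. This is a minimal stdlib-only sketch; the name `time_execution_with_location` is hypothetical and not part of Prefect or Dask.

```python
import functools
import os
import threading
import time


def time_execution_with_location(func):
    """Hypothetical variant of time_execution that also reports pid/thread."""

    @functools.wraps(func)  # keep __name__ and signature visible to @task
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        # Distinct pids mean separate worker processes; a shared pid with
        # different thread names means threads within one process.
        print(
            f"{func.__name__} ({duration:.3f} seconds) "
            f"pid={os.getpid()} thread={threading.current_thread().name}"
        )
        return result

    return wrapper
```

It would be stacked under `@task` the same way as `@time_execution`, so the printed pids can be compared between the Prefect 1 and Prefect 2 runs.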
Prefect 1:
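```python
from typing import List

from prefect import Flow
from prefect.executors import LocalDaskExecutor

from tasks import load_data  # hypothetical module holding the task above


def example(parts: List[int]):
    with Flow(
        "example",
        # LocalDaskExecutor passes extra kwargs to dask config, so the worker
        # count is num_workers (cluster_kwargs is a DaskExecutor argument)
        executor=LocalDaskExecutor(scheduler="processes", num_workers=8),
    ) as flow:
        load_data.map(parts)

    flow.run()


if __name__ == "__main__":
    example(list(range(8)))
```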
Prefect 2:
```python
from typing import List

from dask.distributed import LocalCluster
from prefect import flow
from prefect_dask import DaskTaskRunner

from tasks import load_data  # hypothetical module holding the task above


@flow(
    task_runner=DaskTaskRunner(
        cluster_class=LocalCluster,
        cluster_kwargs={"n_workers": 8},
    ),
    validate_parameters=False,
)
def example(parts: List[int]):
    load_data.map(parts)


if __name__ == "__main__":
    example(list(range(8)))
```
In this simple example and with my data, I'm seeing an average of 11.6 seconds per task with Prefect 1 and 12.6 seconds per task with Prefect 2. That difference isn't large, but in my actual pipeline, with much bigger data and more transformations, the disparity is greater. Does anything in my code jump out as wrong? Thank you.
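For scale, those averages put Prefect 2 about 8.6% behind Prefect 1 per task, since (12.6 - 11.6) / 11.6 ≈ 0.086. A small helper like the one below (hypothetical name, stdlib only) could compute the same figure from lists of per-task durations, for example ones read off the timing decorator's output, which would make the threads-vs-processes runs easier to compare.

```python
from statistics import mean
from typing import Sequence


def relative_slowdown(baseline: Sequence[float], candidate: Sequence[float]) -> float:
    """Percent by which the candidate's mean duration exceeds the baseline's.

    Positive means the candidate is slower; negative means it is faster.
    """
    base = mean(baseline)
    return (mean(candidate) - base) / base * 100


# The two per-task averages reported above: Prefect 1 vs. Prefect 2.
print(f"{relative_slowdown([11.6], [12.6]):.1f}%")  # prints 8.6%
```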
a
Thanks for sharing an example! I wonder if it has anything to do with Prefect storing results or caching. Also, I wonder if it'd be beneficial for you to use threads, since this example is I/O-bound.
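Why threads can help with I/O-bound work: a read from S3 spends most of its time waiting on the network, and CPython releases the GIL during blocking I/O, so several threads in one process can overlap those waits without starting worker processes. A stdlib-only sketch, where `time.sleep` stands in for an S3 read (an assumption; it says nothing about how Prefect or Dask schedule tasks), and `fake_s3_read` is a hypothetical helper:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_s3_read(part: int, latency: float = 0.1) -> int:
    # Stand-in for a network read: sleeping releases the GIL, as blocking
    # socket I/O does, so other threads can run while this one waits.
    time.sleep(latency)
    return part


def run_serial(parts):
    start = time.perf_counter()
    results = [fake_s3_read(p) for p in parts]
    return results, time.perf_counter() - start


def run_threaded(parts, max_workers: int = 8):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in input order
        results = list(pool.map(fake_s3_read, parts))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    parts = list(range(8))
    _, serial_s = run_serial(parts)
    _, threaded_s = run_threaded(parts)
    print(f"serial: {serial_s:.2f}s, threaded: {threaded_s:.2f}s")
```

With eight simulated reads, the serial run takes at least 0.8 s while the threaded run overlaps the waits. Real reads that also do CPU-heavy parsing may still favor processes, which is what the threads-vs-processes comparison would show.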
Also, would you mind submitting an issue here so this doesn't get lost in the thread and gets more visibility? https://github.com/PrefectHQ/prefect-dask/issues
m
Thanks! I'll clean everything up and submit an issue with some publicly available data and performance results for both threads and processes with Prefect 1 and Prefect 2.
🙌 1
a
Thanks so much!
👍 1
Created an issue here to track this; feel free to add to it! https://github.com/PrefectHQ/prefect-dask/issues/61