Matt Morris

12/27/2022, 9:06 PM
Hey everyone, I just upgraded from Prefect 1 to Prefect 2 and I'm enjoying a lot of the new features, especially the …! However, I'm noticing that using the DaskTaskRunner (Prefect 2) is slower than using the LocalDaskExecutor (Prefect 1). In both cases I was using processes rather than threads. The difference depends on the task, but some tasks that read from S3 are taking a little over twice as long with the DaskTaskRunner. Has anyone run into something similar, or does anyone have pointers as to why I might be seeing this? Thanks!

Andrew Huang

12/27/2022, 9:22 PM
do you happen to use dask.dataframe or dask collections? if so, maybe:

Matt Morris

12/27/2022, 9:32 PM
Thanks! I'll have to try this out. I do read parquet files from s3 with dask and then compute them as a pandas DataFrame after performing some transformations, so this may help.
Ah, that actually made it take quite a bit longer. When I get a moment, I'll post simplified versions of my code for Prefect 1 vs. Prefect 2, so it's clear what I'm doing and perhaps you'll notice where I'm doing something wrong.
Here are cut-down versions of my scripts. I know you don't have access to the S3 files I'm pointing to, but maybe just seeing the scripts will trigger some thoughts about performance. The task (shared by both versions):
import functools
import os
import time

import dask.dataframe as dd
from prefect import task

def time_execution(func):
    # functools.wraps keeps the wrapped function's __name__ for the printed timing
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        print(f"{func.__name__} ({duration:.3f} seconds)")
        return result

    return wrapper

@task
@time_execution
def load_data(part: int) -> None:
    # One file per day (Jan 1-4) for the given part number
    files = [
        f"s3://my_bucket/2022/01/{d:02d}/part-{part:04}.parquet" for d in range(1, 5)
    ]

    # storage_options is forwarded to s3fs to authenticate against S3
    df = dd.read_parquet(
        files,
        storage_options={
            "key": os.environ["AWS_ACCESS_KEY_ID"],
            "secret": os.environ["AWS_SECRET_ACCESS_KEY"],
        },
    )
    # Actual pipeline does something with DataFrame, but we'll just print the head
    print(df.head())
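Prefect 1:
from typing import List

from prefect import Flow
from prefect.executors import LocalDaskExecutor

from tasks import load_data  # hypothetical module name holding the shared task above

def example(parts: List[int]):
    with Flow(
        "example",
        # LocalDaskExecutor sizes its pool with num_workers;
        # cluster_kwargs is an option of DaskExecutor, not LocalDaskExecutor
        executor=LocalDaskExecutor(scheduler="processes", num_workers=8),
    ) as flow:
        load_data.map(parts)
    flow.run()

if __name__ == "__main__":
    example(parts=[0, 1, 2, 3])  # hypothetical part numbers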
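Prefect 2:
from typing import List

from prefect import flow
from prefect_dask import DaskTaskRunner
from tasks import load_data  # hypothetical module name holding the shared task above

@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 8}))
def example(parts: List[int]):
    for part in parts:
        load_data.submit(part)  # the flow waits for submitted tasks before returning

if __name__ == "__main__":
    example(parts=[0, 1, 2, 3])  # hypothetical part numbers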
In this simple example, with my data, I'm seeing an average of 11.6 seconds per task with Prefect 1 and 12.6 seconds per task with Prefect 2. That difference isn't much, but in my actual pipeline, with much bigger data and more transformations, the disparity is larger. Does anything I'm doing wrong in the code jump out? Thank you.
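To put those averages in relative terms, here is a small helper sketch. It assumes the timing lines look exactly like what the time_execution decorator above prints (for example "load_data (11.600 seconds)"); TIMING_LINE, task_durations, mean_duration and relative_slowdown are hypothetical helpers, not part of Prefect or Dask. Applied to the 11.6 s and 12.6 s averages, Prefect 2 comes out roughly 8.6% slower per task in this small example.

```python
import re
from statistics import mean

# Hypothetical pattern for the lines printed by time_execution, e.g. "load_data (11.600 seconds)"
TIMING_LINE = re.compile(r"(\w+) \((\d+(?:\.\d+)?) seconds\)")


def task_durations(log_lines, task_name="load_data"):
    """Collect the durations (in seconds) printed for one task name."""
    durations = []
    for line in log_lines:
        match = TIMING_LINE.search(line)
        if match and match.group(1) == task_name:
            durations.append(float(match.group(2)))
    return durations


def mean_duration(log_lines, task_name="load_data"):
    """Average duration of one task across a run (raises if no lines match)."""
    return mean(task_durations(log_lines, task_name))


def relative_slowdown(baseline: float, candidate: float) -> float:
    """Fractional extra time of candidate over baseline (0.10 means 10% slower)."""
    return (candidate - baseline) / baseline


if __name__ == "__main__":
    # Averages reported in this thread: Prefect 1 = 11.6 s, Prefect 2 = 12.6 s
    print(f"Prefect 2 is {relative_slowdown(11.6, 12.6):.1%} slower per task")
```

Feeding each run's captured stdout through mean_duration gives per-version averages that can be compared the same way as the numbers above.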

Andrew Huang

12/28/2022, 4:45 PM
Thanks for sharing an example! I wonder if it has anything to do with Prefect storing results / caching. Also, I wonder if it'd be beneficial for you to use threads, since this example is I/O bound.
Also, would you mind submitting an issue here so this doesn't get lost in the thread and gains more visibility?
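A minimal stdlib sketch of why threads can suit I/O-bound work like these S3 reads: fake_s3_read is a hypothetical stand-in that sleeps instead of doing network I/O. Like a real socket wait, time.sleep releases the GIL, so eight threads overlap their waits, with no process startup or pickling overhead. It illustrates the general idea only and says nothing about where the Prefect 2 slowdown actually comes from.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_s3_read(part: int) -> int:
    """Hypothetical stand-in for an S3 read: sleeping releases the GIL like a network wait."""
    time.sleep(0.1)
    return part


def run_sequential(parts: list[int]) -> float:
    """Wall-clock seconds to run every read one after another."""
    start = time.perf_counter()
    for part in parts:
        fake_s3_read(part)
    return time.perf_counter() - start


def run_threaded(parts: list[int], n_workers: int = 8) -> float:
    """Wall-clock seconds to run the reads on a thread pool."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # list() consumes every result, so all reads have finished before the clock stops
        list(pool.map(fake_s3_read, parts))
    return time.perf_counter() - start


if __name__ == "__main__":
    parts = list(range(8))
    print(f"sequential: {run_sequential(parts):.2f}s")
    print(f"threaded:   {run_threaded(parts):.2f}s")
```

To try threads in the scripts above, Prefect 1's LocalDaskExecutor accepts scheduler="threads". In Prefect 2, adding "processes": False to DaskTaskRunner's cluster_kwargs should start a thread-based Dask LocalCluster, since those kwargs are forwarded to the cluster class. Whether that closes the gap for these S3 reads is untested.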

Matt Morris

12/28/2022, 6:08 PM
Thanks! I will clean everything up and submit an issue, ideally with some publicly available data and performance results for both threads and processes under Prefect 1 and Prefect 2.

Andrew Huang

12/28/2022, 6:17 PM
Thanks so much!
๐Ÿ‘ 1
Created an issue here to track this; feel free to add to it!