Matt Morris
12/27/2022, 9:06 PM
ConcurrentTaskRunner! However, I am noticing that using prefect_dask.DaskTaskRunner is slower than using prefect.executors.DaskExecutor (Prefect 1). In both cases I was using processes rather than threads. Speed depends on the task, but some tasks that read from S3 are taking a little over twice as long with the DaskTaskRunner. Has anyone faced something similar, or any pointers as to why I may be experiencing this? Thanks!
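For reference, the process-versus-thread choice is expressed a bit differently in the two versions. The sketch below is illustrative only: the worker counts and keyword values are assumptions rather than settings taken from this thread, and the two halves assume separate environments since Prefect 1 and Prefect 2 cannot be installed side by side.

# Illustrative sketch: asking for process-based workers in each version.
# Values here are placeholders, not the configuration discussed in this thread.

# Prefect 1 (runs in a Prefect 1 environment):
from prefect.executors import DaskExecutor, LocalDaskExecutor

# Local multiprocessing scheduler, no dask.distributed cluster involved.
local_executor = LocalDaskExecutor(scheduler="processes", num_workers=8)

# Temporary dask.distributed LocalCluster with one thread per worker process.
distributed_executor = DaskExecutor(
    cluster_kwargs={"n_workers": 8, "threads_per_worker": 1, "processes": True}
)

# Prefect 2 (runs in a Prefect 2 environment with prefect-dask installed):
from prefect_dask import DaskTaskRunner

# DaskTaskRunner also defaults to a dask.distributed LocalCluster, so the same
# cluster_kwargs select process-based workers.
task_runner = DaskTaskRunner(
    cluster_kwargs={"n_workers": 8, "threads_per_worker": 1, "processes": True}
)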
Andrew Huang
12/27/2022, 9:22 PM

Matt Morris
12/27/2022, 9:32 PM
import functools
import os
import time
import dask.dataframe as dd
from prefect import task
def time_execution(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        print(f"{func.__name__} ({duration:.3f} seconds)")
        return result
    return wrapper
@task
@time_execution
def load_data(part: int) -> None:
    files = [
        f"s3://my_bucket/2022/01/{d:02d}/part-{part:04}.parquet" for d in range(1, 5)
    ]
    df = dd.read_parquet(
        files,
        storage_options={
            "key": os.environ["AWS_ACCESS_KEY_ID"],
            "secret": os.environ["AWS_SECRET_ACCESS_KEY"],
        },
    ).compute()
    # Actual pipeline does something with the DataFrame, but we'll just print the head
    print(df.head())
Prefect 1:
from typing import List
from prefect import Flow
from prefect.executors import LocalDaskExecutor
def example(parts: List[int]):
    with Flow(
        "example",
        executor=LocalDaskExecutor(
            scheduler="processes",
            cluster_kwargs={"n_workers": 8},
        ),
    ) as flow:
        load_data.map(parts)
    flow.run()

if __name__ == "__main__":
    example(list(range(8)))
Prefect 2:
from typing import List

from dask.distributed import LocalCluster
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(
    task_runner=DaskTaskRunner(
        cluster_class=LocalCluster,
        cluster_kwargs={"n_workers": 8},
    ),
    validate_parameters=False,
)
def example(parts: List[int]):
    load_data.map(parts)

if __name__ == "__main__":
    example(list(range(8)))
In this simple example and with my data, I'm seeing an average of 11.6 seconds for each Prefect 1 task and 12.6 seconds for each Prefect 2 task. That difference isn't much, but in my actual pipeline, with much bigger data and more transformations going on, there is a greater disparity. Is there perhaps something I'm doing wrong in my code that jumps out? Thank you.
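One way to see how much of the gap comes from orchestration rather than from Dask or S3 is to run the same work with no Prefect at all. A minimal sketch, assuming dask.distributed is installed and that load_data from the example above is defined in the same script (in Prefect 2, load_data.fn exposes the wrapped function without the task machinery):

# Prefect-free baseline: map the underlying function over the same parts on a
# plain dask.distributed LocalCluster and compare the printed per-task timings
# against the Prefect 1 and Prefect 2 runs.
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=8, threads_per_worker=1)
    client = Client(cluster)
    try:
        # load_data.fn is the function wrapped by @time_execution, without the
        # Prefect task layer around it.
        futures = client.map(load_data.fn, list(range(8)))
        client.gather(futures)
    finally:
        client.close()
        cluster.close()

If this baseline lands close to the Prefect 1 numbers, the remaining difference is more likely per-task orchestration overhead than the S3 reads themselves.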
Andrew Huang
12/28/2022, 4:45 PM

Matt Morris
12/28/2022, 6:08 PM

Andrew Huang
12/28/2022, 6:17 PM