Tom Forbes

05/17/2021, 8:36 PM
Hey, we’re evaluating Prefect and I had a few questions about how you’re meant to structure mapping tasks when using a Dask executor. I’ve got the following simple workflow:
from prefect import Flow, task
import dask.dataframe
import imageio
import skimage.transform

@task
def run_export():
    prefix = trigger_export()
    return dask.dataframe.read_parquet(f"{prefix}/*.parquet")

@task
def download_image(uuid):
    s3 = get_boto_client("s3")
    img = imageio.imread(s3.get_object(Bucket="..", Key=f"{uuid}")["Body"])
    return skimage.transform.resize(img, (1024, 1024))

@task
def save_to_s3(result_df):
    result_df.to_parquet("s3://bucket/output_prefix/")

with Flow("something") as flow:
    df = run_export()
    images_df = download_image.map(df["uuid"])
    save_to_s3(images_df)
Basically I want to grab a dataframe from somewhere, download a bunch of images from S3, resize them, attach them to the dataframe as a new column, and then save the frame somewhere else in S3.
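Stripped of the Prefect and Dask machinery, the fan-out-then-attach pattern I'm after looks roughly like this in plain Python (a sketch only: `ThreadPoolExecutor` stands in for the Dask executor, and the function and column names are illustrative, not real API):

```python
from concurrent.futures import ThreadPoolExecutor

def download_and_resize(uuid):
    # Placeholder for the real S3 download + skimage resize step.
    return f"resized-{uuid}"

def run_pipeline(rows):
    # "Map" the per-item task over every uuid in the frame, in parallel.
    uuids = [row["uuid"] for row in rows]
    with ThreadPoolExecutor() as pool:
        images = list(pool.map(download_and_resize, uuids))
    # Attach the results back as a new column (a list of dicts stands in
    # for the dataframe here).
    for row, img in zip(rows, images):
        row["image"] = img
    return rows
```

The question is essentially how this maps onto Prefect's task mapping when the "rows" live in a Dask dataframe rather than local memory.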