Tom Forbes

05/17/2021, 8:36 PM
Hey, we’re evaluating Prefect and I had a few questions about how you’re meant to structure mapping tasks when using a Dask executor. I’ve got the following simple workflow:
from prefect import Flow, task
import dask.dataframe
import imageio
import skimage.transform

@task
def run_export():
    prefix = trigger_export()
    return dask.dataframe.read_parquet(f"{prefix}/*.parquet")

@task
def download_image(uuid):
    s3 = get_boto_client("s3")
    img = imageio.imread(s3.get_object(Bucket="..", Key=f"{uuid}")["Body"])
    return skimage.transform.resize(img, (1024, 1024))

@task
def save_to_s3(result_df):
    result_df.to_parquet("s3://bucket/output_prefix/")

with Flow("something") as flow:
    df = run_export()
    images_df = download_image.map(df["uuid"])
    save_to_s3(images_df)
Basically I want to grab a dataframe from somewhere, download a bunch of images from S3, resize them, attach them to the dataframe as a new column, and then save the frame somewhere else in S3.
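Stripped of the Prefect and Dask machinery, the fan-out-then-attach pattern I'm after looks roughly like this in plain Python (a sketch only: `ThreadPoolExecutor` stands in for the Dask executor, and the function and column names are illustrative, not real API):

```python
from concurrent.futures import ThreadPoolExecutor

def download_and_resize(uuid):
    # Placeholder for the real S3 download + skimage resize step.
    return f"resized-{uuid}"

def run_pipeline(rows):
    # "Map" the per-item task over every uuid in the frame, in parallel.
    uuids = [row["uuid"] for row in rows]
    with ThreadPoolExecutor() as pool:
        images = list(pool.map(download_and_resize, uuids))
    # Attach the results back as a new column (a list of dicts stands in
    # for the dataframe here).
    for row, img in zip(rows, images):
        row["image"] = img
    return rows
```

The question is essentially how this maps onto Prefect's task mapping when the "rows" live in a Dask dataframe rather than local memory.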