Thread
#prefect-community
    Fabio Grätz

    1 year ago
    Hey prefect community, I’m building a prototype of an ETL pipeline that is supposed to be run on dask/kubernetes. Let’s assume the dummy ETL pipeline is supposed to download 50k images from a bucket, rotate them, and write them to another bucket. To do so, I created three separate tasks:
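    from prefect import task
    
    @task
    def load_image_from_bucket(fn):
        # Load image `img` from filepath `fn` in the source bucket
        # (download code not shown in the original post)
        img = ...
        return img, fn
    
    @task
    def rotate_image(x):
        img, fn = x
        # Assumes `img` is a PIL image, whose rotate() returns a new image
        return img.rotate(90), fn
    
    @task
    def save_image_to_bucket(x):
        img, fn = x
        # Save rotated image `img` to the destination bucket under `fn`
        # (upload code not shown in the original post)
        ...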
    The flow looks as follows:
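    from prefect import Flow
    from prefect.executors import DaskExecutor
    from prefect.run_configs import KubernetesRun
    
    with Flow(
        'Load images from bucket',
        run_config=KubernetesRun(),
        executor=DaskExecutor(...),  # cluster settings elided in the original
    ) as flow:
        # Get list of image URIs `imgs`
        loaded_images = load_image_from_bucket.map(imgs)
        rotated_images = rotate_image.map(loaded_images)
        save_image_to_bucket.map(rotated_images)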
    According to the docs, this should create independent parallel pipelines, with no reduce step after, e.g., `loaded_images`:
    "However, if a mapped task relies on another mapped task, Prefect does not reduce the upstream result. Instead, it connects the nth upstream child to the nth downstream child, creating independent parallel pipelines."
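A toy sketch in plain Python (not Prefect or Dask code) of the two execution orders that this child-to-child pairing allows for the three mapped stages. The stage names are shorthand, and the peak-memory count is only an illustrative assumption about why the breadth-first run may crash: every loaded image is assumed to stay in memory until its save step has run.

```python
# Toy model: three mapped stages over n_items, paired index-to-index
# (load[i] -> rotate[i] -> save[i]), so either order below is valid.
STAGES = ["load", "rotate", "save"]


def breadth_first(n_items):
    """Run every child of a stage before starting the next stage."""
    return [(stage, i) for stage in STAGES for i in range(n_items)]


def depth_first(n_items):
    """Push item i through all stages before starting item i + 1."""
    return [(stage, i) for i in range(n_items) for stage in STAGES]


def peak_in_memory(order):
    """Worst-case number of loaded-but-not-yet-saved images (assumption:
    an image is held from its load step until its save step)."""
    live, peak = 0, 0
    for stage, _ in order:
        if stage == "load":
            live += 1
        elif stage == "save":
            live -= 1
        peak = max(peak, live)
    return peak


print(peak_in_memory(breadth_first(500)))  # 500
print(peak_in_memory(depth_first(500)))    # 1
```

Both orders respect every dependency; the difference is only in how much intermediate data has to be held at once.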
    From what I observe when using the `DaskExecutor(…)` and applying the above flow to 500 images, all workers first only download images and start rotating only once all of them have been loaded. It takes several minutes until they start rotating and uploading, and often the flow crashes before it reaches that point. I would like depth-first execution: a worker downloads an image, rotates it, pushes it to the bucket, forgets about this image, and moves on to the next one. This appears to be (roughly) the case when running with the following config, using `executor=LocalDaskExecutor()` instead of an ephemeral Dask cluster in k8s:
    from prefect import Flow
    from prefect.executors import LocalDaskExecutor
    
    with Flow("name") as flow:
        ...
    
    state = flow.run(executor=LocalDaskExecutor())
    Transforming 500 images in a bucket then takes only ~50 s. Is there a way to influence the order in which the tasks are executed when using the DaskExecutor with an ephemeral Dask cluster in k8s? Thanks a lot!
    Kevin Kho

    1 year ago
    Hey @Fabio Grätz, so my understanding is that you are expecting depth-first execution but getting breadth-first execution. This is actually a bit surprising to me, because it's normally the other way around: people complain about LocalDaskExecutor being breadth-first. The short answer is that Dask decides which tasks to run first, and there is no way to force depth-first execution. If you're willing to try, though, I think there is something you can edit in the Prefect code. Let me look for it, one sec. Also, I'd appreciate it if you could move some of this code and these tracebacks to the thread to keep the main channel clean, as some users like to read through the threads 🙂
    Fabio Grätz

    1 year ago
    Thanks for the quick reply! Will move the tracebacks to the thread 👍
    Kevin Kho

    1 year ago
    For the Dask theory behind prioritization:
    Fabio Grätz

    1 year ago
    Images are loaded, rotated and saved to the bucket (roughly) in an alternating way when using the `LocalDaskExecutor`.
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[98]': Finished task run for task with final state: 'Success'
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[92]': Finished task run for task with final state: 'Success'
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[91]': Finished task run for task with final state: 'Success'
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[99]': Starting task run...
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[98]': Starting task run...
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[98]': Finished task run for task with final state: 'Success'
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[95]': Finished task run for task with final state: 'Success'
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[98]': Starting task run...
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[94]': Finished task run for task with final state: 'Success'
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[93]': Finished task run for task with final state: 'Success'
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[97]': Finished task run for task with final state: 'Success'
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[99]': Finished task run for task with final state: 'Success'
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[96]': Finished task run for task with final state: 'Success'
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[99]': Starting task run...
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[99]': Finished task run for task with final state: 'Success'
    [2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[98]': Finished task run for task with final state: 'Success'
    Execution is breadth-first when using the `DaskExecutor`:
    [2021-09-06 15:50:34+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[330]': Finished task run for task with final state: 'Success'
    [2021-09-06 15:50:34+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[327]': Starting task run...
    [2021-09-06 15:50:35+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[327]': Finished task run for task with final state: 'Success'
    [2021-09-06 15:50:36+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[324]': Starting task run...
    [2021-09-06 15:50:37+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[324]': Finished task run for task with final state: 'Success'
    [2021-09-06 15:50:37+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[321]': Starting task run...
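One way to put a number on the difference between the two log excerpts above is to count how many `load_image_from_bucket` runs finish before the first `rotate_image` run starts. A minimal sketch, assuming the Prefect 1.x `TaskRunner` line format shown in these excerpts; the helper name is hypothetical.

```python
import re

# Matches TaskRunner lines such as:
# "... | Task 'rotate_image[98]': Starting task run..."
LOG_PATTERN = re.compile(r"Task '(\w+)\[(\d+)\]': (Starting|Finished)")


def loads_finished_before_first_rotate(lines):
    """Count load_image_from_bucket runs that finished before any
    rotate_image run started (hypothetical helper)."""
    finished_loads = 0
    for line in lines:
        match = LOG_PATTERN.search(line)
        if match is None:
            continue  # not a task start/finish line
        task, _index, event = match.groups()
        if task == "rotate_image" and event == "Starting":
            return finished_loads
        if task == "load_image_from_bucket" and event == "Finished":
            finished_loads += 1
    # No rotate_image run started within these lines
    return finished_loads
```

A small result on a full log suggests interleaved (depth-first-like) execution; a result close to the number of images suggests breadth-first execution.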
    Kevin Kho

    1 year ago
    And then maybe add it in this line
    This is pretty experimental, and I'm not sure it will help. 100 ms is the default value; if you increase it, maybe more tasks get batched together, and then it might run in a depth-first way. Would you be willing to test?
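The timeout discussed here is Dask's `fifo_timeout` (default 100 ms). A simplified model of what it controls, based on Dask's scheduling-policy documentation rather than the scheduler's actual code: submissions arriving within `fifo_timeout` of the start of the current "generation" share that generation, and earlier generations are preferred. Times are integer milliseconds to avoid float edge cases; the function name and the timings are hypothetical.

```python
def assign_generations(submit_times_ms, fifo_timeout_ms=100):
    """Assign a generation number to each submission time (in ms).

    Simplified model: a new generation starts when a submission arrives
    more than fifo_timeout_ms after the first submission of the current
    generation. Lower generation numbers would be scheduled first.
    """
    generations = []
    generation = 0
    generation_start = None
    for t in submit_times_ms:
        if generation_start is None or t - generation_start > fifo_timeout_ms:
            generation += 1
            generation_start = t
        generations.append(generation)
    return generations


# Hypothetical timings: six submissions arriving 50 ms apart
times = [0, 50, 100, 150, 200, 250]
print(assign_generations(times, fifo_timeout_ms=100))  # [1, 1, 1, 2, 2, 2]
print(assign_generations(times, fifo_timeout_ms=500))  # [1, 1, 1, 1, 1, 1]
```

Under this model, a larger window puts more of Prefect's rapid-fire submissions into the same generation, so the earlier-submitted load tasks are no longer automatically favored; whether that actually produces depth-first execution is what the experiment would show.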
    Fabio Grätz

    1 year ago
    Yes, happy to try! But did something get lost here?
    "And then maybe add it in this line"
    I can't see in the thread what I am supposed to add.
    Kevin Kho

    1 year ago
    Oh sorry, add `fifo_timeout="500ms"` or something similar (a bare number like `500` is read as seconds, not milliseconds). You'll see it in the Dask link.
    Fabio Grätz

    1 year ago
    Thanks, will try!