# ask-community
f
Hey Prefect community, I’m building a prototype of an ETL pipeline that is supposed to run on Dask/Kubernetes. Let’s assume the dummy ETL pipeline downloads 50k images from a bucket, rotates them, and writes them to another bucket. To do so, I created three separate tasks:
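```python
from prefect import task


@task
def load_image_from_bucket(fn):
    # Load image `img` from file path `fn` in the source bucket
    # (bucket-specific loading code omitted)
    img = ...
    return img, fn


@task
def rotate_image(x):
    img, fn = x
    return img.rotate(90), fn


@task
def save_image_to_bucket(x):
    img, fn = x
    # Save the rotated image to the target bucket (code omitted)
```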
The flow looks as follows:
```python
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun

with Flow(
    'Load images from bucket',
    run_config=KubernetesRun(),
    executor=DaskExecutor(...),
) as flow:
    # Get list of image URIs `imgs`
    loaded_images = load_image_from_bucket.map(imgs)
    rotated_images = rotate_image.map(loaded_images)
    save_image_to_bucket.map(rotated_images)
```
According to the docs, this should create parallel pipelines (without reducing after getting e.g. `loaded_images`):

> However, if a mapped task relies on another mapped task, Prefect does not reduce the upstream result. Instead, it connects the nth upstream child to the nth downstream child, creating independent parallel pipelines.
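A plain-Python sketch (no Prefect involved) of the wiring described in that docs quote: child n of one stage feeds only child n of the next stage, with no reduce step in between. The helper `mapped_pipelines` and the string stand-ins for images are hypothetical; the sketch only shows which result feeds which, not the order in which an executor would actually run the tasks.

```python
from typing import Callable, List, Sequence


def mapped_pipelines(items: Sequence, stages: Sequence[Callable]) -> List[list]:
    """For each input item, return the chain [item, stage1(item), stage2(...), ...].

    Child n of each stage consumes only child n of the previous stage,
    mirroring how mapping over a mapped result builds independent pipelines.
    """
    pipelines = []
    for item in items:
        chain = [item]
        for stage in stages:
            chain.append(stage(chain[-1]))
        pipelines.append(chain)
    return pipelines


# Hypothetical stand-ins for load -> rotate -> save, using strings instead of images.
def load(fn: str) -> str:
    return f"img({fn})"


def rotate(img: str) -> str:
    return f"rot({img})"


def save(img: str) -> str:
    return f"saved({img})"


result = mapped_pipelines(["a.png", "b.png"], [load, rotate, save])
print(result[1])  # ['b.png', 'img(b.png)', 'rot(img(b.png))', 'saved(rot(img(b.png)))']
```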
From what I observe when using the `DaskExecutor(…)` and applying the above flow to 500 images, all workers first only download images and only start rotating once all of them have been loaded. Either it takes several minutes before they start rotating and uploading, or (often) the flow crashes before reaching that point. I would want depth-first execution: a worker downloads an image, rotates it, pushes it to the bucket, forgets about this image and moves on to the next one. This appears to be (roughly) the case when running the following config, which uses `executor=LocalDaskExecutor()` instead of an ephemeral Dask cluster in k8s:

```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("name") as flow:
    ...

state = flow.run(executor=LocalDaskExecutor())
```
Transforming 500 images in a bucket then takes only ~50 s. Is there a way to influence the order in which the tasks are executed when using the `DaskExecutor` with an ephemeral Dask cluster in k8s? Thanks a lot!
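To make the difference between the two execution orders concrete, here is a toy model under a loud assumption: a single worker running one task at a time (real Dask workers run many tasks concurrently, and the thread does not establish why the flow crashed). It counts how many images are held in memory between their load and their save. Breadth-first order holds all 500 loaded images before any is saved, which is one plausible reason such a run gets slow or fails; depth-first holds one at a time. The function names are hypothetical.

```python
from typing import List, Tuple

STAGES = ["load_image_from_bucket", "rotate_image", "save_image_to_bucket"]


def breadth_first(n: int) -> List[Tuple[str, int]]:
    """Run every child of one stage before starting the next stage."""
    return [(stage, i) for stage in STAGES for i in range(n)]


def depth_first(n: int) -> List[Tuple[str, int]]:
    """Push each image through all stages before touching the next one."""
    return [(stage, i) for i in range(n) for stage in STAGES]


def peak_images_in_memory(order: List[Tuple[str, int]]) -> int:
    """Simplified model (one worker, one task at a time): count images
    held between their load and their save, and return the maximum."""
    held, peak = 0, 0
    for stage, _ in order:
        if stage == "load_image_from_bucket":
            held += 1
        elif stage == "save_image_to_bucket":
            held -= 1
        peak = max(peak, held)
    return peak


print(peak_images_in_memory(breadth_first(500)))  # 500
print(peak_images_in_memory(depth_first(500)))    # 1
```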
k
Hey @Fabio Grätz, so my understanding is that you are expecting depth-first execution but it's doing breadth-first execution. This is actually a bit surprising to me, because normally it's the other way around: people complain about `LocalDaskExecutor` being breadth-first. The short answer is that Dask decides which tasks to run first, and there is no way to force depth-first execution. If you're willing to try, though, I think there is something you can edit in the Prefect code. Let me look for it, one sec. Also, I'd appreciate it if you could move some of these code snippets/tracebacks to the thread to keep the main channel clean, as some users like to read through the threads 🙂
f
Thanks for the quick reply! Will move the tracebacks to the thread 👍
k
For the Dask theory behind prioritization:
f
Images are loaded, rotated and saved to the bucket in a (roughly) alternating way when using the `LocalDaskExecutor`:

```text
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[98]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[92]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[91]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[99]': Starting task run...
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[98]': Starting task run...
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[98]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[95]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[98]': Starting task run...
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[94]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[93]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[97]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[99]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[96]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[99]': Starting task run...
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[99]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[98]': Finished task run for task with final state: 'Success'
```
Execution is breadth-first when using the `DaskExecutor`:

```text
[2021-09-06 15:50:34+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[330]': Finished task run for task with final state: 'Success'
[2021-09-06 15:50:34+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[327]': Starting task run...
[2021-09-06 15:50:35+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[327]': Finished task run for task with final state: 'Success'
[2021-09-06 15:50:36+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[324]': Starting task run...
[2021-09-06 15:50:37+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[324]': Finished task run for task with final state: 'Success'
[2021-09-06 15:50:37+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[321]': Starting task run...
```
k
And then maybe add it in this line
This is pretty experimental, and I’m not sure it will help. 100 ms is the default value of `fifo_timeout`; if you increase it, you may get more batching of tasks, and then it would run in a breadth-first way. Would you be willing to test?
f
Yes, happy to try! But did something get lost here?

> And then maybe add it in this line

I can’t see in the thread what I am supposed to add.
k
Oh sorry, add `fifo_timeout=500` or something.
You’ll see it in the Dask link
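A simplified model of the FIFO batching that `fifo_timeout` controls, as we understand the Dask prioritization docs: `distributed.Client.submit` accepts a `fifo_timeout` argument (default `"100 ms"`), submissions arriving within that window share a "generation", and tasks from earlier generations are preferred over later ones. Prefect 1.x appears to submit each task run to Dask individually, so with a short window the later `rotate_image`/`save_image_to_bucket` submissions land in later generations than the loads; widening the window merges them. The submission times below are made up, and `assign_generations` is a hypothetical helper, not Dask code. One caveat, if we read the Dask source correctly: a bare number such as `500` is parsed as seconds, so `fifo_timeout="500ms"` is probably closer to what is intended.

```python
from typing import List, Optional


def assign_generations(submit_times_ms: List[int], fifo_timeout_ms: int) -> List[int]:
    """Simplified model of Dask's FIFO batching (not the real scheduler code).

    A new generation starts when a submission arrives more than
    `fifo_timeout_ms` after the current generation began. Tasks in an
    earlier generation are preferred over tasks in a later one.
    """
    generations: List[int] = []
    generation = 0
    generation_start: Optional[int] = None
    for t in submit_times_ms:
        if generation_start is None or t - generation_start > fifo_timeout_ms:
            generation += 1
            generation_start = t
        generations.append(generation)
    return generations


# Hypothetical submission times (ms): two loads, then a rotate and a save
# submitted a little later.
times = [0, 50, 150, 300]
print(assign_generations(times, 100))  # default-like window -> [1, 1, 2, 3]
print(assign_generations(times, 500))  # wider window        -> [1, 1, 1, 1]
```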
f
Thanks, will try!