Fabio Grätz
09/06/2021, 4:16 PM
```python
from prefect import task

@task
def load_image_from_bucket(fn):
    # Load image `img` from file path `fn` in the bucket (details elided)
    img = ...
    return img, fn

@task
def rotate_image(x):
    img, fn = x
    return img.rotate(90), fn

@task
def save_image_to_bucket(x):
    img, fn = x
    # Save the rotated image to a new bucket (details elided)
    ...
```
The flow looks as follows:
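```python
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun

with Flow(
    "Load images from bucket",
    run_config=KubernetesRun(),
    executor=DaskExecutor(...),
) as flow:
    # Get list of image URIs `imgs`
    loaded_images = load_image_from_bucket.map(imgs)
    rotated_images = rotate_image.map(loaded_images)
    save_image_to_bucket.map(rotated_images)
```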
According to the docs, this should create parallel pipelines (without reducing after getting, e.g., `loaded_images`):
> However, if a mapped task relies on another mapped task, Prefect does not reduce the upstream result. Instead, it connects the nth upstream child to the nth downstream child, creating independent parallel pipelines.
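A plain-Python sketch of the pairing described in the quote, assuming nothing about Prefect's internals: child i of each mapped stage consumes only child i of the stage before it, so every image forms its own load, rotate pipeline. `map_stage`, `load` and `rotate` are hypothetical stand-ins that use strings instead of real images, and the sketch models only which inputs are paired, not the order in which a scheduler runs the children.

```python
from typing import Callable, List, Sequence, Tuple


def map_stage(fn: Callable, upstream: Sequence) -> List:
    # Child i of the downstream stage receives only child i of the upstream
    # stage; the upstream results are never reduced into one value first.
    return [fn(child) for child in upstream]


def load(fn: str) -> Tuple[str, str]:
    # Hypothetical stand-in for load_image_from_bucket: returns (image, filename).
    return f"img<{fn}>", fn


def rotate(x: Tuple[str, str]) -> Tuple[str, str]:
    # Hypothetical stand-in for rotate_image: the filename travels with the image.
    img, fn = x
    return f"rot90({img})", fn


uris = ["a.png", "b.png", "c.png"]
loaded = map_stage(load, uris)
rotated = map_stage(rotate, loaded)
print(rotated[1])  # ('rot90(img<b.png>)', 'b.png')
```

Passing `(img, fn)` tuples, as the tasks above do, is what keeps each result tied to its source file through the pipeline.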
From what I observe when using the `DaskExecutor(…)` and applying the above flow to 500 images, all workers only download images at first and only start rotating once all of them have been loaded. It takes several minutes until they start rotating and uploading, and often the flow crashes before it even reaches that point.
What I want is depth-first execution: a worker downloads an image, rotates it, pushes it to the bucket, forgets about this image and moves on to the next one.
This appears to be (roughly) the case when running with `executor=LocalDaskExecutor()` instead of an ephemeral Dask cluster in k8s, using the following config:
```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor
with Flow("name") as flow:
    ...
state = flow.run(executor=LocalDaskExecutor())
```
Transforming 500 images in a bucket then takes only ~50s.
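To make the difference between the two execution orders concrete, here is a toy model (an assumption, not a measurement of this flow) with a single worker that counts how many loaded images are held at once, treating an image as held from its `load` step until its `save` step. Breadth-first order, which matches what is observed with the `DaskExecutor`, holds every image at the same time; depth-first holds one. Whether memory pressure is what actually crashes the k8s flow is not established in this thread.

```python
from typing import List, Tuple

STAGES = ("load", "rotate", "save")


def breadth_first(n: int) -> List[Tuple[str, int]]:
    # Every child of one mapped stage runs before any child of the next stage.
    return [(stage, i) for stage in STAGES for i in range(n)]


def depth_first(n: int) -> List[Tuple[str, int]]:
    # Each image passes through the whole pipeline before the next one is loaded.
    return [(stage, i) for i in range(n) for stage in STAGES]


def peak_images_in_memory(order: List[Tuple[str, int]]) -> int:
    # Toy assumption: an image is held from its "load" step until its "save" step.
    held, peak = set(), 0
    for stage, i in order:
        if stage == "load":
            held.add(i)
        elif stage == "save":
            held.discard(i)
        peak = max(peak, len(held))
    return peak


print(peak_images_in_memory(breadth_first(500)))  # 500
print(peak_images_in_memory(depth_first(500)))    # 1
```

Both orders do the same 1,500 steps; only how long each image lingers between its load and its save differs.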
Is there a way to influence the order in which the tasks are executed when using the `DaskExecutor` with an ephemeral Dask cluster in k8s?
Thanks a lot!
Kevin Kho
Fabio Grätz
09/06/2021, 4:29 PM
Kevin Kho
Fabio Grätz
09/06/2021, 4:30 PM
`LocalDaskExecutor`.
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[98]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[92]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[91]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[99]': Starting task run...
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[98]': Starting task run...
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[98]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[95]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[98]': Starting task run...
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[94]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[93]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[97]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[99]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[96]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[99]': Starting task run...
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'rotate_image[99]': Finished task run for task with final state: 'Success'
[2021-09-06 18:01:27+0200] INFO - prefect.TaskRunner | Task 'save_image_to_bucket[98]': Finished task run for task with final state: 'Success'
Fabio Grätz
09/06/2021, 4:31 PM
[2021-09-06 15:50:34+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[330]': Finished task run for task with final state: 'Success'
[2021-09-06 15:50:34+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[327]': Starting task run...
[2021-09-06 15:50:35+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[327]': Finished task run for task with final state: 'Success'
[2021-09-06 15:50:36+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[324]': Starting task run...
[2021-09-06 15:50:37+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[324]': Finished task run for task with final state: 'Success'
[2021-09-06 15:50:37+0000] INFO - prefect.TaskRunner | Task 'load_image_from_bucket[321]': Starting task run...
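The k8s log above shows only `load_image_from_bucket` children running. A hypothetical single-worker scheduler (not Dask's algorithm, which also weighs data locality, worker occupancy and more) shows how that pattern falls out of ranking ready tasks by submission order, assuming children are submitted stage by stage, and how ranking later stages higher gives depth-first order instead. Dask's `Client.submit` does accept a `priority` argument (higher runs first), but whether Prefect's `DaskExecutor` lets you set it per task is not covered in this thread.

```python
import heapq
from typing import Callable, List, Tuple

STAGES = ("load", "rotate", "save")


def simulate(n: int, priority: Callable[[int, int], int]) -> List[Tuple[str, int]]:
    """Toy single-worker scheduler (hypothetical, not Dask's algorithm).

    Task (s, i) is stage s of image i and becomes ready once (s - 1, i) has run.
    The ready task with the highest priority runs next; ties go to the task
    submitted first, assuming children are submitted stage by stage.
    """
    ready: List[Tuple[int, int, int, int]] = []  # (-priority, submission index, stage, image)
    for i in range(n):
        heapq.heappush(ready, (-priority(0, i), i, 0, i))
    order = []
    while ready:
        _, _, s, i = heapq.heappop(ready)
        order.append((STAGES[s], i))
        if s + 1 < len(STAGES):
            submission_index = (s + 1) * n + i
            heapq.heappush(ready, (-priority(s + 1, i), submission_index, s + 1, i))
    return order


# Equal priorities: breadth-first, like the k8s log above
print(simulate(2, lambda stage, image: 0))
# -> [('load', 0), ('load', 1), ('rotate', 0), ('rotate', 1), ('save', 0), ('save', 1)]
# Later stages ranked higher: depth-first
print(simulate(2, lambda stage, image: stage))
# -> [('load', 0), ('rotate', 0), ('save', 0), ('load', 1), ('rotate', 1), ('save', 1)]
```

With several workers the real picture is messier, but the same idea applies: whatever rule ranks pending loads above ready rotates produces the breadth-first pattern.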
Kevin Kho
Kevin Kho
Fabio Grätz
09/06/2021, 4:41 PM
> And then maybe add it in this line
Can’t see in the thread what I am supposed to add.
Kevin Kho
`fifo_timeout=500` or something
Kevin Kho
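Dask's `Client.submit` documents `fifo_timeout` as the allowed amount of time between calls to consider them the same priority (default `'100 ms'`). A minimal sketch of that idea, under a simplified assumed model rather than Dask's actual implementation: submissions are grouped into generations, a new generation starts once more than `fifo_timeout` seconds have passed since the current one began, and older generations are assumed to run first. The submission times below are hypothetical, with loads submitted before rotates and saves. A short timeout puts each later stage in a later generation, behind any pending loads, while `fifo_timeout=500` puts everything in one generation and leaves the order to other tie-breakers. Whether that alone yields depth-first execution for this flow is untested here.

```python
from typing import List, Optional, Sequence


def assign_generations(submit_times: Sequence[float], fifo_timeout: float) -> List[int]:
    """Toy model of FIFO grouping by submission time (an assumption, not Dask's code).

    A new generation starts when more than `fifo_timeout` seconds have passed
    since the current generation began; lower generations are assumed to run first.
    """
    generations: List[int] = []
    generation = 0
    generation_start: Optional[float] = None
    for t in submit_times:
        if generation_start is None:
            generation_start = t
        elif t - generation_start > fifo_timeout:
            generation += 1
            generation_start = t
        generations.append(generation)
    return generations


# Hypothetical submission times (seconds): two loads, then two rotates, then two saves.
times = [0.0, 0.05, 0.2, 0.25, 0.4, 0.45]
print(assign_generations(times, fifo_timeout=0.1))  # [0, 0, 1, 1, 2, 2]
print(assign_generations(times, fifo_timeout=500))  # [0, 0, 0, 0, 0, 0]
```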
Fabio Grätz
09/07/2021, 7:04 AM