Tim-Oliver
04/06/2022, 2:07 PM
[...] DaskTaskRunner. Somehow it gets stuck on the last task and will not finish.
This is my minimum example:
from prefect import task, flow
from prefect.tasks import task_input_hash
import time
from prefect.task_runners import SequentialTaskRunner, DaskTaskRunner

@task
def list_files():
    return ["1.txt", "2.txt"]

@task
def preprocess(in_file: str, target_dir: str):
    time.sleep(1)
    return f"{target_dir}-{in_file}"

@task
def segment(in_file: str, target_dir: str):
    return f"{target_dir}-{in_file}"

@task
def mask_bg(in_file: str, mask: str, target_dir: str):
    return f"{target_dir}-mask-{in_file}"

@flow(name="Background Masking", task_runner=DaskTaskRunner())
def background_masking():
    files = list_files()
    preprocessed = []
    segmented = []
    masked = []
    for f in files.wait().result():
        preprocessed.append(preprocess(f, target_dir="/preprocessed/"))
        segmented.append(segment(preprocessed[-1], target_dir="/segmented/"))
        masked.append(mask_bg(preprocessed[-1], segmented[-1], target_dir="/masked/"))

if __name__ == "__main__":
    result = background_masking()
If I run this locally (no remote file storage) with the normal SQLite database, the processing does not finish. With SequentialTaskRunner or ConcurrentTaskRunner the script completes.
prefect version
Version: 2.0b2
API version: 0.3.0
Python version: 3.7.13
Git commit: b2a048c2
Built: Thu, Mar 17, 2022 2:24 PM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
  Database: sqlite
  SQLite version: 3.38.0
Anna Geller
Tim-Oliver
04/06/2022, 2:34 PM
Anna Geller
prefect storage create # choose S3, set your bucket and set as default
Then you can use it for flow storage, checkpointed flow results, as well as for writing custom data within your flows.
Kevin Kho
Tim-Oliver
04/06/2022, 2:52 PM
Kevin Kho
[...] for loop doesn’t have explicit dependencies on each other, so they can be submitted out of order if we use the list index. For example, preprocessed[-1] may not be aligned with the current loop iteration.
Tim-Oliver
04/06/2022, 2:55 PM
[...] map in Prefect 2.0 🙂
Kevin Kho
Tim-Oliver
04/06/2022, 3:02 PM
Kevin Kho
Tim-Oliver
04/06/2022, 3:15 PM
@flow(name="Background Masking", task_runner=DaskTaskRunner())
def background_masking():
    files = list_files()
    preprocessed = []
    segmented = []
    masked = []
    for f in files.wait().result():
        preprocessed.append(preprocess(f, target_dir="/preprocessed/"))
    for p in preprocessed:
        segmented.append(segment(p.wait().result(), target_dir="/segmented/"))
    for p, s in zip(preprocessed, segmented):
        masked.append(mask_bg(p.wait().result(), s.wait().result(), target_dir="/masked/"))
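The stage-by-stage pattern in the flow above can be illustrated with a plain-Python analogy. This is only a sketch using the standard library's concurrent.futures, not Prefect's API: ThreadPoolExecutor stands in for the task runner, the three functions are undecorated copies of the tasks from the thread, and staged_pipeline is a hypothetical name. It also simplifies the flow slightly by collecting all results of a stage before submitting the next one.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def preprocess(in_file: str, target_dir: str) -> str:
    time.sleep(0.1)  # stand-in for real work
    return f"{target_dir}-{in_file}"


def segment(in_file: str, target_dir: str) -> str:
    return f"{target_dir}-{in_file}"


def mask_bg(in_file: str, mask: str, target_dir: str) -> str:
    return f"{target_dir}-mask-{in_file}"


def staged_pipeline(files):
    """Run three stages; calls within a stage are concurrent, stages are sequential."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Stage 1: submit every preprocess call, then block until all are done.
        pre_futures = [pool.submit(preprocess, f, "/preprocessed/") for f in files]
        preprocessed = [fut.result() for fut in pre_futures]

        # Stage 2: starts only after stage 1 has fully finished.
        seg_futures = [pool.submit(segment, p, "/segmented/") for p in preprocessed]
        segmented = [fut.result() for fut in seg_futures]

        # Stage 3: zip pairs each preprocessed file with its own segmentation,
        # because results are collected in submission order, not completion order.
        mask_futures = [
            pool.submit(mask_bg, p, s, "/masked/")
            for p, s in zip(preprocessed, segmented)
        ]
        return [fut.result() for fut in mask_futures]


if __name__ == "__main__":
    print(staged_pipeline(["1.txt", "2.txt"]))
```

Collecting results in submission order keeps the lists aligned per file, at the cost of a wait between stages: no segment call starts until the slowest preprocess call has returned.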
Kevin Kho
Tim-Oliver
04/06/2022, 3:18 PM
[...] preprocess, segment, and mask_bg can still run in parallel, but there will be sync-wait times between these steps.
(prefect_2)❯ python prefect_example_run.py
17:18:56.138 | INFO | prefect.engine - Created flow run 'hulking-gharial' for flow 'Background Masking'
17:18:56.138 | INFO | Flow run 'hulking-gharial' - Using task runner 'DaskTaskRunner'
17:18:56.333 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
17:18:57.996 | INFO | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
17:18:58.068 | INFO | Flow run 'hulking-gharial' - Created task run 'list_files-806e65fc-0' for task 'list_files'
17:18:58.775 | INFO | Task run 'list_files-806e65fc-0' - Finished in state Completed(None)
17:18:58.801 | INFO | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-0' for task 'preprocess'
17:18:58.825 | INFO | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-1' for task 'preprocess'
17:18:58.856 | INFO | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-2' for task 'preprocess'
17:18:58.999 | INFO | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-3' for task 'preprocess'
17:19:00.110 | INFO | Task run 'preprocess-5b673088-3' - Finished in state Completed(None)
17:19:00.522 | INFO | Task run 'preprocess-5b673088-0' - Finished in state Completed(None)
17:19:00.550 | INFO | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-0' for task 'segment'
17:19:00.569 | INFO | Task run 'preprocess-5b673088-2' - Finished in state Completed(None)
17:19:00.580 | INFO | Task run 'preprocess-5b673088-1' - Finished in state Completed(None)
17:19:00.607 | INFO | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-1' for task 'segment'
17:19:00.638 | INFO | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-2' for task 'segment'
17:19:00.663 | INFO | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-3' for task 'segment'
17:19:00.743 | INFO | Task run 'segment-0f890022-1' - Finished in state Completed(None)
17:19:00.766 | INFO | Task run 'segment-0f890022-0' - Finished in state Completed(None)
17:19:00.785 | INFO | Task run 'segment-0f890022-2' - Finished in state Completed(None)
17:19:00.798 | INFO | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-0' for task 'mask_bg'
17:19:00.827 | INFO | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-1' for task 'mask_bg'
17:19:00.854 | INFO | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-2' for task 'mask_bg'
17:19:00.865 | INFO | Task run 'segment-0f890022-3' - Finished in state Completed(None)
17:19:00.888 | INFO | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-3' for task 'mask_bg'
17:19:00.959 | INFO | Task run 'mask_bg-368f0fa0-1' - Finished in state Completed(None)
17:19:00.971 | INFO | Task run 'mask_bg-368f0fa0-2' - Finished in state Completed(None)
17:19:01.056 | INFO | Task run 'mask_bg-368f0fa0-0' - Finished in state Completed(None)
17:19:01.131 | INFO | Task run 'mask_bg-368f0fa0-3' - Finished in state Completed(None)
17:19:01.815 | INFO | Flow run 'hulking-gharial' - Finished in state Completed('All states completed.')
[...] map once it is released 🙂
Kevin Kho
Tim-Oliver
04/06/2022, 3:30 PM
Kevin Kho