
Tim-Oliver

04/06/2022, 2:07 PM
Hello, I have an issue with the `DaskTaskRunner`. Somehow it gets stuck on the last task and will not finish. This is my minimal example:
from prefect import task, flow
from prefect.tasks import task_input_hash
import time
from prefect.task_runners import SequentialTaskRunner, DaskTaskRunner

@task
def list_files():
    return ["1.txt", "2.txt"]

@task
def preprocess(in_file: str, target_dir: str):
    time.sleep(1)
    return f"{target_dir}-{in_file}"

@task
def segment(in_file: str, target_dir: str):
    return f"{target_dir}-{in_file}"

@task
def mask_bg(in_file: str, mask: str, target_dir: str):
    return f"{target_dir}-mask-{in_file}"

@flow(name="Background Masking", task_runner=DaskTaskRunner())
def background_masking():
    files = list_files()
    preprocessed = []
    segmented = []
    masked = []
    for f in files.wait().result():
        preprocessed.append(preprocess(f, target_dir="/preprocessed/"))
        segmented.append(segment(preprocessed[-1], target_dir="/segmented/"))
        masked.append(mask_bg(preprocessed[-1], segmented[-1], target_dir="/masked/"))

if __name__ == "__main__":
    result = background_masking()
If I run this locally (no remote file storage) with the normal SQLite database, the processing does not finish. With `SequentialTaskRunner` or `ConcurrentTaskRunner`, the script completes.
prefect version 
Version:            2.0b2 
API version:        0.3.0 
Python version:     3.7.13 
Git commit:         b2a048c2 
Built:              Thu, Mar 17, 2022 2:24 PM 
OS/Arch:            linux/x86_64 
Profile:            default 
Server type:        ephemeral 
Server: 
 Database:         sqlite 
 SQLite version:   3.38.0

Anna Geller

04/06/2022, 2:14 PM
Dask workers are using a different working directory than your flow run. Can you change your logic to write, e.g., to S3 instead?

Tim-Oliver

04/06/2022, 2:34 PM
I follow the first part, but I am a bit lost regarding the second part. Where am I writing?
Okay, I think I understand the second part as well:
The return values of the tasks are stored locally.
👍 1

Anna Geller

04/06/2022, 2:48 PM
exactly, the easiest way would be to have some S3 bucket for your Prefect flows. Then you can use it when creating default storage:
prefect storage create # choose S3, set your bucket and set as default
then you can use it for flow storage, checkpointed flow results, as well as for writing custom data within your flows

Kevin Kho

04/06/2022, 2:48 PM
I can replicate I think. Will look into it

Tim-Oliver

04/06/2022, 2:52 PM
Currently I don't have access to an S3 bucket 😕 I am encouraged to keep most things in our local system.
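
If results have to stay on the local system, one way to sidestep the working-directory difference mentioned above is to build every output path from an absolute base directory. The following is only a minimal sketch using the standard library; it assumes the Dask workers can see the same filesystem (as with a local cluster), and `resolve_output` and `base_dir` are hypothetical names, not part of Prefect or Dask.

```python
import os
import tempfile
from pathlib import Path


def resolve_output(base_dir: Path, stage: str, in_file: str) -> Path:
    """Return an absolute output path that ignores the current working directory."""
    out_dir = base_dir.resolve() / stage
    out_dir.mkdir(parents=True, exist_ok=True)
    return out_dir / Path(in_file).name


# Hypothetical base directory; a temporary directory stands in for a shared folder.
base = Path(tempfile.mkdtemp())
first = resolve_output(base, "preprocessed", "1.txt")

# Simulate a worker process that runs in a different working directory.
original_cwd = os.getcwd()
os.chdir(tempfile.gettempdir())
try:
    second = resolve_output(base, "preprocessed", "1.txt")
finally:
    os.chdir(original_cwd)

print(first == second)
```

Both calls resolve to the same file because nothing in the path is relative to the process's current directory.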

Kevin Kho

04/06/2022, 2:52 PM
I think the issue here is actually that the tasks inside the `for` loop don't have explicit dependencies on each other, so they can be submitted out of order if we use the list index. For example, `preprocessed[-1]` may not be aligned with the current loop iteration.

Tim-Oliver

04/06/2022, 2:55 PM
Hmm... I get the feeling I should wait for the release of `map` in Prefect 2.0 🙂

Kevin Kho

04/06/2022, 2:56 PM
that would simplify this yep
Yeah, I think we can just open an issue for this. Will make one now.
🙏 1

Tim-Oliver

04/06/2022, 3:02 PM
Although, I am not so sure I follow your reasoning @Kevin Kho. This for-loop builds the list of futures (as far as I understand it) and is deterministic, and then the different tasks get dependencies on existing futures/tasks. Even if they are submitted later in random order, the tasks "know" their order. Or maybe all I am saying is wrong because I don't develop your product. Then apologies :D
Also, if I write out files to disk from these tasks, all files are written except the one file from one of the `mask_bg` tasks. Could be coincidence.

Kevin Kho

04/06/2022, 3:12 PM
I thought about it more, and I was thinking the same thing. Even if the futures don't complete in the right order, the list holds them in the right order. Writing an issue now.
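
The ordering argument above can be illustrated with plain `concurrent.futures` from the standard library. This is an analogy under stated assumptions, not Prefect's or Dask's scheduling: a thread pool stands in for the task runner, and `segment` receives a future and waits on it itself. The random sleep makes tasks finish in an unpredictable order, yet the lists keep submission order and `preprocessed[-1]` always refers to the current iteration's file.

```python
import random
import time
from concurrent.futures import Future, ThreadPoolExecutor


def preprocess(in_file: str, target_dir: str) -> str:
    # Random delay so that tasks finish in an unpredictable order.
    time.sleep(random.uniform(0.0, 0.05))
    return f"{target_dir}-{in_file}"


def segment(pre_future: Future, target_dir: str) -> str:
    # Explicit dependency: block until the upstream preprocess result exists.
    return f"{target_dir}-{pre_future.result()}"


files = ["1.txt", "2.txt", "3.txt", "4.txt"]

# Enough workers for every task, so a waiting segment never starves its upstream.
with ThreadPoolExecutor(max_workers=8) as pool:
    preprocessed = []
    segmented = []
    for f in files:
        preprocessed.append(pool.submit(preprocess, f, "/preprocessed/"))
        # preprocessed[-1] is the future appended one line above.
        segmented.append(pool.submit(segment, preprocessed[-1], "/segmented/"))
    # Collect in list order, regardless of completion order.
    results = [fut.result() for fut in segmented]

expected = [f"/segmented/-/preprocessed/-{f}" for f in files]
print(results == expected)
```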

Tim-Oliver

04/06/2022, 3:15 PM
This works:
@flow(name="Background Masking", task_runner=DaskTaskRunner())
def background_masking():
    files = list_files()
    preprocessed = []
    segmented = []
    masked = []
    for f in files.wait().result():
        preprocessed.append(preprocess(f, target_dir="/preprocessed/"))
    for p in preprocessed:
        segmented.append(segment(p.wait().result(), target_dir="/segmented/"))
    for p, s in zip(preprocessed, segmented):
        masked.append(mask_bg(p.wait().result(), s.wait().result(), target_dir="/masked/"))

Kevin Kho

04/06/2022, 3:17 PM
But I assume you lose parallelism right?

Tim-Oliver

04/06/2022, 3:18 PM
Let me test this. I would assume that `preprocess`, `segment`, and `mask_bg` can still run in parallel, but there will be sync-wait times between these steps.
(prefect_2)❯ python prefect_example_run.py
17:18:56.138 | INFO   | prefect.engine - Created flow run 'hulking-gharial' for flow 'Background Masking' 
17:18:56.138 | INFO   | Flow run 'hulking-gharial' - Using task runner 'DaskTaskRunner' 
17:18:56.333 | INFO   | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster` 
17:18:57.996 | INFO   | prefect.task_runner.dask - The Dask dashboard is available at <http://127.0.0.1:8787/status> 
17:18:58.068 | INFO   | Flow run 'hulking-gharial' - Created task run 'list_files-806e65fc-0' for task 'list_files' 
17:18:58.775 | INFO   | Task run 'list_files-806e65fc-0' - Finished in state Completed(None) 
17:18:58.801 | INFO   | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-0' for task 'preprocess' 
17:18:58.825 | INFO   | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-1' for task 'preprocess' 
17:18:58.856 | INFO   | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-2' for task 'preprocess' 
17:18:58.999 | INFO   | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-3' for task 'preprocess' 
17:19:00.110 | INFO   | Task run 'preprocess-5b673088-3' - Finished in state Completed(None) 
17:19:00.522 | INFO   | Task run 'preprocess-5b673088-0' - Finished in state Completed(None) 
17:19:00.550 | INFO   | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-0' for task 'segment' 
17:19:00.569 | INFO   | Task run 'preprocess-5b673088-2' - Finished in state Completed(None) 
17:19:00.580 | INFO   | Task run 'preprocess-5b673088-1' - Finished in state Completed(None) 
17:19:00.607 | INFO   | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-1' for task 'segment' 
17:19:00.638 | INFO   | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-2' for task 'segment' 
17:19:00.663 | INFO   | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-3' for task 'segment' 
17:19:00.743 | INFO   | Task run 'segment-0f890022-1' - Finished in state Completed(None) 
17:19:00.766 | INFO   | Task run 'segment-0f890022-0' - Finished in state Completed(None) 
17:19:00.785 | INFO   | Task run 'segment-0f890022-2' - Finished in state Completed(None) 
17:19:00.798 | INFO   | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-0' for task 'mask_bg' 
17:19:00.827 | INFO   | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-1' for task 'mask_bg' 
17:19:00.854 | INFO   | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-2' for task 'mask_bg' 
17:19:00.865 | INFO   | Task run 'segment-0f890022-3' - Finished in state Completed(None) 
17:19:00.888 | INFO   | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-3' for task 'mask_bg' 
17:19:00.959 | INFO   | Task run 'mask_bg-368f0fa0-1' - Finished in state Completed(None) 
17:19:00.971 | INFO   | Task run 'mask_bg-368f0fa0-2' - Finished in state Completed(None) 
17:19:01.056 | INFO   | Task run 'mask_bg-368f0fa0-0' - Finished in state Completed(None) 
17:19:01.131 | INFO   | Task run 'mask_bg-368f0fa0-3' - Finished in state Completed(None) 
17:19:01.815 | INFO   | Flow run 'hulking-gharial' - Finished in state Completed('All states completed.')
Better than no parallelism (and sufficient if all processed files have a similar size), but not as cool as the Prefect 1 map 😲
That should be good enough for me for now. I can continue with Prefect 2 and replace this approach with the awesome Prefect 2 `map` once it is released 🙂
Thank you both for the quick and helpful suggestions! This is just p~re~erfect!
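
The trade-off between the stage-by-stage workaround and a per-file chain can be sketched with the standard library's `ThreadPoolExecutor`. This is an analogy only and does not use Prefect's API; `run_staged`, `run_per_file`, and `process_one` are hypothetical helper names. The staged variant waits for a whole step to finish before starting the next one, while the per-file variant lets a fast file run through all three steps without waiting for slower files, at the cost of treating each file as a single unit of work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def preprocess(in_file: str, target_dir: str) -> str:
    time.sleep(0.01)
    return f"{target_dir}-{in_file}"


def segment(in_file: str, target_dir: str) -> str:
    return f"{target_dir}-{in_file}"


def mask_bg(in_file: str, mask: str, target_dir: str) -> str:
    return f"{target_dir}-mask-{in_file}"


def run_staged(files, pool):
    # Each list(...) is a barrier: the next step starts only after the
    # previous step has finished for every file.
    pre = list(pool.map(lambda f: preprocess(f, "/preprocessed/"), files))
    seg = list(pool.map(lambda p: segment(p, "/segmented/"), pre))
    return list(pool.map(lambda ps: mask_bg(ps[0], ps[1], "/masked/"), zip(pre, seg)))


def process_one(in_file: str) -> str:
    # One file runs through all three steps without waiting for other files.
    p = preprocess(in_file, "/preprocessed/")
    s = segment(p, "/segmented/")
    return mask_bg(p, s, "/masked/")


def run_per_file(files, pool):
    return list(pool.map(process_one, files))


files = ["1.txt", "2.txt", "3.txt", "4.txt"]
with ThreadPoolExecutor(max_workers=4) as pool:
    staged = run_staged(files, pool)
    per_file = run_per_file(files, pool)

print(staged == per_file)
```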

Kevin Kho

04/06/2022, 3:26 PM
Ah ok sounds good!
Oh yeah btw, for next time, #random is really meant for random stuff so #prefect-community would be a better place to post this

Tim-Oliver

04/06/2022, 3:30 PM
Ah, sorry. Didn't know which channel to take. Thanks for the heads up.

Kevin Kho

04/06/2022, 3:32 PM
No worries!