#random

Tim-Oliver

04/06/2022, 2:07 PM
Hello, I have an issue with the `DaskTaskRunner`. Somehow it gets stuck on the last task and will not finish. This is my minimum example:
from prefect import task, flow
from prefect.tasks import task_input_hash
import time
from prefect.task_runners import SequentialTaskRunner, DaskTaskRunner

@task
def list_files():
    return ["1.txt", "2.txt"]

@task
def preprocess(in_file: str, target_dir: str):
    time.sleep(1)
    return f"{target_dir}-{in_file}"

@task
def segment(in_file: str, target_dir: str):
    return f"{target_dir}-{in_file}"

@task
def mask_bg(in_file: str, mask: str, target_dir: str):
    return f"{target_dir}-mask-{in_file}"

@flow(name="Background Masking", task_runner=DaskTaskRunner())
def background_masking():
    files = list_files()
    preprocessed = []
    segmented = []
    masked = []
    for f in files.wait().result():
        preprocessed.append(preprocess(f, target_dir="/preprocessed/"))
        segmented.append(segment(preprocessed[-1], target_dir="/segmented/"))
        masked.append(mask_bg(preprocessed[-1], segmented[-1], target_dir="/masked/"))

if __name__ == "__main__":
    result = background_masking()
If I run this locally (no remote file storage) with the normal SQLite database, the processing does not finish. With `SequentialTaskRunner` or `ConcurrentTaskRunner` the script completes.
prefect version 
Version:            2.0b2 
API version:        0.3.0 
Python version:     3.7.13 
Git commit:         b2a048c2 
Built:              Thu, Mar 17, 2022 2:24 PM 
OS/Arch:            linux/x86_64 
Profile:            default 
Server type:        ephemeral 
Server: 
 Database:         sqlite 
 SQLite version:   3.38.0

Anna Geller

04/06/2022, 2:14 PM
Dask workers are using a different working directory than your flow run. Can you change your logic to write e.g. to S3 instead?

Tim-Oliver

04/06/2022, 2:34 PM
I follow the first part, but I am a bit lost regarding the second part. Where am I writing?
Okay, I think I understand the second part as well:
The return values of the tasks are stored locally.
👍 1

Anna Geller

04/06/2022, 2:48 PM
exactly, the easiest way would be to have some S3 bucket for your Prefect flows. Then you can use it when creating default storage:
prefect storage create # choose S3, set your bucket and set as default
then you can use it for flow storage, checkpointed flow results, as well as for writing custom data within your flows

Kevin Kho

04/06/2022, 2:48 PM
I can replicate I think. Will look into it

Tim-Oliver

04/06/2022, 2:52 PM
Currently I don't have access to an S3 bucket 😕 I am encouraged to keep most things in our local system.

Kevin Kho

04/06/2022, 2:52 PM
I think the issue here is actually that the stuff inside the `for` loop doesn’t have explicit dependencies on each other, so they can be submitted out of order if we use the list index. For example, `preprocessed[-1]` may not be aligned with the current loop iteration.

Tim-Oliver

04/06/2022, 2:55 PM
hmm.. I get the feeling I should wait for the release of `map` in prefect 2.0 🙂

Kevin Kho

04/06/2022, 2:56 PM
that would simplify this yep
Yeah I think we can just open an issue for this. Will make one now
🙏 1

Tim-Oliver

04/06/2022, 3:02 PM
Although, I am not so sure I follow your reasoning @Kevin Kho. This for-loop builds the list of futures (as far as I understand it) and is deterministic, and the different tasks then get dependencies on existing futures/tasks. Even if they are submitted later in random order, the tasks "know" their order. Or maybe all I am saying is wrong because I don't develop your product. Then apologies :D
Also, if I write out files to disk from these tasks, all files are written except the one file from one of the `mask_bg` tasks. Could be coincidence..

Kevin Kho

04/06/2022, 3:12 PM
I thought about it more, and I was thinking the same thing. Even if the futures don’t come in the right order, the list holds it in the right order. Writing an issue now
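
A minimal sketch of the ordering point, using the standard library's `concurrent.futures` as a stand-in rather than Prefect's own futures. The `delay` parameter and the `finished` list are hypothetical, added only to force and record out-of-order completion. It suggests that a list of futures built in loop order stays aligned with the input files even when the work completes in a different order:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def preprocess(in_file: str, delay: float) -> str:
    # Hypothetical stand-in for the `preprocess` task; `delay` is an
    # assumption used only to make the first file finish last.
    time.sleep(delay)
    return f"/preprocessed/-{in_file}"


files = ["1.txt", "2.txt"]
delays = [0.3, 0.0]  # the first file is the slow one

finished = []  # completion order, filled in by the callbacks
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = []
    for f, d in zip(files, delays):
        fut = pool.submit(preprocess, f, d)
        fut.add_done_callback(lambda done: finished.append(done.result()))
        futures.append(fut)  # appended in loop order, like `preprocessed`
    # Reading the list front to back yields results in submission order.
    results = [fut.result() for fut in futures]

print("submission order:", results)
print("completion order:", finished)
```

Here `results` follows the order of `files`, while `finished` reflects whichever call returned first. Whether Prefect's beta futures behave the same way under `DaskTaskRunner` is exactly what the issue would need to confirm.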

Tim-Oliver

04/06/2022, 3:15 PM
This works:
@flow(name="Background Masking", task_runner=DaskTaskRunner())
def background_masking():
    files = list_files()
    preprocessed = []
    segmented = []
    masked = []
    for f in files.wait().result():
        preprocessed.append(preprocess(f, target_dir="/preprocessed/"))
    for p in preprocessed:
        segmented.append(segment(p.wait().result(), target_dir="/segmented/"))
    for p, s in zip(preprocessed, segmented):
        masked.append(mask_bg(p.wait().result(), s.wait().result(), target_dir="/masked/"))

Kevin Kho

04/06/2022, 3:17 PM
But I assume you lose parallelism right?

Tim-Oliver

04/06/2022, 3:18 PM
Let me test this. I would assume that `preprocess`, `segment`, and `mask_bg` can still run in parallel, but there will be sync-wait times between these steps.
(prefect_2)❯ python prefect_example_run.py 
17:18:56.138 | INFO   | prefect.engine - Created flow run 'hulking-gharial' for flow 'Background Masking' 
17:18:56.138 | INFO   | Flow run 'hulking-gharial' - Using task runner 'DaskTaskRunner' 
17:18:56.333 | INFO   | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster` 
17:18:57.996 | INFO   | prefect.task_runner.dask - The Dask dashboard is available at <http://127.0.0.1:8787/status> 
17:18:58.068 | INFO   | Flow run 'hulking-gharial' - Created task run 'list_files-806e65fc-0' for task 'list_files' 
17:18:58.775 | INFO   | Task run 'list_files-806e65fc-0' - Finished in state Completed(None) 
17:18:58.801 | INFO   | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-0' for task 'preprocess' 
17:18:58.825 | INFO   | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-1' for task 'preprocess' 
17:18:58.856 | INFO   | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-2' for task 'preprocess' 
17:18:58.999 | INFO   | Flow run 'hulking-gharial' - Created task run 'preprocess-5b673088-3' for task 'preprocess' 
17:19:00.110 | INFO   | Task run 'preprocess-5b673088-3' - Finished in state Completed(None) 
17:19:00.522 | INFO   | Task run 'preprocess-5b673088-0' - Finished in state Completed(None) 
17:19:00.550 | INFO   | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-0' for task 'segment' 
17:19:00.569 | INFO   | Task run 'preprocess-5b673088-2' - Finished in state Completed(None) 
17:19:00.580 | INFO   | Task run 'preprocess-5b673088-1' - Finished in state Completed(None) 
17:19:00.607 | INFO   | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-1' for task 'segment' 
17:19:00.638 | INFO   | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-2' for task 'segment' 
17:19:00.663 | INFO   | Flow run 'hulking-gharial' - Created task run 'segment-0f890022-3' for task 'segment' 
17:19:00.743 | INFO   | Task run 'segment-0f890022-1' - Finished in state Completed(None) 
17:19:00.766 | INFO   | Task run 'segment-0f890022-0' - Finished in state Completed(None) 
17:19:00.785 | INFO   | Task run 'segment-0f890022-2' - Finished in state Completed(None) 
17:19:00.798 | INFO   | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-0' for task 'mask_bg' 
17:19:00.827 | INFO   | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-1' for task 'mask_bg' 
17:19:00.854 | INFO   | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-2' for task 'mask_bg' 
17:19:00.865 | INFO   | Task run 'segment-0f890022-3' - Finished in state Completed(None) 
17:19:00.888 | INFO   | Flow run 'hulking-gharial' - Created task run 'mask_bg-368f0fa0-3' for task 'mask_bg' 
17:19:00.959 | INFO   | Task run 'mask_bg-368f0fa0-1' - Finished in state Completed(None) 
17:19:00.971 | INFO   | Task run 'mask_bg-368f0fa0-2' - Finished in state Completed(None) 
17:19:01.056 | INFO   | Task run 'mask_bg-368f0fa0-0' - Finished in state Completed(None) 
17:19:01.131 | INFO   | Task run 'mask_bg-368f0fa0-3' - Finished in state Completed(None) 
17:19:01.815 | INFO   | Flow run 'hulking-gharial' - Finished in state Completed('All states completed.')
Better than no parallelism (and sufficient if all files that are processed have similar size) but not as cool as the prefect 1 map 😲
That should be good enough for me for now. I can continue with prefect 2 and replace this approach with the awesome prefect2-`map` once it is released 🙂
Thank you both for the quick and helpful suggestions! This is just p~re~erfect!
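
To illustrate the trade-off described above, here is a rough sketch with the standard library's `ThreadPoolExecutor` standing in for the task runner. It is not Prefect's API, and `staged`, `per_file`, and `chained` are hypothetical names. `staged` mirrors the three-loop workaround, where each stage waits for the whole previous stage, while `chained` runs one file through all three steps without waiting on the other files, which is closer to what a per-item `map` would allow:

```python
from concurrent.futures import ThreadPoolExecutor


def preprocess(in_file: str, target_dir: str) -> str:
    return f"{target_dir}-{in_file}"


def segment(in_file: str, target_dir: str) -> str:
    return f"{target_dir}-{in_file}"


def mask_bg(in_file: str, mask: str, target_dir: str) -> str:
    return f"{target_dir}-mask-{in_file}"


def staged(files, pool):
    # Stage-wise: list(...) blocks until every item of a stage is done,
    # so one slow file delays the next stage for all files.
    pre = list(pool.map(lambda f: preprocess(f, "/preprocessed/"), files))
    seg = list(pool.map(lambda p: segment(p, "/segmented/"), pre))
    return list(pool.map(lambda ps: mask_bg(ps[0], ps[1], "/masked/"), zip(pre, seg)))


def per_file(f):
    # One file goes through all three steps back to back.
    p = preprocess(f, "/preprocessed/")
    s = segment(p, "/segmented/")
    return mask_bg(p, s, "/masked/")


def chained(files, pool):
    # Per-file: files progress independently of each other.
    return list(pool.map(per_file, files))


files = ["1.txt", "2.txt"]
with ThreadPoolExecutor(max_workers=2) as pool:
    staged_out = staged(files, pool)
    chained_out = chained(files, pool)

print(staged_out == chained_out)
```

Both shapes produce the same outputs; they differ only in where the waiting happens, which matters mostly when file sizes vary a lot.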

Kevin Kho

04/06/2022, 3:26 PM
Ah ok sounds good!
Oh yeah btw, for next time, #random is really meant for random stuff so #prefect-community would be a better place to post this

Tim-Oliver

04/06/2022, 3:30 PM
Ah, sorry. Didn't know which channel to take. Thanks for the heads up.

Kevin Kho

04/06/2022, 3:32 PM
No worries!