# prefect-community
m
Hello again, I am trying to run some multiprocessing tests with a DaskExecutor. This is how I set up the remote environment for multiprocessing:
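from prefect.environments import RemoteEnvironment

# Ask the Dask executor to run tasks in local worker processes
env = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "local_processes": True
    }
)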
The execution DAG is in the attached image. My understanding is that, with multiprocessing, Task 2 and Task 4 would execute at the same time after Task 1, but in different processes, because they do not depend on each other. But the behaviour I see is that they execute sequentially, as if wait() were being called between each process. Is my understanding incorrect?
j
Personally I have not observed this behavior before! If you run this again without local_processes: True, does it run asynchronously? I wonder if that has something to do with it, because local_processes defaults to False, and I frequently use a RemoteEnvironment and the Dask Executor with no other kwargs and my tasks execute in parallel 😄
m
Hey @josh! Nope, same problem 😕 Here is the output:
[2020-05-07 14:13:30,336] INFO - prefect.FlowRunner | Beginning Flow run for 'Test-Flow-1'
[2020-05-07 14:13:30,345] INFO - prefect.FlowRunner | Starting flow run.
[2020-05-07 14:13:30,367] INFO - prefect.TaskRunner | Task 'Task1': Starting task run...
[2020-05-07 14:13:30,367] INFO - prefect.TaskRunner | Fake Implementation
[2020-05-07 14:13:33,371] INFO - prefect.TaskRunner | Executed successfully!
[2020-05-07 14:13:33,378] INFO - prefect.TaskRunner | Task 'Task1': finished task run for task with final state: 'Success'
[2020-05-07 14:13:33,395] INFO - prefect.TaskRunner | Task 'Task2': Starting task run...
[2020-05-07 14:13:33,395] INFO - prefect.TaskRunner | Fake Implementation
[2020-05-07 14:13:36,398] INFO - prefect.TaskRunner | Executed successfully!
[2020-05-07 14:13:36,429] INFO - prefect.TaskRunner | Task 'Task2': finished task run for task with final state: 'Success'
[2020-05-07 14:13:36,453] INFO - prefect.TaskRunner | Task 'Task4': Starting task run...
[2020-05-07 14:13:36,454] INFO - prefect.TaskRunner | Fake Implementation
[2020-05-07 14:13:39,457] INFO - prefect.TaskRunner | Executed successfully!
[2020-05-07 14:13:39,504] INFO - prefect.TaskRunner | Task 'Task4': finished task run for task with final state: 'Success'
[2020-05-07 14:13:39,540] INFO - prefect.TaskRunner | Task 'Task3': Starting task run...
Fake Implementation
Data was sent!
[2020-05-07 14:13:42,552] INFO - prefect.TaskRunner | Task 'Task3': finished task run for task with final state: 'Success'
[2020-05-07 14:13:42,554] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
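One way to confirm the sequential behaviour from the log itself is to compare the start and finish timestamps of Task2 and Task4. The following is only a rough sketch: the regular expression and the helper names `task_intervals` and `overlaps` are made up for illustration and assume the log line format shown above.

```python
import re
from datetime import datetime

# The four relevant lines, copied from the log output above.
LOG = """\
[2020-05-07 14:13:33,395] INFO - prefect.TaskRunner | Task 'Task2': Starting task run...
[2020-05-07 14:13:36,429] INFO - prefect.TaskRunner | Task 'Task2': finished task run for task with final state: 'Success'
[2020-05-07 14:13:36,453] INFO - prefect.TaskRunner | Task 'Task4': Starting task run...
[2020-05-07 14:13:39,504] INFO - prefect.TaskRunner | Task 'Task4': finished task run for task with final state: 'Success'
"""

# Hypothetical pattern: assumes the "[timestamp] ... Task 'Name': Starting/finished task run" layout.
PATTERN = re.compile(
    r"\[(?P<ts>[^\]]+)\].*Task '(?P<name>\w+)': (?P<event>Starting|finished) task run"
)


def task_intervals(log_text):
    """Map each task name to a (start, end) pair of datetimes."""
    intervals = {}
    for line in log_text.splitlines():
        match = PATTERN.search(line)
        if match is None:
            continue
        ts = datetime.strptime(match.group("ts"), "%Y-%m-%d %H:%M:%S,%f")
        start, end = intervals.get(match.group("name"), (None, None))
        if match.group("event") == "Starting":
            start = ts
        else:
            end = ts
        intervals[match.group("name")] = (start, end)
    return intervals


def overlaps(a, b):
    """True if two (start, end) intervals share any time."""
    return a[0] < b[1] and b[0] < a[1]


if __name__ == "__main__":
    intervals = task_intervals(LOG)
    print(overlaps(intervals["Task2"], intervals["Task4"]))
```

If the two tasks had run in parallel, their intervals would overlap; here Task4 only starts after Task2 has finished.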
The code is just a demo: I do a time.sleep(3) in each task. time.sleep should only block the thread where a task is running, so I expected to see Task2 and Task4 executing at the same time...
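The expectation about time.sleep can be checked outside of Prefect with a small standard-library sketch. This is not the flow from the issue, only an illustration: `fake_task` and `run_independent` are hypothetical names, and a plain thread pool stands in for the executor.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_task(name, delay):
    """Stand-in for a task body: sleep, then return the task name."""
    time.sleep(delay)
    return name


def run_independent(delay=0.5):
    """Run two independent sleeping tasks on two threads and time them."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(fake_task, name, delay) for name in ("Task2", "Task4")]
        results = [future.result() for future in futures]
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = run_independent()
    print(results, round(elapsed, 2))
```

With two workers the total time should be close to one delay rather than two, which is the overlap expected for Task2 and Task4 in the flow.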
j
Hmm 🤔 could you try setting the number of workers directly in the executor kwargs with something like "num_workers": 2
m
Same problem....
j
Interesting, would you mind opening an issue on the repo with a reproducible example so we can triage and get more eyes on it?
m
Of course, thank you.
j
Np thanks @Manuel Mourato 🙂
m
@josh if you want to take a look: https://github.com/PrefectHQ/prefect/issues/2515