Manuel Mourato
05/07/2020, 8:36 AM
env = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "local_processes": True
    }
)
The execution DAG is in the annexed image.
My understanding is that, using multiprocessing, Task 2 and Task 4 would execute at the same time after Task 1, but in different processes, because they do not depend on each other. But the behaviour I see is them executing sequentially, as if wait() were being called between each process.
Is my understanding not correct?
josh
05/07/2020, 1:42 PM
local_processes: True does it run asynchronously? I wonder if that has something to do with it, because local_processes defaults to False, and I frequently use a RemoteEnvironment and the Dask Executor with no other kwargs and my tasks execute in parallel 😄
Manuel Mourato
05/07/2020, 2:18 PM
[2020-05-07 14:13:30,336] INFO - prefect.FlowRunner | Beginning Flow run for 'Test-Flow-1'
[2020-05-07 14:13:30,345] INFO - prefect.FlowRunner | Starting flow run.
[2020-05-07 14:13:30,367] INFO - prefect.TaskRunner | Task 'Task1': Starting task run...
[2020-05-07 14:13:30,367] INFO - prefect.TaskRunner | Fake Implementation
[2020-05-07 14:13:33,371] INFO - prefect.TaskRunner | Executed successfully!
[2020-05-07 14:13:33,378] INFO - prefect.TaskRunner | Task 'Task1': finished task run for task with final state: 'Success'
[2020-05-07 14:13:33,395] INFO - prefect.TaskRunner | Task 'Task2': Starting task run...
[2020-05-07 14:13:33,395] INFO - prefect.TaskRunner | Fake Implementation
[2020-05-07 14:13:36,398] INFO - prefect.TaskRunner | Executed successfully!
[2020-05-07 14:13:36,429] INFO - prefect.TaskRunner | Task 'Task2': finished task run for task with final state: 'Success'
[2020-05-07 14:13:36,453] INFO - prefect.TaskRunner | Task 'Task4': Starting task run...
[2020-05-07 14:13:36,454] INFO - prefect.TaskRunner | Fake Implementation
[2020-05-07 14:13:39,457] INFO - prefect.TaskRunner | Executed successfully!
[2020-05-07 14:13:39,504] INFO - prefect.TaskRunner | Task 'Task4': finished task run for task with final state: 'Success'
[2020-05-07 14:13:39,540] INFO - prefect.TaskRunner | Task 'Task3': Starting task run...
Fake Implementation
Data was sent!
[2020-05-07 14:13:42,552] INFO - prefect.TaskRunner | Task 'Task3': finished task run for task with final state: 'Success'
[2020-05-07 14:13:42,554] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
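One way to check the log above for overlap, as a rough sketch: parse the start and finish timestamps of Task2 and Task4 and compare them. The helper name task_windows is made up for illustration, and the parsing assumes the exact line layout shown in this log.

```python
from datetime import datetime

# Start/finish lines for Task2 and Task4, copied from the log above.
LOG = """\
[2020-05-07 14:13:33,395] INFO - prefect.TaskRunner | Task 'Task2': Starting task run...
[2020-05-07 14:13:36,429] INFO - prefect.TaskRunner | Task 'Task2': finished task run for task with final state: 'Success'
[2020-05-07 14:13:36,453] INFO - prefect.TaskRunner | Task 'Task4': Starting task run...
[2020-05-07 14:13:39,504] INFO - prefect.TaskRunner | Task 'Task4': finished task run for task with final state: 'Success'
"""


def task_windows(log):
    """Map each task name to its 'start' and 'finish' timestamps."""
    windows = {}
    for line in log.splitlines():
        # The timestamp sits between the leading '[' and the closing ']'.
        stamp = datetime.strptime(line[1:24], "%Y-%m-%d %H:%M:%S,%f")
        # The task name is the first single-quoted token on the line.
        name = line.split("'")[1]
        key = "start" if "Starting task run" in line else "finish"
        windows.setdefault(name, {})[key] = stamp
    return windows


windows = task_windows(LOG)
gap = windows["Task4"]["start"] - windows["Task2"]["finish"]
overlap = windows["Task4"]["start"] < windows["Task2"]["finish"]
print(f"Task4 started {gap.total_seconds():.3f}s after Task2 finished; overlap={overlap}")
```

A positive gap means Task4 only started once Task2 had finished, which matches the sequential behaviour described.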
time.sleep(3) for each task
josh
05/07/2020, 2:20 PM
"num_workers": 2
Manuel Mourato
05/07/2020, 2:27 PM
josh
05/07/2020, 2:28 PM
Manuel Mourato
05/07/2020, 2:29 PM
josh
05/07/2020, 2:30 PM
Manuel Mourato
05/07/2020, 2:55 PM