Nikola Lusic
05/31/2021, 10:05 PM
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def process_key(key: str):
    import time
    time.sleep(3)
    return key

with Flow("process_keys") as flow:
    processed_keys = process_key.map([1,2,3])

if __name__ == '__main__':
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=3)
    flow.run()
This is the output:
[2021-06-01 00:01:36+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'process_keys'
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key': Starting task run...
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key': Finished task run for task with final state: 'Mapped'
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
[2021-06-01 00:01:39+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
However, if I change the scheduler from threads to processes, it seems the parallel execution is replaced with serial execution:
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def process_key(key: str):
    import time
    time.sleep(3)
    return key

with Flow("process_keys") as flow:
    processed_keys = process_key.map([1,2,3])

if __name__ == '__main__':
    flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=3)
    flow.run()
This is the new output:
[2021-06-01 00:04:09+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'process_keys'
[2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key': Starting task run...
[2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key': Finished task run for task with final state: 'Mapped'
[2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
[2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
[2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
[2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
[2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
[2021-06-01 00:04:18+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
[2021-06-01 00:04:18+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
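The difference between the two logs above can be checked mechanically rather than by eye. The following is a minimal sketch, assuming the log line format shown in this thread; the helper names `task_intervals` and `ran_in_parallel` are hypothetical and not part of Prefect.

```python
import re
from datetime import datetime

# Matches mapped-child lines such as:
# [2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
# The parent 'process_key' line (no [index]) is deliberately not matched.
LINE_RE = re.compile(
    r"\[(?P<ts>[^\]]+)\] INFO - prefect\.TaskRunner \| "
    r"Task '(?P<name>[^']+\[\d+\])': (?P<event>Starting|Finished) task run"
)


def task_intervals(log_text):
    """Return {task_name: (start, end)} for each mapped child task in the log."""
    starts, intervals = {}, {}
    for line in log_text.splitlines():
        match = LINE_RE.search(line)
        if not match:
            continue
        ts = datetime.strptime(match["ts"], "%Y-%m-%d %H:%M:%S%z")
        if match["event"] == "Starting":
            starts[match["name"]] = ts
        elif match["name"] in starts:
            intervals[match["name"]] = (starts[match["name"]], ts)
    return intervals


def ran_in_parallel(intervals):
    """True if at least two task intervals overlap in time."""
    spans = sorted(intervals.values())
    # After sorting by start time, any overlap shows up between neighbours.
    return any(
        next_start < prev_end
        for (_, prev_end), (next_start, _) in zip(spans, spans[1:])
    )
```

Applied to the two logs in this thread, `ran_in_parallel(task_intervals(log))` gives `True` for the threads run and `False` for the processes run. The timestamps only have one-second resolution, so tasks much shorter than a second could be misclassified.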
I cannot get any scheduler="processes" flow to actually run in parallel. Is there something I'm missing?
Zach Angell
scheduler="processes" works correctly for me locally. What version of Prefect and Dask are you using?
I have the latest Prefect version and Dask 2021.03.0
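For comparing environments as asked above, the installed versions can be printed from the same interpreter that runs the flow. This is a minimal sketch using only the standard library (Python 3.8+); the helper name `installed_versions` is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages=("prefect", "dask")):
    """Map each package name to its installed version, or None if it is absent."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name}: {ver or 'not installed'}")
```

Running it inside the environment used for `flow.run()` avoids reporting versions from a different virtualenv.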
Nikola Lusic
05/31/2021, 10:56 PM
dask: 2021.4.1
prefect: 0.14.17
Nikola Lusic
05/31/2021, 10:59 PM
With dask 2021.03.0, the expected parallel execution occurs.
Zach Angell
[0,...100] with dask 2021.04.1, it will execute in parallel as expected