Hey guys, I'm having some issues with parallel ex...
# ask-community
n
Hey guys, I'm having some issues with parallel execution. This is my flow:
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def process_key(key: int):
    import time
    time.sleep(3)
    return key

with Flow("process_keys") as flow:
    processed_keys = process_key.map([1,2,3])

if __name__ == '__main__':
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=3)
    flow.run()
This is the output:
[2021-06-01 00:01:36+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'process_keys'
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key': Starting task run...
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key': Finished task run for task with final state: 'Mapped'
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
[2021-06-01 00:01:39+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
However, if I change the scheduler from "threads" to "processes", the tasks seem to run serially instead of in parallel:
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def process_key(key: int):
    import time
    time.sleep(3)
    return key

with Flow("process_keys") as flow:
    processed_keys = process_key.map([1,2,3])

if __name__ == '__main__':
    flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=3)
    flow.run()
This is the new output:
[2021-06-01 00:04:09+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'process_keys'
[2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key': Starting task run...
[2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key': Finished task run for task with final state: 'Mapped'
[2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
[2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
[2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
[2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
[2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
[2021-06-01 00:04:18+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
[2021-06-01 00:04:18+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
I cannot get any scheduler="processes" flow to actually run in parallel. Is there something I'm missing?
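One way to confirm what the two outputs above show is to compare task start and finish timestamps directly. The following is a minimal sketch, not part of Prefect: the helper names `task_intervals` and `ran_in_parallel` are made up for illustration, and the regular expression assumes the exact log line format shown in this thread. Timestamps here have one-second resolution, so very short tasks could be misclassified.

```python
import re
from datetime import datetime

# Matches mapped child runs such as:
# [2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
# The parent 'process_key' line (no index) is intentionally not matched.
LINE_RE = re.compile(
    r"\[(?P<ts>[^\]]+)\] INFO - prefect\.TaskRunner \| "
    r"Task '(?P<name>[^']+\[\d+\])': (?P<event>Starting|Finished) task run"
)


def task_intervals(log_text):
    """Return {task_name: (start, end)} for mapped task runs found in the log."""
    starts, intervals = {}, {}
    for line in log_text.splitlines():
        match = LINE_RE.search(line)
        if not match:
            continue
        ts = datetime.strptime(match["ts"], "%Y-%m-%d %H:%M:%S%z")
        name = match["name"]
        if match["event"] == "Starting":
            starts[name] = ts
        elif name in starts:
            intervals[name] = (starts[name], ts)
    return intervals


def ran_in_parallel(log_text):
    """True if at least two mapped task runs overlapped in time."""
    spans = sorted(task_intervals(log_text).values())
    # After sorting by start time, any overlap shows up between neighbours.
    return any(
        next_start < prev_end
        for (_, prev_end), (next_start, _) in zip(spans, spans[1:])
    )
```

Passing the "threads" output to `ran_in_parallel` reports overlapping runs, while the "processes" output, where each task starts only when the previous one finishes, does not.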
z
Hi Nikola, running the script you provided with scheduler="processes" works correctly for me locally. What versions of Prefect and Dask are you using? I have the latest Prefect version and Dask 2021.03.0.
n
dask: 2021.4.1
prefect: 0.14.17
Interesting: when I downgrade Dask to version 2021.03.0, the expected parallel execution occurs.
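Since the behaviour here depends on the installed Dask version, it can help to print the versions from the same environment that runs the flow. This is a small sketch using only the standard library (`importlib.metadata`, Python 3.8+); the helper name `installed_versions` is hypothetical and the default package list is just the two packages mentioned in this thread.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages=("prefect", "dask")):
    """Map each package name to its installed version, or None if it is absent."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            # The package is not installed in this environment.
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name}: {ver or 'not installed'}")
```

Running this in both environments makes it easy to see whether a difference in behaviour lines up with a difference in Dask versions.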
z
Yeah, I think this is just a minor change (possibly a bug) in how the Dask scheduler works. If you raise the number of "keys" by mapping over [0,...100] with Dask 2021.04.1, it will execute in parallel as expected.