    Nikola Lusic

    1 year ago
    Hey guys, I'm having some issues with parallel execution. This is my flow:
    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor
    
    @task
    def process_key(key: str):
        import time
        time.sleep(3)
        return key
    
    with Flow("process_keys") as flow:
        processed_keys = process_key.map([1,2,3])
    
    if __name__ == '__main__':
        flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=3)
        flow.run()
    This is the output:
    [2021-06-01 00:01:36+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'process_keys'
    [2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key': Starting task run...
    [2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key': Finished task run for task with final state: 'Mapped'
    [2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
    [2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
    [2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
    [2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:01:39+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    However, if I change the scheduler from threads to processes, it seems the parallel execution is substituted with a serial execution:
    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor
    
    @task
    def process_key(key: str):
        import time
        time.sleep(3)
        return key
    
    with Flow("process_keys") as flow:
        processed_keys = process_key.map([1,2,3])
    
    if __name__ == '__main__':
        flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=3)
        flow.run()
    This is the new output:
    [2021-06-01 00:04:09+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'process_keys'
    [2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key': Starting task run...
    [2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key': Finished task run for task with final state: 'Mapped'
    [2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
    [2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
    [2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
    [2021-06-01 00:04:18+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
    [2021-06-01 00:04:18+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    I cannot get any scheduler=processes flow to actually run in parallel, is there something I'm missing?
    Zach Angell

    1 year ago
    Hi Nikola, running the script you provided with scheduler="processes" works correctly for me locally. What version of Prefect and Dask are you using? I have the latest Prefect version and Dask 2021.03.0.
    Nikola Lusic

    1 year ago
    dask: 2021.4.1
    prefect: 0.14.17
    Interesting, when I downgrade Dask to version 2021.03.0, the expected parallel execution occurs.
    Zach Angell

    1 year ago
    Yeah, I think this is just a minor change (possibly a bug) in how the Dask scheduler behaves. If you raise the number of "keys" by mapping over [0,...100] with Dask 2021.04.1, it will execute in parallel as expected.
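
    For anyone comparing Dask versions like this, a quick way to tell whether the mapped runs overlapped is to check the TaskRunner timestamps instead of eyeballing them. Below is a minimal sketch using only the standard library. The helper names task_intervals and ran_in_parallel are made up for illustration and are not part of Prefect or Dask, and the regex assumes the log line format shown in the outputs above.

```python
import re
from datetime import datetime

# Matches TaskRunner lines for mapped children only, e.g.
# "... Task 'process_key[0]': Starting task run..."
# Assumption: log lines follow the format posted earlier in this thread.
LINE_RE = re.compile(
    r"\[(?P<ts>[^\]]+)\] INFO - prefect\.TaskRunner \| "
    r"Task '(?P<name>[^']+\[\d+\])': (?P<event>Starting|Finished) task run"
)


def task_intervals(log_text):
    """Return {task_name: (start, end)} for each mapped child in the log."""
    starts, intervals = {}, {}
    for line in log_text.splitlines():
        m = LINE_RE.search(line)
        if not m:
            continue
        ts = datetime.strptime(m["ts"], "%Y-%m-%d %H:%M:%S%z")
        if m["event"] == "Starting":
            starts[m["name"]] = ts
        elif m["name"] in starts:
            intervals[m["name"]] = (starts[m["name"]], ts)
    return intervals


def ran_in_parallel(log_text):
    """True if at least two mapped task runs overlap in time."""
    spans = sorted(task_intervals(log_text).values())
    # After sorting by start time, any overlap shows up between neighbours.
    return any(nxt[0] < cur[1] for cur, nxt in zip(spans, spans[1:]))


# Mapped-child lines copied from the two outputs posted above.
THREADS_LOG = """\
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
[2021-06-01 00:01:36+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
[2021-06-01 00:01:39+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
"""

PROCESSES_LOG = """\
[2021-06-01 00:04:09+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Starting task run...
[2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[0]': Finished task run for task with final state: 'Success'
[2021-06-01 00:04:12+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Starting task run...
[2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[1]': Finished task run for task with final state: 'Success'
[2021-06-01 00:04:15+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Starting task run...
[2021-06-01 00:04:18+0200] INFO - prefect.TaskRunner | Task 'process_key[2]': Finished task run for task with final state: 'Success'
"""

if __name__ == "__main__":
    print("threads:", ran_in_parallel(THREADS_LOG))
    print("processes:", ran_in_parallel(PROCESSES_LOG))
```

    The log timestamps only have one-second resolution, so this can tell clearly serial runs from clearly overlapping ones, but not much finer than that.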