    Kamil Okáč

    2 years ago
    Hi guys, I'm having trouble using LocalDaskExecutor with simple mapping. The following code fails:
    from prefect import Flow, task
    from prefect.engine.executors.dask import LocalDaskExecutor
    
    @task
    def add_ten(x):
        return x + 10
    
    if __name__ == '__main__':
        with Flow('simple map') as flow:
            mapped_result = add_ten.map([1, 2, 3])
            executor = LocalDaskExecutor(scheduler='processes', num_workers=3)
            flow.run(executor=executor)
    The error is:
    AssertionError: daemonic processes are not allowed to have children
    Is there something wrong with this code or am I missing something else?

    CJ Wright

    2 years ago
    Hi @Kamil Okáč let me look into this for you!

    Ben Doremus

    2 years ago
    Nobody wants little demon children running around their offices.

    Kamil Okáč

    2 years ago
    @CJ Wright thanks. Here is the complete output, in case it helps:
    [2019-11-25 21:06:43,605] INFO - prefect.FlowRunner | Beginning Flow run for 'simple map'
    [2019-11-25 21:06:43,607] INFO - prefect.FlowRunner | Starting flow run.
    [2019-11-25 21:06:43,629] INFO - prefect.TaskRunner | Task 'List': Starting task run...
    [2019-11-25 21:06:43,631] INFO - prefect.TaskRunner | Task 'List': finished task run for task with final state: 'Success'
    [2019-11-25 21:06:43,640] INFO - prefect.TaskRunner | Task 'add_ten': Starting task run...
    [2019-11-25 21:06:43,642] ERROR - prefect.TaskRunner | Unexpected error: AssertionError('daemonic processes are not allowed to have children',)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/dist-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/prefect/engine/task_runner.py", line 792, in wait_for_mapped_task
        state.map_states = executor.wait(state.map_states)
      File "/usr/local/lib/python3.6/dist-packages/prefect/engine/executors/dask.py", line 260, in wait
        return dask.compute(futures)[0]
      File "/home/kamil/.local/lib/python3.6/site-packages/dask/base.py", line 446, in compute
        results = schedule(dsk, keys, **kwargs)
      File "/home/kamil/.local/lib/python3.6/site-packages/dask/multiprocessing.py", line 175, in get
        pool = context.Pool(num_workers, initializer=initialize_worker_process)
      File "/usr/lib/python3.6/multiprocessing/context.py", line 119, in Pool
        context=self.get_context())
      File "/usr/lib/python3.6/multiprocessing/pool.py", line 174, in __init__
        self._repopulate_pool()
      File "/usr/lib/python3.6/multiprocessing/pool.py", line 239, in _repopulate_pool
        w.start()
      File "/usr/lib/python3.6/multiprocessing/process.py", line 103, in start
        'daemonic processes are not allowed to have children'
    AssertionError: daemonic processes are not allowed to have children
    [2019-11-25 21:06:43,645] INFO - prefect.TaskRunner | Task 'add_ten': finished task run for task with final state: 'Failed'
    [2019-11-25 21:06:43,648] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
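    The traceback shows the chain: the mapped task's wait step calls dask.compute, Dask's multiprocessing scheduler tries to create a new multiprocessing Pool, and Process.start() refuses because the current process is itself a daemonic worker. Below is a minimal stdlib-only sketch of the same failure, with no Prefect or Dask involved. It assumes a Linux/POSIX machine where the "fork" start method exists; square, map_in_worker and run_nested are hypothetical names used only for illustration.

```python
import multiprocessing as mp

# Assumption: a POSIX system (the traceback above is from Linux), where the
# "fork" start method is available.
ctx = mp.get_context("fork")


def square(x):
    return x * x


def map_in_worker(n):
    # Runs inside a worker of the outer pool. Pool workers are started with
    # daemon=True, so Process.start() refuses to launch the inner pool's
    # workers and raises AssertionError.
    try:
        with ctx.Pool(2) as inner:
            return inner.map(square, range(n))
    except AssertionError as exc:
        return f"AssertionError: {exc}"


def run_nested():
    # Outer process pool, loosely analogous to the process-based scheduler
    # that is already running the task.
    with ctx.Pool(1) as outer:
        return outer.apply(map_in_worker, (3,))


if __name__ == "__main__":
    print(run_nested())
```

    Calling map_in_worker(3) directly from the main (non-daemonic) process would succeed; only the nested call from inside a pool worker trips the check.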

    CJ Wright

    2 years ago
    Thanks for this
    Ok, so this is a fairly simple fix. First, mapping isn't compatible with the process-based LocalDaskExecutor due to a limitation of multiprocessing; if you set scheduler='threads', it should work. We plan on raising a more informative error if we're able to detect the problem before running the flow.
    One last thing I noticed is that you're calling flow.run() from inside your flow context. From a code organization standpoint, we recommend separating opening the flow context (with Flow() as flow:) from the executor creation and flow execution (flow.run()).
    Hope this helps!
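    A stdlib analogue of the suggested switch to scheduler='threads': when the outer workers are threads rather than daemonic processes, starting a nested pool from inside a worker is allowed. This sketch uses multiprocessing.pool.ThreadPool at both levels and only illustrates the multiprocessing restriction; it is an assumption-level illustration, not how LocalDaskExecutor works internally, and add_ten here is a plain function rather than the Prefect task.

```python
from multiprocessing.pool import ThreadPool


def add_ten(x):
    return x + 10


def map_in_worker(values):
    # Runs inside a worker *thread* of the outer pool. Starting another pool
    # here is fine: no daemonic process is trying to spawn children.
    with ThreadPool(3) as inner:
        return inner.map(add_ten, values)


def run_threaded():
    # Outer pool of threads, standing in for a thread-based scheduler.
    with ThreadPool(1) as outer:
        return outer.apply(map_in_worker, ([1, 2, 3],))


if __name__ == "__main__":
    print(run_threaded())  # [11, 12, 13]
```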

    Kamil Okáč

    2 years ago
    Thanks! Threads would be the way to go; however, I can't use them in my case, as I believe that boto3 (which I use inside our real tasks) is not thread-safe.
    I checked again and found that it's possible to use boto3 in a thread-safe way.
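    For reference, the boto3 documentation describes low-level clients as thread-safe, while Session and resource objects are not and should not be shared across threads; it recommends creating a separate Session per thread. One common way to do that is thread-local storage, sketched below. To keep it runnable without AWS credentials, FakeClient is a hypothetical stand-in for something like boto3.session.Session().resource("s3"), and get_client and process_item are illustrative helpers, not part of boto3 or Prefect.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Thread-local storage: each thread sees its own attributes on this object.
_local = threading.local()


class FakeClient:
    """Hypothetical stand-in for a boto3 Session/resource.

    In real code the factory might be boto3.session.Session().resource("s3");
    it is faked here so the sketch runs without AWS credentials.
    """

    def __init__(self):
        # Record which thread created this object.
        self.owner = threading.get_ident()


def get_client():
    # Lazily create one object per thread and cache it, so nothing is shared.
    client = getattr(_local, "client", None)
    if client is None:
        client = FakeClient()
        _local.client = client
    return client


def process_item(item):
    # Hypothetical task body: fetch this thread's own client, then use it.
    client = get_client()
    return item, client.owner == threading.get_ident()


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as pool:
        results = list(pool.map(process_item, range(9)))
    print(all(same_thread for _, same_thread in results))  # True
```

    Each thread builds its object once and reuses it for every item it handles, so objects that are not thread-safe are never shared between threads.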