# prefect-community
k
Hi guys, I'm having trouble using LocalDaskExecutor with simple mapping. The following code fails:
from prefect import Flow, task
from prefect.engine.executors.dask import LocalDaskExecutor

@task
def add_ten(x):
    return x + 10

if __name__ == '__main__':
    with Flow('simple map') as flow:
        mapped_result = add_ten.map([1, 2, 3])
        executor = LocalDaskExecutor(scheduler='processes', num_workers=3)
        flow.run(executor=executor)
The error is
AssertionError: daemonic processes are not allowed to have children
Is there something wrong with this code or am I missing something else?
c
Hi @Kamil Okáč let me look into this for you!
b
Nobody wants little demon children running around their offices.
k
@CJ Wright thanks. This is the complete output, may it be of some help:
[2019-11-25 21:06:43,605] INFO - prefect.FlowRunner | Beginning Flow run for 'simple map'
[2019-11-25 21:06:43,607] INFO - prefect.FlowRunner | Starting flow run.
[2019-11-25 21:06:43,629] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2019-11-25 21:06:43,631] INFO - prefect.TaskRunner | Task 'List': finished task run for task with final state: 'Success'
[2019-11-25 21:06:43,640] INFO - prefect.TaskRunner | Task 'add_ten': Starting task run...
[2019-11-25 21:06:43,642] ERROR - prefect.TaskRunner | Unexpected error: AssertionError('daemonic processes are not allowed to have children',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/prefect/engine/task_runner.py", line 792, in wait_for_mapped_task
    state.map_states = executor.wait(state.map_states)
  File "/usr/local/lib/python3.6/dist-packages/prefect/engine/executors/dask.py", line 260, in wait
    return dask.compute(futures)[0]
  File "/home/kamil/.local/lib/python3.6/site-packages/dask/base.py", line 446, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/kamil/.local/lib/python3.6/site-packages/dask/multiprocessing.py", line 175, in get
    pool = context.Pool(num_workers, initializer=initialize_worker_process)
  File "/usr/lib/python3.6/multiprocessing/context.py", line 119, in Pool
    context=self.get_context())
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 174, in __init__
    self._repopulate_pool()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 239, in _repopulate_pool
    w.start()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
[2019-11-25 21:06:43,645] INFO - prefect.TaskRunner | Task 'add_ten': finished task run for task with final state: 'Failed'
[2019-11-25 21:06:43,648] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
c
Thanks for this
OK, so this is a fairly simple fix. Firstly, mapping isn't compatible with the process-based LocalDaskExecutor due to a limitation of multiprocessing. If you change to scheduler='threads' it should work. We plan on raising a more informative error if we're able to detect the problem prior to running the flow.
One last thing I noticed is that you're calling flow.run from inside of your flow. From a code organization standpoint, we recommend separating opening the flow context (with Flow() as flow:) from the executor creation and flow execution (flow.run()).
Hope this helps!
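The multiprocessing limitation mentioned above can be reproduced without Prefect. The following is a minimal sketch, assuming a POSIX system where the "fork" start method is available. The names try_to_spawn_child, reproduce_error and nested_thread_pools are illustrative and not part of Prefect or Dask. A single daemonic process stands in for a pool worker (pool workers are daemonic), and the second function shows that nested thread pools have no such restriction.

```python
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

# Assumption: POSIX system, so the "fork" start method exists.
ctx = mp.get_context("fork")


def add_ten(x):
    return x + 10


def try_to_spawn_child(conn):
    """Runs inside a daemonic process and tries to start a process of its own."""
    try:
        grandchild = ctx.Process(target=add_ten, args=(1,))
        grandchild.start()  # multiprocessing refuses this for daemonic parents
        grandchild.join()
        conn.send("started a child process")
    except AssertionError as exc:
        conn.send(f"AssertionError: {exc}")
    finally:
        conn.close()


def reproduce_error():
    """Start a daemonic process (like a pool worker) and report what it saw."""
    parent_conn, child_conn = ctx.Pipe()
    worker = ctx.Process(target=try_to_spawn_child, args=(child_conn,), daemon=True)
    worker.start()
    message = parent_conn.recv()
    worker.join()
    return message


def nested_thread_pools():
    """A thread-pool worker may freely start its own thread pool."""
    def inner(_):
        with ThreadPoolExecutor(max_workers=3) as pool:
            return list(pool.map(add_ten, [1, 2, 3]))

    with ThreadPoolExecutor(max_workers=1) as outer:
        return outer.submit(inner, None).result()


if __name__ == "__main__":
    print(reproduce_error())
    print(nested_thread_pools())
```

This mirrors the traceback above, where a mapped task running inside a pool worker tries to create another pool. Switching the scheduler to threads avoids the nested process creation altogether.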
k
Thanks! Threads would be the way to go, however I can't use them in my case, as I believe that boto3 (which I use inside our real tasks) is not thread-safe.
I checked again and found that it's possible to use boto3 in a thread-safe way.
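The thread doesn't say which approach was used. One common pattern, in line with the boto3 documentation's recommendation to create a separate Session for each thread, is to keep the object in thread-local storage. Below is a minimal sketch of that pattern using only the standard library. make_session is a hypothetical stand-in; with boto3 it would presumably return boto3.session.Session().

```python
import threading

# Each thread sees its own independent attributes on this object.
_local = threading.local()


def make_session():
    """Hypothetical factory; with boto3 this would create a new Session."""
    return object()


def get_session():
    """Return the calling thread's session, creating it on first use."""
    if not hasattr(_local, "session"):
        _local.session = make_session()
    return _local.session
```

A task body would then call get_session() instead of sharing one module-level object, so no two threads ever touch the same session.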