The error is
from prefect import Flow, task from prefect.engine.executors.dask import LocalDaskExecutor @task def add_ten(x): return x + 10 if __name__ == '__main__': with Flow('simple map') as flow: mapped_result = add_ten.map([1, 2, 3]) executor = LocalDaskExecutor(scheduler='processes', num_workers=3) flow.run(executor=executor)
Is there something wrong with this code or am I missing something else?
AssertionError: daemonic processes are not allowed to have children
[2019-11-25 21:06:43,605] INFO - prefect.FlowRunner | Beginning Flow run for 'simple map' [2019-11-25 21:06:43,607] INFO - prefect.FlowRunner | Starting flow run. [2019-11-25 21:06:43,629] INFO - prefect.TaskRunner | Task 'List': Starting task run... [2019-11-25 21:06:43,631] INFO - prefect.TaskRunner | Task 'List': finished task run for task with final state: 'Success' [2019-11-25 21:06:43,640] INFO - prefect.TaskRunner | Task 'add_ten': Starting task run... [2019-11-25 21:06:43,642] ERROR - prefect.TaskRunner | Unexpected error: AssertionError('daemonic processes are not allowed to have children',) Traceback (most recent call last): File "/usr/local/lib/python3.6/dist-packages/prefect/engine/runner.py", line 48, in inner new_state = method(self, state, *args, **kwargs) File "/usr/local/lib/python3.6/dist-packages/prefect/engine/task_runner.py", line 792, in wait_for_mapped_task state.map_states = executor.wait(state.map_states) File "/usr/local/lib/python3.6/dist-packages/prefect/engine/executors/dask.py", line 260, in wait return dask.compute(futures) File "/home/kamil/.local/lib/python3.6/site-packages/dask/base.py", line 446, in compute results = schedule(dsk, keys, **kwargs) File "/home/kamil/.local/lib/python3.6/site-packages/dask/multiprocessing.py", line 175, in get pool = context.Pool(num_workers, initializer=initialize_worker_process) File "/usr/lib/python3.6/multiprocessing/context.py", line 119, in Pool context=self.get_context()) File "/usr/lib/python3.6/multiprocessing/pool.py", line 174, in __init__ self._repopulate_pool() File "/usr/lib/python3.6/multiprocessing/pool.py", line 239, in _repopulate_pool w.start() File "/usr/lib/python3.6/multiprocessing/process.py", line 103, in start 'daemonic processes are not allowed to have children' AssertionError: daemonic processes are not allowed to have children [2019-11-25 21:06:43,645] INFO - prefect.TaskRunner | Task 'add_ten': finished task run for task with final state: 'Failed' [2019-11-25 21:06:43,648] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
it should work. We plan on raising a more informative error if we're able to detect the problem prior to running the flow. One last thing I noticed is that your calling
from inside of your flow and from a code organization standpoint we recommend separating opening the flow context (
) from the executor creation and flow execution (
with Flow() as flow: