Kamil Okáč
11/25/2019, 9:14 PM
from prefect import Flow, task
from prefect.engine.executors.dask import LocalDaskExecutor
@task
def add_ten(x):
    return x + 10
if __name__ == '__main__':
    with Flow('simple map') as flow:
        mapped_result = add_ten.map([1, 2, 3])
        executor = LocalDaskExecutor(scheduler='processes', num_workers=3)
        flow.run(executor=executor)
The error is AssertionError: daemonic processes are not allowed to have children
Is there something wrong with this code or am I missing something else?
CJ Wright
11/25/2019, 9:15 PM
Ben Doremus
11/25/2019, 9:16 PM
Kamil Okáč
11/25/2019, 9:20 PM
[2019-11-25 21:06:43,605] INFO - prefect.FlowRunner | Beginning Flow run for 'simple map'
[2019-11-25 21:06:43,607] INFO - prefect.FlowRunner | Starting flow run.
[2019-11-25 21:06:43,629] INFO - prefect.TaskRunner | Task 'List': Starting task run...
[2019-11-25 21:06:43,631] INFO - prefect.TaskRunner | Task 'List': finished task run for task with final state: 'Success'
[2019-11-25 21:06:43,640] INFO - prefect.TaskRunner | Task 'add_ten': Starting task run...
[2019-11-25 21:06:43,642] ERROR - prefect.TaskRunner | Unexpected error: AssertionError('daemonic processes are not allowed to have children',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/prefect/engine/task_runner.py", line 792, in wait_for_mapped_task
    state.map_states = executor.wait(state.map_states)
  File "/usr/local/lib/python3.6/dist-packages/prefect/engine/executors/dask.py", line 260, in wait
    return dask.compute(futures)[0]
  File "/home/kamil/.local/lib/python3.6/site-packages/dask/base.py", line 446, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/kamil/.local/lib/python3.6/site-packages/dask/multiprocessing.py", line 175, in get
    pool = context.Pool(num_workers, initializer=initialize_worker_process)
  File "/usr/lib/python3.6/multiprocessing/context.py", line 119, in Pool
    context=self.get_context())
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 174, in __init__
    self._repopulate_pool()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 239, in _repopulate_pool
    w.start()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
[2019-11-25 21:06:43,645] INFO - prefect.TaskRunner | Task 'add_ten': finished task run for task with final state: 'Failed'
[2019-11-25 21:06:43,648] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
CJ Wright
11/25/2019, 9:28 PM
If you use scheduler='threads' it should work. We plan on raising a more informative error if we're able to detect the problem prior to running the flow.
One last thing I noticed is that you're calling flow.run from inside of your flow. From a code organization standpoint, we recommend separating opening the flow context (with Flow() as flow:) from the executor creation and flow execution (flow.run()).
Kamil Okáč
11/26/2019, 6:50 AM