Daniil Ponizov
09/16/2021, 3:41 PMKevin Kho
Zach Angell
Daniil Ponizov
09/16/2021, 3:46 PMKevin Kho
get_event_loop()
?Daniil Ponizov
09/16/2021, 3:58 PM@task
def some_task():
loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(futures))
with Flow("flow_name") as fl:
some_task()
Kevin Kho
emre
09/16/2021, 9:07 PM@task
def async_sleeper(i):
loop = asyncio.new_event_loop()
tasks = [loop.create_task(some_coro()) for _ in range(i)]
loop.run_until_complete(asyncio.wait(tasks))
I don't know what is happening, But using loop.create_task
instead of asyncio.create_task
did the trick for me. I can execute this task in a mapped fashion even.
DaskExecutor
works with the above setup, but LocalDaskExecutor
does notasyncio.new_event_loop
instead of the usual get_event_loop
. event loops are single threaded by default. python gets confused when multiple threads try to access the single event loop owned by the python process.
Instead, create a fresh event loop at the start of your tasks with new_event_loop
. Also its probably best if you close the loop when the task finishes.