Hello! does Prefect support asynchronous tasks?
# ask-community
d
Hello! does Prefect support asynchronous tasks?
k
Hey @Daniil Ponizov, I think this is a good answer
z
We are also working on features to support asynchronous tasks 🙂
d
How can I get the event loop inside a task run?
k
How would you do it without Prefect? get_event_loop()?
d
you mean use it like this?
@task
def some_task():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(futures))

with Flow("flow_name") as fl:
    some_task()
Sorry, I can't format this snippet.
When I try to hide the event loop inside a task, it doesn't see any coroutines and just finishes.
k
Let me ask the team if anyone is familiar with async.
e
Hey @Daniil Ponizov I tried a couple things and had success with the following snippet.
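import asyncio
from prefect import task

@task
def async_sleeper(i):
    # Create a fresh event loop for this task run.
    loop = asyncio.new_event_loop()

    # some_coro is a coroutine function defined elsewhere.
    tasks = [loop.create_task(some_coro()) for _ in range(i)]
    loop.run_until_complete(asyncio.wait(tasks))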
I don't know what is happening, but using loop.create_task instead of asyncio.create_task did the trick for me. I can even execute this task in a mapped fashion. DaskExecutor works with the above setup, but LocalDaskExecutor does not.
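A likely reason for the difference, sketched outside Prefect: asyncio.create_task needs an event loop that is already running in the current thread and raises RuntimeError otherwise, while loop.create_task attaches the task to an explicit loop that can be started afterwards. The sketch below uses only the standard library; some_coro here is a hypothetical stand-in for the real coroutine, and the two helper function names are made up for illustration.

```python
import asyncio


async def some_coro():
    # Hypothetical stand-in for the coroutine used in the task.
    await asyncio.sleep(0.01)
    return "done"


def schedule_with_module_function():
    # asyncio.create_task looks up the loop currently running in this thread.
    # Called from plain synchronous code, there is none, so it raises.
    coro = some_coro()
    try:
        asyncio.create_task(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return None


def schedule_with_loop_method():
    # loop.create_task binds the task to an explicit loop that is not
    # running yet; run_until_complete then drives it to completion.
    loop = asyncio.new_event_loop()
    try:
        task = loop.create_task(some_coro())
        return loop.run_until_complete(task)
    finally:
        loop.close()


error_message = schedule_with_module_function()
result = schedule_with_loop_method()
```

Here error_message holds the RuntimeError text and result holds the coroutine's return value, which matches the behavior described above for a synchronous task body.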
I recommend using asyncio.new_event_loop instead of the usual get_event_loop. Event loops are single-threaded by default. Python gets confused when multiple threads try to access the single event loop owned by the Python process. Instead, create a fresh event loop at the start of your tasks with new_event_loop. Also, it's probably best to close the loop when the task finishes.
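The advice above (a fresh loop per task, closed when the task finishes) can be sketched roughly as follows. This is a standard-library illustration, not Prefect code: run_batch is a hypothetical name for what would be the body of a task, some_coro and gather_all are placeholders, and the thread pool only imitates an executor that runs tasks on several threads.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def some_coro(n):
    # Hypothetical placeholder for real asynchronous work.
    await asyncio.sleep(0.01)
    return n * 2


async def gather_all(count):
    # Runs inside the loop, so gather attaches to the loop that is running.
    return await asyncio.gather(*(some_coro(n) for n in range(count)))


def run_batch(count):
    # Each call owns its loop, so calls on different threads never share one.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(gather_all(count))
    finally:
        # Close the loop even if a coroutine raised.
        loop.close()


# Simulate several tasks running concurrently on worker threads.
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(run_batch, [1, 2, 3]))
```

On Python 3.7 and later, asyncio.run(gather_all(count)) performs the same create, run, and close sequence in a single call, so it may be a simpler choice when no other loop is running in that thread.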