https://prefect.io logo
d

Daniil Ponizov

09/16/2021, 3:41 PM
Hello! does Prefect support asynchronous tasks?
k

Kevin Kho

09/16/2021, 3:42 PM
Hey @Daniil Ponizov, I think this is a good answer
upvote 1
z

Zach Angell

09/16/2021, 3:43 PM
We are also working on features to support asynchronous tasks 🙂
d

Daniil Ponizov

09/16/2021, 3:46 PM
how can i get event loop inside task run?
k

Kevin Kho

09/16/2021, 3:49 PM
How would you do it without Prefect?
get_event_loop()
?
d

Daniil Ponizov

09/16/2021, 3:58 PM
you mean use it like this?
Copy code
@task
def some_task():
    loop = asyncio.get_event_loop()   loop.run_until_complete(asyncio.wait(futures))   

with Flow("flow_name") as fl:
    some_task()
sorry, cant format this snippet
when i try to hide event loop in task it doesn't see any coroutine and just finish
k

Kevin Kho

09/16/2021, 4:37 PM
Let me ask the team if anyone if familiar with async
e

emre

09/16/2021, 9:07 PM
Hey @Daniil Ponizov I tried a couple things and had success with the following snippet.
Copy code
@task
def async_sleeper(i):
    loop = asyncio.new_event_loop()

    tasks = [loop.create_task(some_coro()) for _ in range(i)]
    loop.run_until_complete(asyncio.wait(tasks))
I don't know what is happening, But using
loop.create_task
instead of
asyncio.create_task
did the trick for me. I can execute this task in a mapped fashion even.
DaskExecutor
works with the above setup, but
LocalDaskExecutor
does not
upvote 1
I recommend using
asyncio.new_event_loop
instead of the usual
get_event_loop
. event loops are single threaded by default. python gets confused when multiple threads try to access the single event loop owned by the python process. Instead, create a fresh event loop at the start of your tasks with
new_event_loop
. Also its probably best if you close the loop when the task finishes.
6 Views